Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from cv2_41 import cv2
- import numpy as np
- import logging
- import logging.handlers
- import os
- import time
- # ============================================================================
- def init_logging(log_to_console = True):
- import sys
- main_logger = logging.getLogger()
- LOG_FILENAME = 'debug.log'
- # Check if log exists and should therefore be rolled
- needRoll = os.path.isfile(LOG_FILENAME)
- formatter = logging.Formatter(
- fmt='%(asctime)s.%(msecs)03d %(levelname)-8s <%(threadName)s> [%(name)s] %(message)s'
- , datefmt='%Y-%m-%d %H:%M:%S')
- handler_file = logging.handlers.RotatingFileHandler(LOG_FILENAME
- , maxBytes = 2**24
- , backupCount = 10)
- handler_file.setFormatter(formatter)
- main_logger.addHandler(handler_file)
- if not sys.executable.endswith("pythonw.exe") and log_to_console:
- handler_stream = logging.StreamHandler(sys.stdout)
- handler_stream.setFormatter(formatter)
- main_logger.addHandler(handler_stream)
- main_logger.setLevel(logging.DEBUG)
- if needRoll:
- # Roll over on application start
- handler_file.doRollover()
- # ============================================================================
- FILE_NAME = 'testGood.mp4'
- MIN_CONTOUR_AREA = 1350
- MAX_CONTOUR_AREA = 3500
- # Region of interest for processing
- ROI_POINTS = [
- (400, 0) # Top Left
- , (400, 800) # Bottom Left
- , (1480, 800) # Bottom Right
- , (1480, 0) # Top Right
- ]
- TRACKING_BOX_SCALE = (1.10, 1.10)
- # Colors for visualization
- LABEL_COLOR = (0, 200, 255)
- STALE_COLOR = (127, 255, 255)
- TRACKED_COLOR = (127, 255, 127)
- UNTRACKED_COLOR = (127, 127, 255)
- # ============================================================================
- class IDManager(object):
- def __init__(self):
- self.available_ids = set()
- self.next_id = 0
- def acquire_id(self):
- if self.available_ids:
- return self.available_ids.pop()
- new_id = self.next_id
- self.next_id += 1
- return new_id
- def release_id(self, id):
- self.available_ids.add(id)
- # ============================================================================
- class TrackingEntry(object):
- def __init__(self, id, frame_number, frame, bounding_box):
- self.id = id
- self.first_frame_number = frame_number
- self.tracker = cv2.TrackerCSRT_create()
- self.bounding_boxes = [bounding_box]
- extended_bounding_box = (bounding_box[0]
- , bounding_box[1]
- , TRACKING_BOX_SCALE[0] * bounding_box[2]
- , TRACKING_BOX_SCALE[1] * bounding_box[3])
- self.tracker.init(frame, extended_bounding_box)
- def __repr__(self):
- return "TrackingEntry(id=%d,first_frame=%d,bounds=%s)" % (self.id, self.first_frame_number, self.bounding_boxes)
- @property
- def last_bounding_box(self):
- return self.bounding_boxes[-1]
- def update(self, frame):
- return self.tracker.update(frame)
- def add_bounding_box(self, bounding_box):
- self.bounding_boxes.append(bounding_box)
- def add_missed_detection(self):
- self.bounding_boxes.append(None)
- @property
- def detection_count(self):
- return len(self.bounding_boxes)
- # ============================================================================
- class FrameExtractor(object):
- def __init__(self, frame_shape, roi_points, bg_subtractor):
- self.frame_shape = frame_shape
- self.roi_bounding_box = cv2.boundingRect(roi_points)
- self.bg_subtractor = bg_subtractor
- self._create_roi_mask(roi_points)
- def _create_roi_mask(self, vertices):
- mask = np.zeros(self.frame_shape, np.uint8)
- color = (255, ) * self.frame_shape[2]
- cv2.fillPoly(mask, vertices, color)
- x,y,w,h = self.roi_bounding_box
- mask = cv2.pyrDown(mask[y:y+h,x:x+w])
- if (mask.size - cv2.countNonZero(mask.flatten())) == 0:
- mask = None
- self.roi_mask = mask
- def extract_frame(self, source_image):
- x,y,w,h = self.roi_bounding_box
- source_roi = source_image[y:y+h,x:x+w]
- #drop resolution of working frames
- source_roi = cv2.pyrDown(source_roi)
- #isolate region of interest
- roi = cv2.bitwise_and(source_roi, self.roi_mask) if self.roi_mask is not None else source_roi
- #apply background subraction
- fgmask = self.bg_subtractor.apply(roi)
- #remove shadow pixels and replace them with black pixels
- _,thresh = cv2.threshold(fgmask, 127, 255, cv2.THRESH_BINARY)
- return roi, thresh
- # ============================================================================
- def update_tracked_objects(tracked_objects, frame):
- #update existing IDs
- for tracked in tracked_objects.itervalues():
- success, box = tracked.update(frame)
- if success:
- x,y,w,h = [int(v) for v in box]
- tracked.add_bounding_box([x,y,w,h])
- else:
- tracked.add_missed_detection()
- # ----------------------------------------------------------------------------
- def visualize_tracked_objects(tracked_objects, frame, color):
- for tracked in tracked_objects.itervalues():
- if tracked.last_bounding_box:
- x,y,w,h = tracked.last_bounding_box
- cv2.rectangle(frame, (x,y), (x+w, y+h), color, 4)
- # ============================================================================
- def detect_stale_objects(tracked_objects):
- # check for tracking which has stopped or tracking which hasnt moved
- del_list = []
- for tracked in tracked_objects.itervalues():
- n = tracked.detection_count - 1
- if n <= 0:
- continue
- bounds = tracked.bounding_boxes
- if (bounds[n] == bounds[n-1]) and (bounds[0] != bounds[n]):
- if (bounds[n][1] > bounds[0][1]):
- del_list.append((tracked.id, True, 'Counted(1)'))
- else:
- del_list.append((tracked.id, False, 'Discarded(1)'))
- elif (n > 5) and (bounds[n] == bounds[n-1]) and (bounds[0] == bounds[n]):
- del_list.append((tracked.id, False, 'Discarded(2)'))
- elif bounds[-1] == None:
- del_list.append((tracked.id, True, 'Counted(2)'))
- return del_list
- # ----------------------------------------------------------------------------
- def visualize_stale_objects(tracked_objects, stale_list, frame, color, text_color):
- for id, to_count, label in stale_list:
- tracked = tracked_objects[id]
- pos = (tracked.bounding_boxes[-2][0], tracked.bounding_boxes[-2][1] - 10)
- cv2.putText(frame, label, pos, cv2.FONT_HERSHEY_SIMPLEX, 0.6, text_color, 2)
- if tracked.last_bounding_box:
- x,y,w,h = tracked.last_bounding_box
- cv2.rectangle(frame, (x,y), (x+w, y+h), color, 2)
- # ----------------------------------------------------------------------------
- def visualize_total_count(count, frame, color):
- msg = ('Count = %d' % count)
- cv2.putText(frame, msg, (20, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
- # ----------------------------------------------------------------------------
- def update_count(count, stale_list):
- for _, to_count, _ in stale_list:
- if to_count:
- count += 1
- return count
- # ----------------------------------------------------------------------------
- def retire_stale_objects(tracked_objects, id_manager, stale_list):
- logger = logging.getLogger('retire_stale_objects')
- for id, to_count, label in stale_list:
- tracked = tracked_objects[id]
- del tracked_objects[id]
- id_manager.release_id(id)
- logger.debug("Retired: '%s' -- %s", label, tracked)
- del tracked
- logger.debug("There are %d tracked objects remaining.", len(tracked_objects))
- # ============================================================================
- def is_valid_countour(contour):
- contour_area = cv2.contourArea(contour)
- return (MIN_CONTOUR_AREA <= contour_area <= MAX_CONTOUR_AREA)
- # ----------------------------------------------------------------------------
- def detect_objects(frame):
- contours, _ = cv2.findContours(frame, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
- object_list = []
- for contour in contours:
- if not is_valid_countour(contour):
- continue
- bounding_box = list(cv2.boundingRect(contour))
- object_list.append(bounding_box)
- return object_list
- # ----------------------------------------------------------------------------
- def find_movement(thresh1, thresh2):
- return detect_objects(cv2.absdiff(thresh1, thresh2))
- # ----------------------------------------------------------------------------
- def is_already_tracked(tracked_objects, bounding_box):
- (x1,y1,w1,h1) = bounding_box
- for entry in tracked_objects.itervalues():
- (x2,y2,w2,h2) = entry.last_bounding_box
- if (x2 < (x1 + w1 / 2) < (x2 + w2)) and (y2 < (y1 + h1 / 2) < (y2 + h2)):
- return True
- return False
- # ----------------------------------------------------------------------------
- def detect_untracked(tracked_objects, thresh):
- # Check if movement was being tracked
- object_list = detect_objects(thresh)
- untracked_list = []
- for bounding_rect in object_list:
- if not is_already_tracked(tracked_objects, bounding_rect):
- untracked_list.append(bounding_rect)
- return untracked_list
- # ----------------------------------------------------------------------------
- def visualize_untracked(untracked_list, frame, color):
- for untracked in untracked_list:
- (x,y,w,h) = untracked
- cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
- # ----------------------------------------------------------------------------
- def register_untracked(tracked_objects, id_manager, untracked_list, frame_number, frame):
- logger = logging.getLogger('process_video_stream')
- # Assign tracking
- for untracked in untracked_list:
- new_id = id_manager.acquire_id()
- new_entry = TrackingEntry(new_id, frame_number, frame, untracked)
- tracked_objects[new_id] = new_entry
- logger.debug('Registered new object: %s' % new_entry)
- # ============================================================================
- def process_video_stream(cap):
- logger = logging.getLogger('process_video_stream')
- frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
- frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
- frame_shape = (frame_height, frame_width, 3)
- # Declare region of interest eliminating some background issues
- # and use it for isolation mask
- roi_points_array = np.array([ROI_POINTS], np.int32)
- logger.debug("ROI points: %s", ROI_POINTS)
- # Background removal initiation either KNN or MOG2, KNN yeilded best results in testing
- bg_subtractor = cv2.createBackgroundSubtractorKNN()
- frame_extractor = FrameExtractor(frame_shape, roi_points_array, bg_subtractor)
- # Grab initial frame
- check, raw_frame = cap.read()
- if not check:
- raise RuntimeError("No frames in the video stream.")
- frame_number = 0
- # Preprocess the first frame
- curr_frame, curr_thresh = frame_extractor.extract_frame(raw_frame)
- tracked_objects = {}
- id_manager = IDManager()
- count = 0
- # Main loop
- while True:
- prev_frame, prev_thresh = curr_frame, curr_thresh
- # Read new frames until no more are left
- check, raw_frame = cap.read()
- if not check:
- logger.debug("Reached end of stream.")
- break
- frame_number += 1
- logger.debug("Frame #%04d started.", frame_number)
- curr_frame, curr_thresh = frame_extractor.extract_frame(raw_frame)
- if tracked_objects:
- logger.debug("* Updating %d tracked object%s..."
- , len(tracked_objects)
- , ('s' if len(tracked_objects) != 1 else ''))
- update_tracked_objects(tracked_objects, prev_frame)
- visualize_tracked_objects(tracked_objects, prev_frame, TRACKED_COLOR)
- stale_list = detect_stale_objects(tracked_objects)
- if stale_list:
- logger.debug("* Retiring %d stale tracked object%s..."
- , len(stale_list)
- , ('s' if len(stale_list) != 1 else ''))
- visualize_stale_objects(tracked_objects, stale_list, prev_frame, STALE_COLOR, LABEL_COLOR)
- count = update_count(count, stale_list)
- retire_stale_objects(tracked_objects, id_manager, stale_list)
- visualize_total_count(count, prev_frame, LABEL_COLOR)
- # Find movement
- movement_bounds = find_movement(prev_thresh, curr_thresh)
- if movement_bounds:
- untracked_list = detect_untracked(tracked_objects, prev_thresh)
- logger.debug("* Detected %d movement%s and %d untracked object%s."
- , len(movement_bounds), ('s' if len(movement_bounds) != 1 else '')
- , len(untracked_list), ('s' if len(untracked_list) != 1 else ''))
- visualize_untracked(untracked_list, prev_frame, UNTRACKED_COLOR)
- register_untracked(tracked_objects, id_manager, untracked_list, frame_number, prev_frame)
- logger.debug("Frame #%04d complete.", frame_number)
- # Visualization
- cv2.imshow("Frame", prev_frame)
- key = cv2.waitKey(1) & 0xFF
- if key == 27:
- logger.debug("Exit requested.")
- break
- # TODO Handle remaining tracked objects
- if tracked_objects:
- logger.debug("There %s %d tracked object%s remaining."
- , ('are' if len(tracked_objects) != 1 else 'is')
- , len(tracked_objects)
- , ('s' if len(tracked_objects) != 1 else ''))
- return count
- # ============================================================================
- def main():
- logger = logging.getLogger('main')
- # Initialize video input
- cap = cv2.VideoCapture(FILE_NAME)
- if not cap.isOpened():
- raise RuntimeError("Unable to open file '%s'%." % FILE_NAME)
- count = process_video_stream(cap)
- logger.debug("TOTAL COUNT = %d", count)
- cap.release()
- cv2.destroyAllWindows()
- # ============================================================================
- if __name__ == "__main__":
- init_logging(True)
- start_time = time.time()
- main()
- print("\nRuntime: %s seconds" % (time.time() - start_time))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement