Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- import logging.handlers
- import os
- import time
- import sys
- import cv2
- import numpy as np
- # ============================================================================
- CASCADE_FILENAME = 'data/haarcascade_frontalface_alt.xml'
- MASK_FILENAME = 'data/mask.png'
- IMAGE_DIR = "images"
- IMAGE_FILENAME_FORMAT = IMAGE_DIR + "/frame_%04d.png"
- # Support either live acquisition or replay of saved frames
- CAPTURE_FROM_CAMERA = True
- if CAPTURE_FROM_CAMERA:
- IMAGE_SOURCE = 0 # Default camera
- else:
- IMAGE_SOURCE = IMAGE_FILENAME_FORMAT # Image sequence
- # Time to wait between frames, 0=forever
- WAIT_TIME = 2500 # ms
- SCALING_FACTOR = 0.5
- LOG_TO_FILE = False
- # ============================================================================
- def init_logging():
- main_logger = logging.getLogger()
- formatter = logging.Formatter(
- fmt='%(asctime)s.%(msecs)03d %(levelname)-8s [%(name)s] %(message)s'
- , datefmt='%Y-%m-%d %H:%M:%S')
- handler_stream = logging.StreamHandler(sys.stdout)
- handler_stream.setFormatter(formatter)
- main_logger.addHandler(handler_stream)
- if LOG_TO_FILE:
- handler_file = logging.handlers.RotatingFileHandler("debug.log"
- , maxBytes = 2**24
- , backupCount = 10)
- handler_file.setFormatter(formatter)
- main_logger.addHandler(handler_file)
- main_logger.setLevel(logging.DEBUG)
- return main_logger
- # ============================================================================
- def process_frame(frame_number, frame, face_cascade, face_mask, scaling_factor):
- log = logging.getLogger("process_frame")
- resized_frame = cv2.resize(frame
- , None
- , fx=scaling_factor
- , fy=scaling_factor
- , interpolation=cv2.INTER_AREA)
- log.debug("Resized frame: shape=%s", resized_frame.shape)
- # Let's make a copy to draw into
- processed_frame = resized_frame.copy()
- grayscale_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2GRAY)
- CASCADE_SCALE_FACTOR = 1.3
- CASCADE_MIN_NEIGHBORS = 5
- log.debug("Detecting faces...")
- face_rects = face_cascade.detectMultiScale(grayscale_frame
- , scaleFactor=CASCADE_SCALE_FACTOR
- , minNeighbors=CASCADE_MIN_NEIGHBORS)
- log.debug("Got %d matches.", len(face_rects))
- for (i, face_rect) in enumerate(face_rects):
- (x,y,w,h) = face_rect
- log.debug("Match #%d: pos=(x=%d, y=%d) size=(w=%d, h=%d)", i, x, y, w, h)
- if (h <= 0) or (w <= 0):
- log.debug("Invalid match size, skipping.")
- continue
- # Q: What's the reason for this?
- h2 = int(1.4 * h)
- w2 = int(1.0 * w)
- y2 = y - 0.1 * h
- x2 = x
- log.debug("Scaled bounding box: pos=(x=%d, y=%d) size=(w=%d, h=%d)", x2, y2, w2, h2)
- frame_roi = processed_frame[y2:y2+h2, x2:x2+w2]
- log.debug("Frame ROI: shape=%s", frame_roi.shape)
- resized_face_mask = cv2.resize(face_mask, (w2, h2), interpolation=cv2.INTER_AREA)
- log.debug("Resized face mask: shape=%s", resized_face_mask.shape)
- grayscale_face_mask = cv2.cvtColor(resized_face_mask, cv2.COLOR_BGR2GRAY)
- ret, mask = cv2.threshold(grayscale_face_mask, 180, 255, cv2.THRESH_BINARY_INV)
- mask_inv = cv2.bitwise_not(mask)
- masked_face = cv2.bitwise_and(resized_face_mask, resized_face_mask, mask=mask)
- masked_frame = cv2.bitwise_and(frame_roi, frame_roi, mask=mask_inv)
- processed_frame[y2:y2+h2, x2:x2+w2] = cv2.add(masked_face, masked_frame)
- return processed_frame
- # ============================================================================
- def main():
- log = logging.getLogger("main")
- # Create the cascade classifier
- log.debug("Loading cascade classifier `%s`...", CASCADE_FILENAME)
- face_cascade = cv2.CascadeClassifier(CASCADE_FILENAME)
- if face_cascade.empty():
- raise IOError('Unable to load the face cascade classifier xml file')
- # Load and prepare the face mask
- log.debug("Loading face mask `%s`...", MASK_FILENAME)
- face_mask = cv2.imread(MASK_FILENAME)
- if face_mask is None:
- raise IOError('Unable to load the face mask image')
- log.debug("Face mask loaded, shape=%s", face_mask.shape)
- # Set up image source
- log.debug("Initializing video capture device #%d...", IMAGE_SOURCE)
- cap = cv2.VideoCapture(IMAGE_SOURCE)
- frame_width = cap.get(cv2.cv.CV_CAP_PROP_FRAME_WIDTH)
- frame_height = cap.get(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT)
- log.debug("Video capture frame size=(w=%d, h=%d)", frame_width, frame_height)
- log.debug("Starting capture loop...")
- frame_number = -1
- while True:
- frame_number += 1
- log.debug("Capturing frame #%d...", frame_number)
- ret, frame = cap.read()
- if not ret:
- log.error("Frame capture failed, stopping...")
- break
- log.debug("Got frame #%d: shape=%s", frame_number, frame.shape)
- # Archive raw frames from camera to disk, so we can play them back later
- if CAPTURE_FROM_CAMERA:
- file_name = IMAGE_FILENAME_FORMAT % frame_number
- log.debug("Saving frame #%d as '%s'", frame_number, file_name)
- cv2.imwrite(file_name, frame)
- log.debug("Processing frame #%d...", frame_number)
- processed = process_frame(frame_number, frame, face_cascade, face_mask, SCALING_FACTOR)
- cv2.imshow('Source Image', frame)
- cv2.imshow('Processed Image', processed)
- log.debug("Frame #%d processed.", frame_number)
- c = cv2.waitKey(WAIT_TIME)
- if c == 27:
- log.debug("ESC detected, stopping...")
- break
- log.debug("Closing video capture device...")
- cap.release()
- cv2.destroyAllWindows()
- log.debug("Done.")
- # ============================================================================
- if __name__ == "__main__":
- log = init_logging()
- if not os.path.exists(IMAGE_DIR):
- log.debug("Creating image directory `%s`...", IMAGE_DIR)
- os.makedirs(IMAGE_DIR)
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement