# only_motion_detector/camera.py
import cv2
import time
import paho.mqtt.client as mqtt
import os
from datetime import datetime
import firebase_admin
from firebase_admin import credentials, storage
from playsound import playsound

# MQTT settings
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_TOPIC = "home/camera_motion"
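# NOTE: broker.hivemq.com is a public, unauthenticated broker, so anyone
# subscribed to this topic can read the motion messages. Use a private broker
# (or at least a unique topic name) for anything beyond a demo.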

# Firebase settings
FIREBASE_CREDENTIALS_PATH = "ServiceAccountKey.json"
FIREBASE_STORAGE_BUCKET = "only-motion-detector.appspot.com"

# Alarm sound file path
ALARM_SOUND_PATH = "alarm.mp3"

# Initialize Firebase Admin SDK
cred = credentials.Certificate(FIREBASE_CREDENTIALS_PATH)
firebase_admin.initialize_app(cred, {'storageBucket': FIREBASE_STORAGE_BUCKET})
bucket = storage.bucket()

# Initialize MQTT client
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT)
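# Start the network loop in a background thread so queued publishes are
# actually flushed to the broker. (mqtt.Client() with no arguments assumes the
# paho-mqtt 1.x API; paho-mqtt 2.x requires a CallbackAPIVersion argument.)
client.loop_start()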

# Initialize the cameras
macbook_camera = cv2.VideoCapture(0)
iphone_camera = cv2.VideoCapture(1)
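# NOTE: device indices depend on how the OS enumerates cameras; index 1 may
# not be an iPhone (or may not exist at all), in which case reads from it
# return ret_iphone == False and the iPhone branch below is skipped.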

# Request a 740x480 capture size for both cameras
macbook_camera.set(cv2.CAP_PROP_FRAME_WIDTH, 740)
macbook_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
iphone_camera.set(cv2.CAP_PROP_FRAME_WIDTH, 740)
iphone_camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

# Query the frame size actually in effect (drivers may ignore the request),
# so the VideoWriter dimensions match the captured frames
frame_width_macbook = int(macbook_camera.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height_macbook = int(macbook_camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_width_iphone = int(iphone_camera.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height_iphone = int(iphone_camera.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Create directory for recordings if it doesn't exist
recordings_folder = "recordings"
os.makedirs(recordings_folder, exist_ok=True)

# Generate a unique filename with timestamp for the MacBook camera
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
mp4_video_path_macbook = os.path.join(recordings_folder, f'motion_detected_macbook_{timestamp}.mp4')

# Define the codec and create a VideoWriter object for the MacBook camera
fourcc_macbook = cv2.VideoWriter_fourcc(*'mp4v')
out_macbook = cv2.VideoWriter(mp4_video_path_macbook, fourcc_macbook, 20.0,
                              (frame_width_macbook, frame_height_macbook))

# Optional: generate a unique filename with timestamp for the iPhone camera
mp4_video_path_iphone = os.path.join(recordings_folder, f'motion_detected_iphone_{timestamp}.mp4')
fourcc_iphone = cv2.VideoWriter_fourcc(*'mp4v')
out_iphone = cv2.VideoWriter(mp4_video_path_iphone, fourcc_iphone, 20.0,
                             (frame_width_iphone, frame_height_iphone))
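# NOTE: cv2.VideoWriter fails silently: if the 'mp4v' codec or the frame size
# is unsupported, it simply produces an empty file. Check out.isOpened() if
# the recordings come out zero-length.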

# Give some time for cameras to warm up
time.sleep(2)

# Read two consecutive frames from the MacBook camera to seed frame differencing
ret_macbook, frame1_macbook = macbook_camera.read()
ret_macbook, frame2_macbook = macbook_camera.read()

# Read two consecutive frames from the iPhone camera (if available)
ret_iphone, frame1_iphone = iphone_camera.read()
ret_iphone, frame2_iphone = iphone_camera.read()

while True:
    # Process MacBook camera frames
    if ret_macbook:
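        # Classic frame-differencing pipeline: absolute difference between two
        # consecutive frames -> grayscale -> Gaussian blur (suppress sensor
        # noise) -> binary threshold -> dilate (fill holes) -> contours. Any
        # contour larger than 500 px^2 is treated as motion.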
        # Compute the absolute difference between the two frames
        diff_macbook = cv2.absdiff(frame1_macbook, frame2_macbook)
        gray_macbook = cv2.cvtColor(diff_macbook, cv2.COLOR_BGR2GRAY)
        blur_macbook = cv2.GaussianBlur(gray_macbook, (5, 5), 0)
        _, thresh_macbook = cv2.threshold(blur_macbook, 20, 255, cv2.THRESH_BINARY)
        dilated_macbook = cv2.dilate(thresh_macbook, None, iterations=3)
        contours_macbook, _ = cv2.findContours(dilated_macbook, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        motion_detected_macbook = False
        for contour in contours_macbook:
            if cv2.contourArea(contour) < 500:
                continue
            (x, y, w, h) = cv2.boundingRect(contour)
            cv2.rectangle(frame1_macbook, (x, y), (x + w, y + h), (0, 255, 0), 2)
            motion_detected_macbook = True

        # If motion is detected, publish to MQTT, print message, and sound alarm
        if motion_detected_macbook:
            client.publish(MQTT_TOPIC, "Motion Detected (MacBook)")
            print("Motion Detected (MacBook)")
            playsound(ALARM_SOUND_PATH)
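        # NOTE: playsound() blocks until the clip finishes, which stalls both
        # capture loops while the alarm plays. playsound(ALARM_SOUND_PATH,
        # block=False) avoids this where the platform backend supports it.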

        # Write the frame to the output file for the MacBook camera
        out_macbook.write(frame1_macbook)

        # Show the frame with contours for the MacBook camera
        cv2.imshow("Feed (MacBook)", frame1_macbook)

        # Update the frames for the MacBook camera
        frame1_macbook = frame2_macbook
        ret_macbook, frame2_macbook = macbook_camera.read()

    # Process iPhone camera frames (if available)
    if ret_iphone:
        # Same differencing pipeline as the MacBook feed above
        diff_iphone = cv2.absdiff(frame1_iphone, frame2_iphone)
        gray_iphone = cv2.cvtColor(diff_iphone, cv2.COLOR_BGR2GRAY)
        blur_iphone = cv2.GaussianBlur(gray_iphone, (5, 5), 0)
        _, thresh_iphone = cv2.threshold(blur_iphone, 20, 255, cv2.THRESH_BINARY)
        dilated_iphone = cv2.dilate(thresh_iphone, None, iterations=3)
        contours_iphone, _ = cv2.findContours(dilated_iphone, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)

        motion_detected_iphone = False
        for contour in contours_iphone:
            if cv2.contourArea(contour) < 500:
                continue
            (x, y, w, h) = cv2.boundingRect(contour)
            cv2.rectangle(frame1_iphone, (x, y), (x + w, y + h), (0, 255, 0), 2)
            motion_detected_iphone = True

        # If motion is detected, publish to MQTT, print message, and sound alarm
        if motion_detected_iphone:
            client.publish(MQTT_TOPIC, "Motion Detected (iPhone)")
            print("Motion Detected (iPhone)")
            playsound(ALARM_SOUND_PATH)

        # Write the frame to the output file for the iPhone camera
        if iphone_camera.isOpened():
            out_iphone.write(frame1_iphone)

        # Show the frame with contours for the iPhone camera
        cv2.imshow("Feed (iPhone)", frame1_iphone)

        # Update the frames for the iPhone camera
        frame1_iphone = frame2_iphone
        ret_iphone, frame2_iphone = iphone_camera.read()

    # Exit on 'q' key press
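    # (cv2.waitKey also services the HighGUI event loop, so the imshow windows
    # would not repaint without this call.)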
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

# Upload to Firebase Cloud Storage
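# Runs once after the capture loop exits ('q' pressed), so the recordings are
# uploaded in one shot rather than streamed while recording.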
def upload_to_firebase(file_path, blob_name):
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(file_path)
    print(f'File {file_path} uploaded to {blob_name}.')

# Finalize the recordings before uploading: VideoWriter only writes the MP4
# header/index on release(), so an un-released file may be unplayable.
out_macbook.release()
out_iphone.release()

upload_to_firebase(mp4_video_path_macbook, f'motion_detected_macbook_{timestamp}.mp4')
upload_to_firebase(mp4_video_path_iphone, f'motion_detected_iphone_{timestamp}.mp4')

# Clean up
macbook_camera.release()
if iphone_camera.isOpened():
    iphone_camera.release()
client.loop_stop()
client.disconnect()
cv2.destroyAllWindows()