Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
import pyaudio
import struct
import numpy as np
import paho.mqtt.client as mqtt
from tkinter import messagebox
import pygame
import time
# MQTT settings
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_TOPIC_SOUND = "home/sound_detected"
# Sound detection settings
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
THRESHOLD = 180
# Alarm sound file path
ALARM_SOUND_PATH = "alarm.mp3"
# Initialize pygame for sound playback
pygame.mixer.init()
alarm_sound = pygame.mixer.Sound(ALARM_SOUND_PATH)
# Initialize MQTT client
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
# Initialize PyAudio
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
def detect_sound():
"""Detects sound based on RMS value."""
try:
data = stream.read(CHUNK, exception_on_overflow=False)
samples = struct.unpack(str(CHUNK) + 'h', data)
rms = np.sqrt(np.mean(np.square(samples)))
return rms > THRESHOLD
except Exception as e:
print(f"Error in detect_sound: {e}")
return False
def play_alarm():
"""Plays the alarm sound."""
alarm_sound.play()
messagebox.showinfo(title="Alert", message="Sound detected")
time.sleep(alarm_sound.get_length()) # Wait until the sound finishes
try:
while True:
if detect_sound():
play_alarm()
client.publish(MQTT_TOPIC_SOUND, "Sound Detected")
except KeyboardInterrupt:
print("Exiting...")
# Clean up
stream.stop_stream()
stream.close()
p.terminate()
client.loop_stop()
client.disconnect()