Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
//Publisher Code
import paho.mqtt.client as mqtt
import random
import time
import threading
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
# MQTT Broker address and port
broker_address = 'broker.emqx.io'
port = 1883
# Topic name
topic = '/sensor_data/'
# Connect to the MQTT Broker
def connect(name):
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info(f'MQTT connected for {name}')
else:
logging.error(f'Connection failed for {name}')
client = mqtt.Client(f'python-mqtt-{random.randint(0, 1000)}')
client.on_connect = on_connect
client.connect(broker_address, port)
return client
# Publish sensor data
def publish(client):
while True:
illuminance = random.randint(0, 100)
motion = random.choice([True, False])
time.sleep(1)
msg = f"{illuminance},{motion}"
result = client.publish(topic, msg)
status = result.rc
if status == 0:
logging.info(f"Send `{msg}` to topic `{topic}`")
else:
logging.error(f"Failed to send message to topic {topic}")
client = connect('sensor')
thread = threading.Thread(target=publish, args=(client,))
thread.start()
client.loop_forever()
//Subscriber Code
import paho.mqtt.client as mqtt
import sqlite3
import time
import random
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
# MQTT Broker address and port
broker_address = 'broker.emqx.io'
port = 1883
# Topic name
topic = '/sensor_data/'
# Initialize SQLite database connection
db_file = 'sensor_data.db'
def init_db():
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS SensorData (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
illuminance INTEGER,
motion_detected BOOLEAN
)
''')
conn.commit()
conn.close()
init_db()
# Connect to the MQTT Broker
def connect() -> mqtt.Client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info('MQTT connected')
else:
logging.error('Connection failed')
client = mqtt.Client(f'python-mqtt-{random.randint(0, 1000)}')
client.on_connect = on_connect
client.connect(broker_address, port)
return client
# Subscribe and process messages
def subscribe(client: mqtt.Client):
def on_message(client, userdata, msg):
try:
data = msg.payload.decode().split(',')
if len(data) == 2:
illuminance = int(data[0])
motion_detected = data[1].lower() == 'true'
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO SensorData (illuminance, motion_detected)
VALUES (?, ?)
''', (illuminance, motion_detected))
conn.commit()
conn.close()
if motion_detected:
if illuminance < 30:
logging.info('HIGH') # turn lights to high
elif illuminance < 70:
logging.info('MEDIUM') # turn lights to medium
else:
logging.info('LOW') # turn lights to low
else:
logging.info('close the lamp') # turn lights off
else:
logging.error('Invalid message format')
except ValueError:
logging.error('Error in message conversion')
time.sleep(1.1)
client.subscribe(topic)
client.on_message = on_message
client = connect()
subscribe(client)
client.loop_forever()