Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
import paho.mqtt.client as mqtt
import sqlite3
import logging
import random
import queue
import threading
from flask import Flask, request, jsonify
# Configure logging
logging.basicConfig(level=logging.INFO)
# MQTT Broker address and port
broker_address = 'broker.emqx.io'
port = 1883
# Topic name
topic = '/sensor_data/'
# Initialize SQLite database connection
db_file = 'sensor_data.db'
def init_db():
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS SensorData (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
illuminance INTEGER,
motion_detected BOOLEAN
)
''')
conn.commit()
init_db()
# Queue for storing messages
message_queue = queue.Queue()
# Worker thread function for processing messages
def message_worker():
while True:
illuminance, motion_detected = message_queue.get()
if illuminance is None:
break
try:
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO SensorData (illuminance, motion_detected)
VALUES (?, ?)
''', (illuminance, motion_detected))
conn.commit()
if motion_detected:
if illuminance < 30:
logging.info('HIGH') # turn lights to high
elif illuminance < 70:
logging.info('MEDIUM') # turn lights to medium
else:
logging.info('LOW') # turn lights to low
else:
logging.info('close the lamp') # turn lights off
except Exception as e:
logging.error(f'Error processing message: {e}')
# Start worker thread
worker_thread = threading.Thread(target=message_worker)
worker_thread.start()
# Flask app initialization
app = Flask(__name__)
# Connect to the MQTT Broker
def connect() -> mqtt.Client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info('MQTT connected')
else:
logging.error(f'Connection failed with code {rc}')
def on_disconnect(client, userdata, rc):
if rc != 0:
logging.warning('Unexpected disconnection, attempting to reconnect')
client.reconnect()
client = mqtt.Client(f'python-mqtt-{random.randint(0, 1000)}')
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect(broker_address, port)
return client
# Subscribe and process messages
def subscribe(client: mqtt.Client):
def on_message(client, userdata, msg):
try:
data = msg.payload.decode().split(',')
if len(data) == 2:
try:
illuminance = int(data[0])
motion_detected = data[1].strip().lower() == 'true'
message_queue.put((illuminance, motion_detected))
except ValueError as e:
logging.error(f'Error converting message values: {e}')
else:
logging.error(f'Invalid message format: {msg.payload.decode()}')
except Exception as e:
logging.error(f'Error processing message: {e}')
client.subscribe(topic)
client.on_message = on_message
# Flask API endpoints
# 接收传感器数据的端点
@app.route('/api/sensor_data', methods=['POST'])
def receive_sensor_data():
data = request.json
illuminance = data.get('illuminance')
motion_detected = data.get('motion_detected')
if illuminance is None or motion_detected is None:
return jsonify({'error': 'Missing data parameters'}), 400
try:
message_queue.put((illuminance, motion_detected))
return jsonify({'message': 'Data received and processed successfully'}), 201
except Exception as e:
return jsonify({'error': f'Failed to process data: {str(e)}'}), 500
# 获取最新传感器数据的端点
@app.route('/api/latest_sensor_data', methods=['GET'])
def get_latest_sensor_data():
try:
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute('SELECT * FROM SensorData ORDER BY timestamp DESC LIMIT 1')
row = cursor.fetchone()
if row:
sensor_data = {
'id': row[0],
'timestamp': row[1],
'illuminance': row[2],
'motion_detected': row[3]
}
return jsonify(sensor_data), 200
else:
return jsonify({'message': 'No sensor data available'}), 404
except Exception as e:
return jsonify({'error': f'Failed to fetch data: {str(e)}'}), 500
# 启动 Flask 应用
if __name__ == '__main__':
client = connect()
subscribe(client)
client.loop_start()
app.run(debug=True)
# 停止工作线程
message_queue.put((None, None))
worker_thread.join()