diff --git a/mqtt/subscriber.py b/mqtt/subscriber.py new file mode 100644 index 0000000..434a68a --- /dev/null +++ b/mqtt/subscriber.py @@ -0,0 +1,75 @@ +import paho.mqtt.client as mqtt +import json +import logging +from datetime import datetime + +# ── Logging setup ───────────────────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) +logger = logging.getLogger(__name__) + + +def load_config(path="config.json") -> dict: + """Load broker settings and topics from config.json.""" + with open(path, "r") as f: + return json.load(f) + + +def on_connect(client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + config = userdata["config"] + broker = config["broker"]["address"] + port = config["broker"]["port"] + logger.info("Connected to HiveMQ broker at %s:%s", broker, port) + + # Subscribe to every topic defined in config.json + topics = config["topics"] + for room, sensors in topics.items(): + for sensor_type, topic_string in sensors.items(): + client.subscribe(topic_string) + logger.info("Subscribed to: %s", topic_string) + else: + logger.error("Failed to connect, reason code: %s", reason_code) + + +def on_message(client, userdata, msg): + topic = msg.topic + payload = msg.payload.decode("utf-8", errors="replace") + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + print(f"\n[{timestamp}] Message received") + print(f" Topic : {topic}") + print(f" Payload : {payload}") + print("-" * 50) + + +def on_disconnect(client, userdata, reason_code, properties=None): + if reason_code != 0: + logger.warning("Unexpected disconnect (code %s).", reason_code) + + +def main(): + config = load_config("config.json") + + broker = config["broker"]["address"] + port = config["broker"]["port"] + + client = mqtt.Client( + mqtt.CallbackAPIVersion.VERSION2, + userdata={"config": config} + ) + + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + + logger.info("Connecting to HiveMQ broker at %s:%s ...", broker, port) + client.connect(broker, port, keepalive=60) + client.loop_forever() + + +if __name__ == "__main__": + main() diff --git a/subscriber.py b/subscriber.py new file mode 100644 index 0000000..bad03b2 --- /dev/null +++ b/subscriber.py @@ -0,0 +1,141 @@ +import paho.mqtt.client as mqtt +import json +import logging +from datetime import datetime + +# ── Logging setup ──────────────────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" +) +logger = logging.getLogger(__name__) + +# ── Broker config ───────────────────────────────────────────────────────────── +BROKER_HOST = "localhost" +BROKER_PORT = 1883 +TOPIC = "campus/#" + +# ── Required fields and their expected types ────────────────────────────────── +REQUIRED_FIELDS = { + "room": str, + "sensor_type": str, + "value": (int, float), + "unit": str, + "timestamp": str, +} + + +def validate_message(payload: str) -> tuple[bool, dict | None, str]: + """ + Validate an incoming MQTT message payload. + + Returns: + (True, parsed_data, "") on success + (False, None, error_message) on failure + """ + + # 1. Must be valid JSON + try: + data = json.loads(payload) + except json.JSONDecodeError as e: + return False, None, f"Invalid JSON: {e}" + + # 2. Must be a JSON object (not a list or primitive) + if not isinstance(data, dict): + return False, None, "Message must be a JSON object, not a list or primitive" + + # 3. Check all required fields are present and correct type + for field, expected_type in REQUIRED_FIELDS.items(): + if field not in data: + return False, None, f"Missing required field: '{field}'" + + if not isinstance(data[field], expected_type): + expected_name = ( + expected_type.__name__ + if not isinstance(expected_type, tuple) + else " or ".join(t.__name__ for t in expected_type) + ) + actual_name = type(data[field]).__name__ + return False, None, ( + f"Field '{field}' must be {expected_name}, " + f"got {actual_name} (value: {repr(data[field])})" + ) + + # 4. value must be a real number (reject NaN / Infinity) + import math + if math.isnan(data["value"]) or math.isinf(data["value"]): + return False, None, f"Field 'value' must be a finite number, got {data['value']}" + + return True, data, "" + + +def insert_to_database(data: dict): + """ + Placeholder for Timi's database insert function. + Replace this import and call once database.py is ready. + + Expected usage: + from database import insert_reading + insert_reading(data) + """ + # TODO: uncomment when Timi's database.py is ready + # from database import insert_reading + # insert_reading(data) + + # Temporary confirmation log until DB is connected + logger.info( + "VALID | room=%s sensor=%s value=%s%s time=%s", + data["room"], + data["sensor_type"], + data["value"], + data["unit"], + data["timestamp"], + ) + + +def on_connect(client, userdata, flags, reason_code, properties=None): + if reason_code == 0: + logger.info("Connected to broker at %s:%s", BROKER_HOST, BROKER_PORT) + client.subscribe(TOPIC) + logger.info("Subscribed to topic: %s", TOPIC) + else: + logger.error("Failed to connect, reason code: %s", reason_code) + + +def on_message(client, userdata, msg): + topic = msg.topic + payload = msg.payload.decode("utf-8", errors="replace") + + logger.debug("Received message on topic: %s", topic) + + is_valid, data, error = validate_message(payload) + + if not is_valid: + logger.warning( + "REJECTED | topic=%s | reason=%s | raw=%s", + topic, error, payload[:120] + ) + return + + insert_to_database(data) + + +def on_disconnect(client, userdata, reason_code, properties=None): + if reason_code != 0: + logger.warning("Unexpected disconnect (code %s). Will attempt reconnect.", reason_code) + + +def main(): + client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + client.on_connect = on_connect + client.on_message = on_message + client.on_disconnect = on_disconnect + + logger.info("Connecting to broker at %s:%s ...", BROKER_HOST, BROKER_PORT) + client.connect(BROKER_HOST, BROKER_PORT, keepalive=60) + client.loop_forever() + + +if __name__ == "__main__": + main()