Skip to content

[SUBSCRIBER] Add message validation logic #29

Merged
merged 2 commits into from
Mar 12, 2026
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions mqtt/subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import paho.mqtt.client as mqtt
import json
import logging
from datetime import datetime

# ── Logging setup ─────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)


def load_config(path="config.json") -> dict:
"""Load broker settings and topics from config.json."""
with open(path, "r") as f:
return json.load(f)


def on_connect(client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
config = userdata["config"]
broker = config["broker"]["address"]
port = config["broker"]["port"]
logger.info("Connected to HiveMQ broker at %s:%s", broker, port)

# Subscribe to every topic defined in config.json
topics = config["topics"]
for room, sensors in topics.items():
for sensor_type, topic_string in sensors.items():
client.subscribe(topic_string)
logger.info("Subscribed to: %s", topic_string)
else:
logger.error("Failed to connect, reason code: %s", reason_code)


def on_message(client, userdata, msg):
topic = msg.topic
payload = msg.payload.decode("utf-8", errors="replace")
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

print(f"\n[{timestamp}] Message received")
print(f" Topic : {topic}")
print(f" Payload : {payload}")
print("-" * 50)


def on_disconnect(client, userdata, reason_code, properties=None):
if reason_code != 0:
logger.warning("Unexpected disconnect (code %s).", reason_code)


def main():
config = load_config("config.json")

broker = config["broker"]["address"]
port = config["broker"]["port"]

client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2,
userdata={"config": config}
)

client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

logger.info("Connecting to HiveMQ broker at %s:%s ...", broker, port)
client.connect(broker, port, keepalive=60)
client.loop_forever()


if __name__ == "__main__":
main()
141 changes: 141 additions & 0 deletions subscriber.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import paho.mqtt.client as mqtt
import json
import logging
from datetime import datetime

# ── Logging setup ────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
logger = logging.getLogger(__name__)

# ── Broker config ─────────────────────────────────────────────────────────────
BROKER_HOST = "localhost"
BROKER_PORT = 1883
TOPIC = "campus/#"

# ── Required fields and their expected types ──────────────────────────────────
REQUIRED_FIELDS = {
"room": str,
"sensor_type": str,
"value": (int, float),
"unit": str,
"timestamp": str,
}


def validate_message(payload: str) -> tuple[bool, dict | None, str]:
"""
Validate an incoming MQTT message payload.

Returns:
(True, parsed_data, "") on success
(False, None, error_message) on failure
"""

# 1. Must be valid JSON
try:
data = json.loads(payload)
except json.JSONDecodeError as e:
return False, None, f"Invalid JSON: {e}"

# 2. Must be a JSON object (not a list or primitive)
if not isinstance(data, dict):
return False, None, "Message must be a JSON object, not a list or primitive"

# 3. Check all required fields are present and correct type
for field, expected_type in REQUIRED_FIELDS.items():
if field not in data:
return False, None, f"Missing required field: '{field}'"

if not isinstance(data[field], expected_type):
expected_name = (
expected_type.__name__
if not isinstance(expected_type, tuple)
else " or ".join(t.__name__ for t in expected_type)
)
actual_name = type(data[field]).__name__
return False, None, (
f"Field '{field}' must be {expected_name}, "
f"got {actual_name} (value: {repr(data[field])})"
)

# 4. value must be a real number (reject NaN / Infinity)
import math
if math.isnan(data["value"]) or math.isinf(data["value"]):
return False, None, f"Field 'value' must be a finite number, got {data['value']}"

return True, data, ""


def insert_to_database(data: dict):
"""
Placeholder for Timi's database insert function.
Replace this import and call once database.py is ready.

Expected usage:
from database import insert_reading
insert_reading(data)
"""
# TODO: uncomment when Timi's database.py is ready
# from database import insert_reading
# insert_reading(data)

# Temporary confirmation log until DB is connected
logger.info(
"VALID | room=%s sensor=%s value=%s%s time=%s",
data["room"],
data["sensor_type"],
data["value"],
data["unit"],
data["timestamp"],
)


def on_connect(client, userdata, flags, reason_code, properties=None):
if reason_code == 0:
logger.info("Connected to broker at %s:%s", BROKER_HOST, BROKER_PORT)
client.subscribe(TOPIC)
logger.info("Subscribed to topic: %s", TOPIC)
else:
logger.error("Failed to connect, reason code: %s", reason_code)


def on_message(client, userdata, msg):
topic = msg.topic
payload = msg.payload.decode("utf-8", errors="replace")

logger.debug("Received message on topic: %s", topic)

is_valid, data, error = validate_message(payload)

if not is_valid:
logger.warning(
"REJECTED | topic=%s | reason=%s | raw=%s",
topic, error, payload[:120]
)
return

insert_to_database(data)


def on_disconnect(client, userdata, reason_code, properties=None):
if reason_code != 0:
logger.warning("Unexpected disconnect (code %s). Will attempt reconnect.", reason_code)


def main():
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

logger.info("Connecting to broker at %s:%s ...", BROKER_HOST, BROKER_PORT)
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
client.loop_forever()


if __name__ == "__main__":
main()