diff --git a/.idea/MQTT_Project.iml b/.idea/MQTT_Project.iml index 2368678..814d220 100644 --- a/.idea/MQTT_Project.iml +++ b/.idea/MQTT_Project.iml @@ -4,7 +4,7 @@ - + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index d56657a..abeef14 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,4 @@ - + \ No newline at end of file diff --git a/MQTT_Client/client.py b/MQTT_Client/client.py new file mode 100644 index 0000000..070301f --- /dev/null +++ b/MQTT_Client/client.py @@ -0,0 +1,195 @@ +import socket +import threading +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +import json +from datetime import datetime +import tkinter as tk +from tkinter import messagebox + +# List to store all connected and authenticated client sockets +clients = [] +# List of allowed authentication keys +allowed_keys = ['key123', 'key456'] +# List to store scheduled messages +scheduled_messages = [] + +def broadcast(message): + """Send a message to all connected and authenticated clients.""" + for client in clients: + try: + client.sendall(message.encode()) + except: + # Remove the client if message sending fails + clients.remove(client) + +def handle_client(client_socket, addr): + """Handle incoming data and authentication for each client.""" + print(f"Accepted connection from {addr}") + try: + # First, receive the key for authentication + key = client_socket.recv(1024).decode().strip() + print(f"Received key: {key}") + + if key in allowed_keys: + clients.append(client_socket) + print(f"Client {addr} provided a valid key, authentication successful.") + else: + print(f"Client {addr} provided an invalid key.") + client_socket.close() + return + + while True: + data = client_socket.recv(1024) + if not data: + break + print(f"Received data: {data.decode()}") + # Broadcast the received message to all clients + broadcast(data.decode()) + except Exception as e: + print(f"Error handling data from {addr}: {e}") + finally: + if client_socket in clients: + clients.remove(client_socket) + client_socket.close() + print(f"Connection with {addr} closed") + +def periodic_broadcast(): + """Broadcast scheduled messages to all clients at the specified times.""" + while True: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + for msg_info in scheduled_messages: + if msg_info['time'] == now: + broadcast(msg_info['message']) + scheduled_messages.remove(msg_info) + time.sleep(1) + +class RequestHandler(BaseHTTPRequestHandler): + """Handle HTTP requests for setting scheduled messages and immediate broadcasts.""" + + def do_POST(self): + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + try: + data = json.loads(post_data) + if self.path == "/schedule": + global scheduled_messages + scheduled_messages.append({ + "time": data["time"], + "message": data["message"] + }) + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + response = {"status": "success", "message": "Scheduled message set"} + self.wfile.write(json.dumps(response).encode()) + elif self.path == "/broadcast": + broadcast(data["message"]) + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.end_headers() + response = {"status": "success", "message": "Message broadcasted immediately"} + self.wfile.write(json.dumps(response).encode()) + else: + self.send_response(404) + self.send_header('Content-Type', 'application/json') + self.end_headers() + response = {"status": "error", "message": "Endpoint not found"} + self.wfile.write(json.dumps(response).encode()) + except json.JSONDecodeError: + self.send_response(400) + self.send_header('Content-Type', 'application/json') + self.end_headers() + response = {"status": "error", "message": "Invalid JSON"} + self.wfile.write(json.dumps(response).encode()) + +def start_api_server(host='localhost', port=8000): + """Start the API server to handle HTTP requests.""" + server = HTTPServer((host, port), RequestHandler) + print(f"Starting API server on {host}:{port}") + server.serve_forever() + +def start_broker(host='localhost', port=5500): + """Start the broker server and listen for incoming connections.""" + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.bind((host, port)) + server_socket.listen(5) + print(f"Listening on {host}:{port}") + + try: + while True: + client_socket, addr = server_socket.accept() + # Start a new thread to handle the client + client_thread = threading.Thread(target=handle_client, args=(client_socket, addr)) + client_thread.start() + finally: + server_socket.close() + print("Broker server closed") + +class App: + def __init__(self, root): + self.root = root + self.root.title("Message Broker UI") + + self.frame = tk.Frame(self.root) + self.frame.pack(pady=10) + + self.message_label = tk.Label(self.frame, text="Message:") + self.message_label.grid(row=0, column=0, padx=5, pady=5) + + self.message_entry = tk.Entry(self.frame, width=50) + self.message_entry.grid(row=0, column=1, padx=5, pady=5) + + self.broadcast_button = tk.Button(self.frame, text="Broadcast Now", command=self.broadcast_now) + self.broadcast_button.grid(row=0, column=2, padx=5, pady=5) + + self.schedule_label = tk.Label(self.frame, text="Schedule Time (YYYY-MM-DD HH:MM:SS):") + self.schedule_label.grid(row=1, column=0, padx=5, pady=5) + + self.schedule_entry = tk.Entry(self.frame, width=50) + self.schedule_entry.grid(row=1, column=1, padx=5, pady=5) + + self.schedule_button = tk.Button(self.frame, text="Schedule Message", command=self.schedule_message) + self.schedule_button.grid(row=1, column=2, padx=5, pady=5) + + def broadcast_now(self): + message = self.message_entry.get() + if message: + broadcast(message) + messagebox.showinfo("Info", "Message broadcasted immediately") + else: + messagebox.showwarning("Warning", "Message cannot be empty") + + def schedule_message(self): + message = self.message_entry.get() + schedule_time = self.schedule_entry.get() + if message and schedule_time: + try: + datetime.strptime(schedule_time, "%Y-%m-%d %H:%M:%S") + scheduled_messages.append({"time": schedule_time, "message": message}) + messagebox.showinfo("Info", "Message scheduled successfully") + except ValueError: + messagebox.showerror("Error", "Invalid date format. Use YYYY-MM-DD HH:MM:SS") + else: + messagebox.showwarning("Warning", "Message and schedule time cannot be empty") + +if __name__ == "__main__": + # Start the periodic broadcast thread + broadcast_thread = threading.Thread(target=periodic_broadcast) + broadcast_thread.daemon = True + broadcast_thread.start() + + # Start the API server + api_thread = threading.Thread(target=start_api_server) + api_thread.daemon = True + api_thread.start() + + # Start the broker server + broker_thread = threading.Thread(target=start_broker) + broker_thread.daemon = True + broker_thread.start() + + # Start the GUI + root = tk.Tk() + app = App(root) + root.mainloop()