Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Maverick-Integrative-Project/Lawrence's Broker
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
733 lines (688 sloc)
28.2 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
import threading # By Fernando Campos | |
import paho.mqtt.client as paho # pip install paho-mqtt | |
import cv2 # pip install opencv-python | |
from inference_sdk import InferenceHTTPClient # pip install inference-sdk | |
import glob | |
import os # By Fernando Campos | |
from pathlib import Path | |
import logging # By Fernando Campos | |
# By Fernando Campos on 31/03/2024
# Logging: INFO level, each record prefixed with a timestamp and its level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# The inference API key is read from the API_KEY environment variable
# (None when the variable is not set).
API_KEY = os.getenv('API_KEY')
# Shared inference client and the model it is asked to run.
CLIENT = InferenceHTTPClient(
    api_key=API_KEY,
    api_url="https://detect.roboflow.com",
)
model_id = "coco/9"

# this_file is the current working directory with a trailing separator, so
# file names can be appended to it directly. A backslash anywhere in the path
# selects "\\", otherwise "/" is used.
this_file = str(Path(os.getcwd()))
this_file += "\\" if "\\" in this_file else "/"
logging.info("Working Directory: " + this_file)
class mqtt_message:
    """A buffer of MQTT control packets to decode, or a command to encode.

    Construct with ``bytes``/``bytearray`` to decode received data, or with a
    command string (plus optional first/second data) to encode packets.

    Layout convention of the public attributes: ``header`` holds the command
    byte, the Remaining Length bytes and the first body byte (the high byte of
    the first 2-byte length field); ``contents`` is the rest of the body, so
    ``contents_size`` is the Remaining Length minus one.
    """

    # Packet-type codes written into the high nibble of the first byte.
    # NOTE(review): code 9 is SUBACK in the MQTT specification; this project
    # labels it "forward" on both the encode and the decode side.
    _COMMAND_CODES = {
        "connect": 1,
        "publish": 3,
        "subscribe": 8,
        "forward": 9,
        "unsubscribe": 10,
        "disconnect": 14,
    }

    def __init__(self, argument1, argument2="", argument3=""):
        """Store data to decode (bytes/bytearray) or a command to encode (str).

        Args:
            argument1: raw bytes to decode, or the command name to encode.
            argument2: first data for encoding (topic or client name).
            argument3: second data for encoding (payload).
        """
        # Every attribute gets a real default so that a decoded list never
        # contains a placeholder type object.
        self.is_missing = False   # True when the buffer ends before the packet does
        self.is_overlap = False   # True when more packets follow the first one
        self.is_decoded = False
        self.raw_message = b""    # when decoding: the bytes as received
                                  # when encoding: the full message built so far
        self.raw_array = bytearray()  # mutable copy of the packet bytes
        self.raw_size = 0
        self.excess = bytearray()     # data which is not part of the first message
        self.excess_size = 0
        self.header = bytearray()     # command byte and contents information
        self.header_size = 0
        self.contents = bytearray()   # data such as topic and message
        self.contents_size = 0
        self.command_bits = 0
        self.command = "unknown"      # connect, publish, subscribe...
        self.flag_list = []
        self.first_data_raw = bytearray()
        self.second_data_raw = bytearray()
        self.first_data_size = 0
        self.second_data_size = 0
        self.first_data = ""
        self.second_data = ""
        self.decoded = []
        self.decoded_all = []         # one entry per packet found in the buffer
        self.number_of_messages = 0
        self._body = bytearray()      # full packet body (after the length bytes)

        if isinstance(argument1, str):
            self.is_decoded = True
            self.command = argument1
            self.first_data = argument2
            self.second_data = argument3
        elif isinstance(argument1, (bytes, bytearray)):
            self.raw_message = argument1
            self.raw_array = bytearray(argument1)
            self.raw_size = len(self.raw_array)
        else:
            print("unknown argument type (only accepts bytes, bytearrays and strings)")

    @staticmethod
    def _read_field(body, offset):
        """Return ``(field, next_offset)`` for a 2-byte length-prefixed field.

        Slicing is used throughout, so a truncated body yields a shorter
        (possibly empty) field instead of raising.
        """
        length = int.from_bytes(body[offset:offset + 2], "big")
        start = offset + 2
        end = start + length
        return body[start:end], end

    def _incomplete(self):
        """Flag the buffer as too short to hold a fixed header and return."""
        print("part of message is missing")
        self.is_missing = True
        self.is_overlap = False
        self.decoded = [self.command, self.first_data, self.second_data]
        self.number_of_messages = 1
        return self.decoded

    def decode(self):
        """Decode the first packet in the buffer.

        Identifies the command, reads the Remaining Length field, splits the
        buffer into header/contents/excess and runs the command-specific
        decoder.

        Returns:
            ``[command, first_data, second_data]`` for the first packet.
        """
        data = self.raw_array
        if self.raw_size < 2:
            return self._incomplete()

        first_byte = data[0]
        self.command_bits = first_byte >> 4
        # The four flags are the low nibble, most significant bit first.
        self.flag_list = [bool(first_byte & mask) for mask in (0x08, 0x04, 0x02, 0x01)]

        # Remaining Length: up to four bytes, 7 value bits each, least
        # significant group first; bit 7 marks "another byte follows".
        size = 0
        position = 1
        for shift in (0, 7, 14, 21):
            if position >= self.raw_size:
                return self._incomplete()
            length_byte = data[position]
            position += 1
            size |= (length_byte & 0x7F) << shift
            if length_byte < 0x80:
                break

        body_start = position
        packet_end = body_start + size
        self._body = data[body_start:packet_end]
        # The header keeps the first body byte, except for empty packets.
        header_end = body_start + 1 if size else body_start
        self.header = data[:header_end]
        self.header_size = len(self.header)
        self.contents = data[header_end:packet_end]
        self.contents_size = max(size - 1, 0)

        if packet_end < self.raw_size:
            self.excess = data[packet_end:]
            self.excess_size = len(self.excess)
            # The smallest control packet is two bytes long (e.g. disconnect).
            self.is_overlap = self.excess_size >= 2
            self.is_missing = False
        elif packet_end > self.raw_size:
            self.is_missing = True
            self.is_overlap = False
            print("part of message is missing")
        else:
            self.is_overlap = False

        # the contents are decoded differently for each command
        if self.command_bits == 1:
            self.decode_connect()
        elif self.command_bits == 2:
            self.command = "connack"
        elif self.command_bits == 3:
            self.command = "publish"
            self.decode_publish()
        elif self.command_bits == 4:
            self.command = "puback"
        elif self.command_bits == 9:
            # The original also listed "suback" for code 9; that branch could
            # never run because this one matches first.
            self.command = "forward"
            self.decode_forward()
        elif self.command_bits == 8:
            self.command = "subscribe"
            self.decode_subscribe()
        elif self.command_bits == 10:
            self.command = "unsubscribe"
            self.decode_unsubscribe()
        elif self.command_bits == 14:
            self.command = "disconnect"
            self.decode_disconnect()
        else:
            self.command = "unknown"
            print("unknown command")

        self.decoded = [self.command, self.first_data, self.second_data]
        self.number_of_messages = 1
        return self.decoded

    def decode_connect(self):
        """Decode a connect body using its connect-flags byte.

        Without a user name: command ``"connect"``, first data is the client
        identifier. With a user name: command ``"connect: with password"``,
        first data is the user name and second data the password (empty when
        the password flag is not set).
        """
        body = self._body
        _, position = self._read_field(body, 0)  # protocol name
        # protocol level at `position`, connect flags right after it
        flags = body[position + 1] if len(body) > position + 1 else 0
        position += 4  # level (1) + flags (1) + keep alive (2)
        client_id, position = self._read_field(body, position)
        if flags & 0x04:  # will flag: skip will topic and will message
            _, position = self._read_field(body, position)
            _, position = self._read_field(body, position)

        if flags & 0x80:  # user name flag
            username, position = self._read_field(body, position)
            password = bytearray()
            if flags & 0x40:  # password flag
                password, position = self._read_field(body, position)
            self.command = "connect: with password"
            self.first_data_raw = username
            self.first_data_size = len(username)
            self.first_data = username.decode("ascii")
            self.second_data_raw = password
            self.second_data_size = len(password)
            self.second_data = password.decode("ascii")
        else:
            self.command = "connect"
            self.first_data_raw = client_id
            self.first_data_size = len(client_id)
            self.first_data = client_id.decode("ascii")
            self.second_data = ""
            self.second_data_size = 0

    def decode_publish(self):
        """Decode a publish body: first data is the topic, second the payload.

        The payload becomes a ``str`` when it is pure ASCII and stays a
        ``bytearray`` otherwise (e.g. an image).
        """
        body = self._body
        topic, position = self._read_field(body, 0)
        qos = (self.raw_array[0] >> 1) & 0x03
        if qos:
            position += 2  # QoS 1/2 packets carry a packet identifier here
        self.first_data_raw = topic
        self.first_data_size = len(topic)
        self.first_data = topic.decode("ascii")
        self.second_data_raw = body[position:]
        self.second_data_size = len(self.second_data_raw)
        try:
            self.second_data = self.second_data_raw.decode("ascii")
        except UnicodeDecodeError:
            print("cannot decode payload. displaying bytes")
            self.second_data = self.second_data_raw

    def decode_forward(self):
        """Nothing to decode: the command name is the whole message."""

    def _decode_topic_request(self):
        """Shared by subscribe/unsubscribe: read the first topic filter."""
        # The body starts with a 2-byte packet identifier.
        topic, _ = self._read_field(self._body, 2)
        self.first_data_raw = topic
        self.first_data_size = len(topic)
        self.first_data = topic.decode("ascii")
        self.second_data = ""

    def decode_subscribe(self):
        """Decode a subscribe body; first data is the first topic filter."""
        self._decode_topic_request()

    def decode_unsubscribe(self):
        """Decode an unsubscribe body; first data is the first topic filter."""
        self._decode_topic_request()

    def decode_disconnect(self):
        """Nothing to decode: the command name is the whole message."""

    def decode_puback(self):
        """Translate the first contents byte into a textual status.

        Not called by ``decode``; kept for callers that invoke it directly.
        """
        code = self.contents[0] if self.contents else None
        if code == 0:
            self.first_data = "Success"
        elif code == 0x10:
            self.first_data = "No Matching Subscribers"
        else:
            self.first_data = "Unknown Error"
        self.second_data = ""

    def decode_excess(self):
        """Decode the first packet of the excess data, if there is any."""
        if self.is_overlap:
            excess_object = mqtt_message(self.excess)
            return excess_object.decode()
        else:
            print("error, there is no excess")

    def decode_all(self):
        """Decode every packet in the buffer.

        A single socket read often holds several packets back to back. Sets
        ``is_missing`` on this object when the last packet is cut short, so
        the caller can read more data and decode again.

        Returns:
            A list with one ``[command, first_data, second_data]`` per packet.
        """
        self.is_missing = False
        decoding_list = []
        mqtt_object = mqtt_message(self.raw_message)
        while True:
            decoding_list.append(mqtt_object.decode())
            if mqtt_object.is_missing:
                print("must be stuff missing")
                self.is_missing = True
                break
            if not mqtt_object.is_overlap:
                break
            mqtt_object = mqtt_message(mqtt_object.excess)
        self.number_of_messages = len(decoding_list)
        self.decoded_all = decoding_list
        return self.decoded_all

    def encode(self):
        """Encode the stored command and return the bytes to send.

        ``"forward"`` yields a forward packet followed by a publish packet,
        ``"publish"`` a connect packet followed by a publish packet, anything
        else a connect packet only.
        """
        # Captured first: build_header() overwrites self.command and
        # encode_connect() clears self.second_data.
        requested = self.command
        topic = self.first_data
        payload = self.second_data
        if requested == "forward":
            self.encode_forward()
            self.is_overlap = True
            print(payload)
            self.encode_publish(topic, payload)
        else:
            self.encode_connect()
            if requested == "publish":
                self.is_overlap = True
                self.second_data = payload
                self.encode_publish(topic, payload)
            # we have no need to encode other types of messages
            else:
                self.is_overlap = False
        return self.raw_message

    def build_header(self, command):
        """Build the header for ``command`` from ``self.contents_size``.

        The Remaining Length written to the wire is ``contents_size + 1``: the
        header's closing zero byte is part of the packet body, which matches
        how ``decode`` derives ``contents_size``.
        """
        remaining = max(self.contents_size, 0) + 1
        length_bytes = bytearray()
        while True:
            digit = remaining % 128
            remaining //= 128
            if remaining:
                digit |= 0x80  # continuation bit: another length byte follows
            length_bytes.append(digit)
            if not remaining:
                break
        self.header_size = 2 + len(length_bytes)
        self.command = command
        self.command_bits = self._COMMAND_CODES.get(command, self.command_bits)
        # assume no flags are used
        self.header = bytearray([self.command_bits << 4]) + length_bytes + bytearray(b"\x00")
        return self.header

    def encode_connect(self):
        """Encode a connect packet naming ``first_data``.

        Falls back to ``"Lawrence's_Broker"`` when ``first_data`` is empty.
        User name and password are never sent, so ``second_data`` is cleared.
        """
        self.second_data = ""
        name = self.first_data if self.first_data else "Lawrence's_Broker"
        name_bytes = name.encode("utf-8")
        # Protocol name (its length high byte sits in the header), level 4,
        # flags 0x02, keep alive 60, then the length-prefixed client name.
        self.contents = (
            bytearray(b"\x04MQTT\x04\x02\x00<")
            + len(name_bytes).to_bytes(2, "big")
            + name_bytes
        )
        self.contents_size = len(self.contents)
        self.build_header("connect")
        self.raw_array = self.header + self.contents
        self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def encode_forward(self):
        """Encode the fixed four-byte forward packet."""
        self.contents = bytearray(b"\x01")
        self.contents_size = len(self.contents)
        self.build_header("forward")
        self.raw_array = self.header + self.contents
        self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def encode_publish(self, topic, payload):
        """Encode a publish packet; append it to the previous one on overlap.

        Args:
            topic: topic name (str).
            payload: str (sent as UTF-8) or bytes-like (sent unchanged).

        Raises:
            ValueError: the encoded topic is longer than 65535 bytes.
        """
        self.excess = self.raw_message
        self.excess_size = self.raw_size
        topic_bytes = topic.encode("utf-8")
        topic_length = len(topic_bytes)
        if topic_length > 0xFFFF:
            raise ValueError("topic is too long for an MQTT publish packet")
        if isinstance(payload, str):
            payload_bytes = payload.encode("utf-8")
        else:
            payload_bytes = bytes(payload)
        self.contents = bytearray([topic_length & 0xFF]) + topic_bytes + payload_bytes
        self.contents_size = len(self.contents)
        self.build_header("publish")
        # The header's last byte is the high byte of the topic length.
        self.header[-1] = topic_length >> 8
        self.raw_array = self.header + self.contents
        if self.is_overlap:
            self.raw_message = bytearray(self.excess) + self.raw_array
        else:
            self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def qos_respsonse(self, command=False):
        """Return the reply bytes for ``command`` (default: own command).

        Only "connect" has a reply; any other command returns None.
        NOTE(review): an MQTT CONNACK starts with byte 0x20 — confirm that
        these three bytes are what the client is meant to receive.
        """
        if command is False:
            command = self.command
        if command == "connect":
            return bytearray(b"\x02\x00\x00")
        # Could add here the qos responses for subscribe and publish
def forward_publish(client, topic, message):
    """Encode a "forward" message for topic/message and send it to client.

    Args:
        client: connected socket of the receiving client.
        topic: topic name passed to the encoder.
        message: payload passed to the encoder.
    """
    encoded = mqtt_message("forward", topic, message).encode()
    print("outgoing message: "+ str(encoded))
    # sendall keeps writing until the whole packet is out; send() may stop
    # after a partial write when the payload is large.
    client.sendall(encoded)
def getmyip():
    """Return the address that this machine's host name resolves to."""
    return socket.gethostbyname(socket.gethostname())


# Show the resolved address as soon as the module is loaded.
print(getmyip())
def sendmessage(message, address=False):
    """Open a TCP connection to address, send message as UTF-8 and close it.

    Args:
        message: text to send.
        address: ``(host, port)`` pair to connect to. The default of False is
            not a usable address; ``connect`` rejects it.
    """
    # The context manager closes the socket even when connect/send fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect(address)
        print("connected to: "+str(address))
        # sendall: send() alone may transmit only part of the data.
        client.sendall(bytes(message, encoding="utf-8"))
    print("message sent: "+message)
def authenticate_user(username, password):
    """Return True when check_username_and_password accepts the pair, else False."""
    return True if check_username_and_password(username, password) else False
# By Fernando Campos on 31/03/2024
class ConnectionManager:
    """A list of connections whose updates are guarded by a lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.connections = []

    def add_connection(self, connection):
        """Append connection to the list while holding the lock."""
        self.lock.acquire()
        try:
            self.connections.append(connection)
        finally:
            self.lock.release()

    def remove_connection(self, connection):
        """Remove connection from the list while holding the lock.

        Raises:
            ValueError: connection is not in the list (from ``list.remove``).
        """
        self.lock.acquire()
        try:
            self.connections.remove(connection)
        finally:
            self.lock.release()


# Initialize the connection manager
connection_manager = ConnectionManager()
def run_server(port=1883): #Work by Lawrence
    """Run the broker's listener thread until the operator presses Enter.

    Clients are served one at a time: every decoded message is dispatched to
    on_publish / on_connect / on_subscribe / on_unsubscribe, and a disconnect
    message or a closed socket ends the session.

    Args:
        port: first port to try; up to 20 consecutive ports are attempted.
    """
    receive_limit = 2097152  # max file size is 2 megabytes
    stop_requested = threading.Event()

    def open_listener(ip, first_port):
        """Bind a listening socket on the first free port of the range."""
        last_error = None
        # because of reusing port issues, keep trying ports until one works
        for candidate in range(first_port, first_port + 20):
            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                listener.bind((ip, candidate))
                listener.listen(10)  # can hold a queue of 10 connections at once
            except OSError as error:
                listener.close()
                last_error = error
            else:
                return listener, candidate
        raise OSError("no free port found for the listener") from last_error

    def receive_messages(client, data):
        """Decode data, reading more while the last message is incomplete."""
        buffered = bytearray(data)
        while True:
            mqtt = mqtt_message(bytes(buffered))
            decoded_msg = mqtt.decode_all()
            if not mqtt.is_missing:
                return decoded_msg
            nextdata = client.recv(receive_limit)
            if not nextdata:  # peer closed before the message was complete
                return decoded_msg
            buffered += nextdata
            print("more data concatenated")

    def serve_client(client):
        """Dispatch every message of one client until it disconnects."""
        connect_message = None  # the [command, first, second] of the connect
        data = client.recv(receive_limit)
        while data:
            for message in receive_messages(client, data):
                command = message[0]
                print(command)
                if command == "publish":
                    on_publish(message)
                elif command in ("connect", "connect: with password"):
                    connect_message = message
                    on_connect(message[1], message[2], client)
                elif command in ("subscribe", "unsubscribe"):
                    if connect_message is None:
                        print(command + " received before connect, ignored")
                    elif command == "subscribe":
                        on_subscribe(connect_message[1], connect_message[2], message[1])
                    else:
                        # on_unsubscribe takes (topic, subscriber id)
                        on_unsubscribe(message[1], connect_message[1])
                elif command == "disconnect":
                    return
            data = client.recv(receive_limit)  # picks up the next message, if there is one.

    def connection_handler(port):
        ip = getmyip()
        server, port = open_listener(ip, port)
        # A finite accept() timeout lets the loop notice the stop request.
        server.settimeout(1.0)
        print("listener started. IP "+ip+" PORT "+str(port))
        with server:
            while not stop_requested.is_set():
                try:
                    client, client_address = server.accept()
                except socket.timeout:
                    continue
                print("got a connection from "+str(client_address))
                with client:
                    client.settimeout(None)  # block while waiting for client data
                    try:
                        serve_client(client)
                    except Exception:
                        # One failing client must not take the listener down.
                        logging.exception("connection from %s failed", client_address)

    joblistener = threading.Thread(target=connection_handler, args=(port,))
    joblistener.start()
    input(">< press enter to stop|")
    stop_requested.set()
    joblistener.join(timeout=2.0)
    print("listener ended\n")
#publishtestbytes = bytearray(b'\x10\x10\x00\x04MQTT\x04\x02\x00<\x00\x04Test0\x0e\x00\x05topicmessage') | |
#subscribetestbytes = bytearray(b'\x10\x10\x00\x04MQTT\x04\x02\x00<\x00\x04Test\x82\n\x00\x01\x00\x05topic\x00') | |
#unsubscribetestbytes = bytearray(b'\x10\x10\x00\x04MQTT\x04\x02\x00<\x00\x04Test\xa2\t\x00\x01\x00\x05topic') | |
def testpic(port=1883,ip="192.168.1.120",topic="obj-rec"):
    """Capture one webcam frame and publish it as a JPEG to the broker.

    Args:
        port: broker port.
        ip: broker address.
        topic: topic the picture is published to.
    """
    client = paho.Client(paho.CallbackAPIVersion.VERSION1,"Test")
    client.connect(ip,port)
    cam = cv2.VideoCapture(0)
    try:
        result, image = cam.read()
    finally:
        cam.release()  # free the camera for other programs
    if not result:
        # Without a frame there is nothing to publish; do not send a stale file.
        print("could not capture an image from the webcam")
        return
    picture_path = this_file+'webcam.jpg'
    cv2.imwrite(picture_path, image)
    time.sleep(4)  # NOTE(review): delay kept from the original; its purpose is not documented
    with open(picture_path,"rb") as file:
        payload = file.read()
    client.publish(topic, payload)
def testpublish(topic, message, ip="192.168.43.18", port = 1883):
    """Connect to the broker with a user name and password, then disconnect.

    topic and message are accepted but not used: the publish, subscribe and
    unsubscribe calls are disabled.
    """
    def show_message(client, userdata, msg):
        # Prints every message delivered by the network loop.
        print(f"Recieved '{msg.payload.decode()}' from '{msg.topic}' topic")

    client = paho.Client(paho.CallbackAPIVersion.VERSION1,"Test")
    client.on_message = show_message
    # must happen before connect to affect message
    client.username_pw_set("username", "password")
    client.connect(ip,port)
    client.loop_start()
    client.disconnect()
#From Johnson
#Function for on_publish
def on_publish(mqtt):
    """Store a published message and the current subscriber list on disk.

    Two files are written next to each other in the working directory:
    ``<stamp>.txt`` holds the (possibly replaced) payload and
    ``<stamp>_subscribers.txt`` holds the subscribers of every topic. The
    original used one path for both, so the payload overwrote the list.

    Args:
        mqtt: decoded message, ``[command, topic, payload]``.
    """
    mqtt = topic_logic(mqtt)
    all_subscribed_people = list_subscribers()
    stamp = str(time.time())[5:-3]
    new_data_file = this_file + stamp + ".txt"
    new_subscriber_list = this_file + stamp + "_subscribers.txt"  # File path for subscribed people
    write_subscribed_users(all_subscribed_people, new_subscriber_list)

    # payload_logic may hand back a replacement text, a whole message list,
    # or nothing at all; fall back to the published payload in the last case.
    processed = payload_logic(mqtt)
    if processed is None:
        payload = mqtt[2]
    elif isinstance(processed, str):
        payload = processed
    else:
        payload = processed[2]

    # Binary payloads (e.g. pictures) cannot be written in text mode.
    if isinstance(payload, (bytes, bytearray)):
        with open(new_data_file, "wb") as file:
            file.write(payload)
    else:
        with open(new_data_file, "w") as file:
            file.write(payload)
    print("The list of subscribed people written to ", new_subscriber_list)
def list_subscribers(topics_dir=None): #This will return the list of topic files
    """Return ``{topic: [subscriber, ...]}`` read from the topic files.

    Each ``<topic>.txt`` file lists one subscriber per line; when a line
    contains commas only the text before the first comma is used. Blank lines
    are skipped.

    Args:
        topics_dir: directory holding the topic files. Defaults to the
            ``topics`` folder inside the working directory (``this_file``).
    """
    if topics_dir is None:
        topics_dir = os.path.join(this_file, "topics")
    all_subscribed_people = {}  # To list the subscribed peoples for the topic
    for topic_file in sorted(glob.glob(os.path.join(topics_dir, "*.txt"))):
        # The topic name is the file name without its .txt extension.
        extract_topic_name = os.path.splitext(os.path.basename(topic_file))[0]
        subscribers = []
        with open(topic_file, "r") as topics_file:
            for line in topics_file:
                person = line.strip().split(",")[0].strip()
                if person:
                    subscribers.append(person)
        all_subscribed_people[extract_topic_name] = subscribers
    print(all_subscribed_people)
    return all_subscribed_people
#Function to write subscribed users
def write_subscribed_users(all_subscribed_people, new_pub_sub_path):
    """Write every topic and its subscribers to a text file.

    Args:
        all_subscribed_people: mapping of topic name to a list of users.
        new_pub_sub_path: path of the file to (over)write.
    """
    # The context manager closes the file, so the data is on disk on return.
    with open(new_pub_sub_path, "w") as subscribers_file:
        for topic, users in all_subscribed_people.items():
            subscribers_file.write(f"Topic Name: {topic}\n")
            subscribers_file.write("All subscribed people: \n")
            for user in users:
                subscribers_file.write(f"{user}\n")
            subscribers_file.write("\n")  # blank line between topics
#By Lawrence ... again
def topic_logic(mqtt):
    """Expand the topic of a published message into a list of topics.

    A topic that starts with ``[`` is read as list syntax sent as a string
    (``"['a', 'b']"``); any other topic becomes a one-element list. For the
    ``obj-rec`` topic with a binary payload, the names returned by
    object_recognition are appended to the topics.

    Args:
        mqtt: ``[command, topic, message]``; command is probably "publish".

    Returns:
        ``[command, topics, message]`` with topics as a list of strings.
    """
    topic_string = mqtt[1]
    if topic_string.startswith("["):
        inner = topic_string.strip()[1:]
        if inner.endswith("]"):
            inner = inner[:-1]
        topic_list = []
        for piece in inner.split(","):
            # drop the blanks and quotes of the list syntax
            name = piece.strip().strip("'\"").strip()
            if name:
                topic_list.append(name)
    else:
        topic_list = [topic_string]

    output_topics = list(topic_list)
    for topic in topic_list:
        # for each topic, we check its name and perform operations on the message
        if topic == "obj-rec" and isinstance(mqtt[2], (bytes, bytearray)):
            # perform object recognition
            output_topics = output_topics + object_recognition(mqtt[2])
        # add elif statements for topic based logic
    return [mqtt[0], output_topics, mqtt[2]]
def payload_logic(mqtt):
    """Replace an image payload with the list of recognised objects.

    When the payload is longer than 1000 items and the topic list contains
    ``obj-rec``, the payload is replaced by the text form of the topics from
    ``obj-rec`` onwards. Every other message is returned unchanged.

    The result is always a ``[command, topics, payload]`` list, because the
    caller reads element 2 of it; the original returned the bare text (or
    nothing), which the caller could not index correctly.

    Args:
        mqtt: ``[command, topics, payload]`` with topics as a list.
    """
    topics = mqtt[1]
    # if the published data is an image, replace the data with list of objects
    if len(mqtt[2]) > 1000 and isinstance(topics, (list, tuple)) and "obj-rec" in topics:
        objrec = topics.index("obj-rec")
        return [mqtt[0], topics, str(topics[objrec:])]
    return mqtt
def object_recognition(bytearray):
    """Save the picture bytes to ``obj_rec.jpg`` and return get_predictions' result.

    Args:
        bytearray: raw picture data. The parameter name shadows the builtin
            and is kept only so existing callers keep working.
    """
    picture_path = this_file+"obj_rec.jpg"
    # The file must be closed (flushed) before the picture is read back.
    with open(picture_path,"wb") as file:
        file.write(bytearray)
    return get_predictions(picture_path)
def get_predictions(photopath): #work by Lawrence
    """Run object detection on a picture and return the detected class names.

    Uses the module-level CLIENT and model_id, so the API key comes from the
    environment instead of being written into the source.

    Args:
        photopath: path of the picture to analyse.

    Returns:
        Class names of the detections, each name listed once.
    """
    detections = CLIENT.infer(photopath, model_id=model_id)
    # Keep the class name of each prediction, not the whole response.
    predictions_list = [prediction["class"] for prediction in detections["predictions"]]
    predictions_list = deleteDuplicates(predictions_list)
    print("Objects Detected: " + str(predictions_list))
    return predictions_list
#Written by Semen
def deleteDuplicates(objects_list):
    """Return the items of objects_list in order, each kept only once."""
    unique_items = []
    for candidate in objects_list:
        # Only the first occurrence of an item is kept.
        if candidate not in unique_items:
            unique_items.append(candidate)
    return unique_items
#Written by Semen
#Function for checking that a username and password exist
def check_username_and_password(username, password, users_path=None):
    """Return True when the users file holds this username with this password.

    Each line of the file is ``username,password``; everything after the
    first comma is the password. Lines without a comma are ignored.

    Args:
        username: name to look up.
        password: password expected for that name.
        users_path: path of the users file. Defaults to ``users.txt`` in the
            working directory (``this_file``).
    """
    # NOTE(review): passwords are stored and compared as plain text.
    if users_path is None:
        users_path = this_file+"users.txt"
    try:
        with open(users_path, "r") as userfile:
            for line in userfile:
                name, separator, key = line.strip().partition(",")
                if not separator:
                    continue  # no password on this line
                if name == username:
                    if key == password:
                        return True
                    print("Incorrect password")
                    return False
    except FileNotFoundError:
        pass  # no users file yet means no known users
    print("Username not found.")
    return False
#Written by Semen
#Function to check for username existence to avoid duplicates
def check_username(username, users_path=None):
    """Return True when the users file has a ``username,password`` line for username.

    Args:
        username: name to look up.
        users_path: path of the users file. Defaults to ``users.txt`` in the
            working directory (``this_file``).
    """
    if users_path is None:
        users_path = this_file+"users.txt"
    try:
        with open(users_path, "r") as user_exists_file:
            for line in user_exists_file:
                name, separator, _ = line.strip().partition(",")
                if separator and name == username:
                    return True
    except FileNotFoundError:
        return False  # no users file yet means no known users
    return False
#Written by Semen
#Subscribe function
def on_subscribe(username, password, topic):
    """Subscribe username to topic, creating the account when it is new.

    Known username with the right password: the name is appended to the topic
    file. Unknown username: the account is created and then subscribed. Known
    username with a wrong password: nothing is written.

    Args:
        username: subscriber name.
        password: subscriber password.
        topic: topic name; it becomes a file name, so it must not contain a
            path separator.

    Returns:
        True when the subscription was recorded, False otherwise.
    """
    topic = str(topic)
    # The topic arrives from the network and ends up in a file path.
    if not topic or topic in (".", "..") or "/" in topic or "\\" in topic:
        print("Invalid topic name. Subscription refused.")
        return False
    topics_dir = os.path.join(this_file, "topics")
    file_path = os.path.join(topics_dir, f"{topic}.txt")

    if check_username_and_password(username, password):
        os.makedirs(topics_dir, exist_ok=True)
        with open(file_path, "a") as user_topic_file:
            user_topic_file.write(f"\n{username}")
        print("Authentification succsessful. Subscription to the topic succsessful.")
        return True

    #Adding new user to the users file + topic file
    print("Authentification failed. Creating new account.")
    if check_username(username):
        print("Username is not available.")
        return False
    with open(this_file+"users.txt", "a") as users_file:
        users_file.write(f"\n{username},{password}")  # Storing username and password
    os.makedirs(topics_dir, exist_ok=True)
    with open(file_path, "a") as user_topic_file:
        user_topic_file.write(f"\n{username}")
    print("Account created succsessfully. Subscription to the topic succsessful.")
    return True
#In case if password is empty | |
#To test function: | |
#on_subscribe("ann", "45667", "plane") | |
#By Anurag.
def on_unsubscribe(topic,sub_id, topics_dir=None):
    """Remove subscriber sub_id from the file of topic.

    Lines equal to sub_id (ignoring surrounding blanks) and blank lines are
    dropped; the remaining lines are written back.

    Args:
        topic: topic name; its file is ``<topic>.txt``.
        sub_id: subscriber to remove.
        topics_dir: directory holding the topic files. Defaults to the
            ``topics`` folder inside the working directory (``this_file``),
            which is where on_subscribe writes them.
    """
    if topics_dir is None:
        topics_dir = os.path.join(this_file, "topics")
    path = os.path.join(topics_dir, f"{topic}.txt")
    try:
        # to read the file:
        with open(path,"r") as f1:
            lines = f1.readlines()
        #To remove the Subscriber:
        cleaned_lines = []
        for line in lines:
            entry = line.strip()
            if entry and entry != sub_id:
                cleaned_lines.append(entry)
        # Write the updated list back to the file
        with open(path, 'w') as file:
            file.write('\n'.join(cleaned_lines))
        print("Subscriber Removed.")
    except FileNotFoundError:
        print(f"The {topic} file not found")
    except OSError as exe:
        print(f"A error occured while removing subscrober:{exe}")
#By Anurag
def on_connect(username, password, client):
    """Forward the stored publications to a client that just connected.

    With a password the username/password pair must match; without one the
    username only has to exist. Each publication file is parsed as JSON, its
    "message" is forwarded to the client and the file is then removed. Files
    that cannot be read or parsed are skipped and left in place.

    Args:
        username: name sent in the connect message.
        password: password sent in the connect message (may be empty).
        client: connected socket of the client.
    """
    import json  # local import: json is not among the module-level imports

    if password:
        credentials = check_username_and_password(username, password)
    else:
        credentials = check_username(username)
    if not credentials:
        return

    # NOTE(review): the original globbed "/publications/*.txt" at the file
    # system root; the folder is assumed to live in the working directory
    # like the other data files — confirm.
    pattern = os.path.join(this_file, "publications", "*.txt")
    for pub_sub_file in glob.glob(pattern):
        try:
            with open(pub_sub_file, "r") as f:
                pub_sub_data = json.load(f)
            message = pub_sub_data["message"]
        except (OSError, ValueError, KeyError, TypeError) as error:
            print(f"skipping publication file {pub_sub_file}: {error}")
            continue
        # Sends the published message to the client; "topic" is the
        # placeholder the original used when the file names no topic.
        forward_publish(client, pub_sub_data.get("topic", "topic"), message)
        # The publication is removed once it has been forwarded.
        os.remove(pub_sub_file)