Skip to content
Permalink
56ccb32122
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
733 lines (688 sloc) 28.2 KB
import glob
import json
import logging # By Fernando Campos
import os # By Fernando Campos
import socket
import threading # By Fernando Campos
import time
from pathlib import Path

import cv2 # pip install opencv-python
import paho.mqtt.client as paho # pip install paho-mqtt
from inference_sdk import InferenceHTTPClient # pip install inference-sdk
# By Fernando Campos on 31/03/2024
# Log lines carry a timestamp and a severity level.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# The Roboflow key is taken from the environment; it is None when API_KEY is unset.
API_KEY = os.getenv('API_KEY')
CLIENT = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=API_KEY)
model_id = "coco/9"

# Current working directory as a string ending in a path separator, so file
# names can be appended directly. A backslash anywhere in the path selects the
# Windows separator.
this_file = str(Path(os.getcwd()))
this_file += "\\" if "\\" in this_file else "/"
logging.info("Working Directory: " + this_file)
class mqtt_message:
    """Encode and decode the subset of MQTT control packets this broker uses.

    Construct with bytes/bytearray to decode a received buffer, or with a
    command string (plus up to two data strings) to encode an outgoing packet.

    Layout convention kept from the original implementation: when a packet has
    a body, ``header`` holds the fixed header plus the FIRST body byte, and
    ``contents`` holds the rest of the body. After decode(), ``contents_size``
    is therefore the remaining length minus one. In the encode helpers
    ``contents_size`` is the full remaining length that build_header() writes.
    """

    # packet type (top four bits of byte 0) -> (command name, decoder method).
    # Type 9 is SUBACK in the MQTT specification; this broker labels it
    # "forward" because encode_forward() emits it ahead of a forwarded publish.
    _PACKETS = {
        1: ("connect", "decode_connect"),
        2: ("connack", None),
        3: ("publish", "decode_publish"),
        4: ("puback", "decode_puback"),
        8: ("subscribe", "decode_subscribe"),
        9: ("forward", "decode_forward"),
        10: ("unsubscribe", "decode_unsubscribe"),
        14: ("disconnect", "decode_disconnect"),
    }
    # command name -> packet type used by build_header()
    _COMMAND_BITS = {
        "connect": 1,
        "publish": 3,
        "forward": 9,
        "subscribe": 8,
        "unsubscribe": 10,
        "disconnect": 14,
    }

    raw_message: bytes  # when decoding: the buffer as received; when encoding: the full output
    raw_array: bytearray  # mutable copy of the packet currently being decoded/encoded
    raw_size: int
    is_overlap: bool  # decode: another packet follows; encode: prepend the previous packet
    is_decoded: bool  # True when the object was built from a command string
    excess: bytearray  # data which is not part of the first message
    excess_size: int
    header: bytearray  # fixed header plus the first body byte (see class docstring)
    header_size: int
    contents: bytearray  # body without its first byte
    contents_size: int
    command_bits: int
    command: str  # connect, publish, subscribe...
    flag_list: list  # the four fixed-header flag bits, most significant first
    first_data_raw: bytearray
    second_data_raw: bytearray
    first_data_size: int
    second_data_size: int
    first_data: str
    second_data: str
    decoded: list
    is_missing: bool  # the buffer ended before the packet did
    decoded_all: list  # one socket read often holds several packets back to back
    number_of_messages: int

    def __init__(self, argument1, argument2="", argument3=""):
        """Create a message from raw bytes (to decode) or a command (to encode).

        argument1: bytes/bytearray received from a socket, or a command string
            such as "connect", "publish" or "forward".
        argument2, argument3: first and second data items (for a publish the
            topic and the payload); only used with a command string.
        """
        self.is_missing = False
        self.is_overlap = False
        self.is_decoded = False
        self.raw_message = b""
        self.raw_array = bytearray()
        self.raw_size = 0
        self.excess = bytearray()
        self.excess_size = 0
        self.header = bytearray()
        self.header_size = 0
        self.contents = bytearray()
        self.contents_size = 0
        self.command_bits = 0
        self.command = ""
        self.flag_list = [False, False, False, False]
        self.first_data_raw = bytearray()
        self.second_data_raw = bytearray()
        self.first_data_size = 0
        self.second_data_size = 0
        self.first_data = ""
        self.second_data = ""
        self.decoded = []
        self.decoded_all = []
        self.number_of_messages = 0
        self._body = bytearray()  # complete body of the packet decoded last
        if isinstance(argument1, str):
            self.is_decoded = True
            self.command = argument1
            self.first_data = argument2
            self.second_data = argument3
        elif isinstance(argument1, (bytes, bytearray)):
            self.raw_message = argument1
            self.raw_array = bytearray(argument1)
            self.raw_size = len(self.raw_array)
        else:
            print("unknown argument type (only accepts bytes, bytearrays and strings)")

    @staticmethod
    def _read_field(body, offset):
        """Read one length-prefixed field (2-byte big-endian length, then data).

        Returns (data, offset_after_field). Never raises on a short buffer: a
        missing or cut-off field yields whatever bytes are available.
        """
        if offset + 2 > len(body):
            return bytearray(), len(body)
        length = (body[offset] << 8) | body[offset + 1]
        start = offset + 2
        return body[start:start + length], start + length

    @staticmethod
    def _text(raw):
        """Decode an MQTT string field; undecodable bytes become U+FFFD."""
        return bytes(raw).decode("utf-8", errors="replace")

    @staticmethod
    def _encode_remaining_length(length):
        """Encode the remaining length as MQTT's 7-bits-per-byte variable integer."""
        if not 0 <= length <= 268435455:
            raise ValueError("remaining length out of range: " + str(length))
        encoded = bytearray()
        while True:
            length, digit = divmod(length, 128)
            # the top bit says "another length byte follows"
            encoded.append(digit | 0x80 if length else digit)
            if not length:
                return encoded

    def _finish_decode(self):
        """Store and return the [command, first_data, second_data] triple."""
        self.decoded = [self.command, self.first_data, self.second_data]
        self.number_of_messages = 1
        return self.decoded

    def decode(self):
        """Decode the first packet in the buffer.

        Identifies the command, locates the body via the remaining-length
        field, records any trailing bytes in ``excess`` (``is_overlap``), flags
        a cut-off packet (``is_missing``) and runs the command's decoder.

        Returns [command, first_data, second_data].
        """
        raw = self.raw_array
        # start from a clean slate so packets without data report empty strings
        self.command = "unknown"
        self.first_data = ""
        self.second_data = ""
        self.first_data_raw = bytearray()
        self.second_data_raw = bytearray()
        self.first_data_size = 0
        self.second_data_size = 0
        self.excess = bytearray()
        self.excess_size = 0
        self.is_overlap = False
        self.is_missing = False
        self._body = bytearray()

        # Remaining length: 1 to 4 bytes after the first byte, least
        # significant 7-bit group first, top bit set while more bytes follow.
        remaining = 0
        body_start = None
        for index in range(1, min(self.raw_size, 5)):
            remaining |= (raw[index] & 0x7F) << (7 * (index - 1))
            if raw[index] < 0x80:
                body_start = index + 1
                break
        if body_start is None:
            # under five bytes: the fixed header itself is cut short;
            # otherwise the length field never terminated (malformed)
            self.is_missing = self.raw_size < 5
            print("part of message is missing" if self.is_missing else "unknown command")
            return self._finish_decode()

        self.command_bits = raw[0] >> 4
        flag_bits = raw[0] & 0x0F
        self.flag_list = [bool((flag_bits >> shift) & 1) for shift in (3, 2, 1, 0)]

        packet_end = body_start + remaining
        self._body = raw[body_start:packet_end]
        # keep the original split: first body byte goes with the header
        self.header = raw[0:body_start + 1] if remaining else raw[0:body_start]
        self.header_size = len(self.header)
        self.contents = raw[body_start + 1:packet_end]
        self.contents_size = max(remaining - 1, 0)
        logging.debug("packet ends at %d of %d received bytes", packet_end, self.raw_size)

        if packet_end < self.raw_size:
            self.excess = raw[packet_end:]
            self.excess_size = len(self.excess)
            # the smallest MQTT packet is two bytes; anything shorter is noise
            self.is_overlap = self.excess_size >= 2
        elif packet_end > self.raw_size:
            self.is_missing = True
            print("part of message is missing")

        name, decoder = self._PACKETS.get(self.command_bits, ("unknown", None))
        self.command = name
        if name == "unknown":
            print("unknown command")
        elif decoder is not None:
            getattr(self, decoder)()
        return self._finish_decode()

    def decode_connect(self):
        """Decode CONNECT: client id, or username and password when supplied.

        Sets command to "connect: with password" (first_data = username,
        second_data = password) when the username flag is set, otherwise to
        "connect" (first_data = client id, second_data = "").
        """
        body = self._body
        _protocol_name, position = self._read_field(body, 0)
        connect_flags = body[position + 1] if position + 1 < len(body) else 0
        position += 4  # protocol level (1) + connect flags (1) + keep alive (2)
        client_id, position = self._read_field(body, position)
        if connect_flags & 0x04:  # will flag: skip will topic and will message
            _will_topic, position = self._read_field(body, position)
            _will_message, position = self._read_field(body, position)
        username = None
        password = None
        if connect_flags & 0x80:
            username, position = self._read_field(body, position)
        if connect_flags & 0x40:
            password, position = self._read_field(body, position)
        if username is not None:
            self.command = "connect: with password"
            self.first_data_raw = username
            self.second_data_raw = password if password is not None else bytearray()
            self.second_data = self._text(self.second_data_raw)
            self.second_data_size = len(self.second_data_raw)
        else:
            self.command = "connect"
            self.first_data_raw = client_id
            self.second_data = ""
            self.second_data_size = 0
        self.first_data_size = len(self.first_data_raw)
        self.first_data = self._text(self.first_data_raw)

    def decode_publish(self):
        """Decode PUBLISH: first_data = topic, second_data = payload.

        The payload becomes a str when it is pure ASCII and stays a bytearray
        otherwise (e.g. an image).
        """
        body = self._body
        topic, position = self._read_field(body, 0)
        qos = (self.raw_array[0] >> 1) & 0x03 if self.raw_array else 0
        if qos:
            position += 2  # QoS 1 and 2 carry a packet identifier after the topic
        self.first_data_raw = topic
        self.first_data_size = len(topic)
        self.first_data = self._text(topic)
        self.second_data_raw = body[position:]
        self.second_data_size = len(self.second_data_raw)
        try:
            self.second_data = self.second_data_raw.decode("ascii")
        except UnicodeDecodeError:
            print("cannot decode payload. displaying bytes")
            self.second_data = self.second_data_raw

    def decode_forward(self):
        pass#does not need further decoding

    def _decode_topic_filter(self):
        """Shared by SUBSCRIBE and UNSUBSCRIBE: first_data = first topic filter."""
        # the body starts with a two-byte packet identifier
        topic, _ = self._read_field(self._body, 2)
        self.first_data_raw = topic
        self.first_data_size = len(topic)
        self.first_data = self._text(topic)
        self.second_data = ""

    def decode_subscribe(self):
        """Decode SUBSCRIBE; only the first topic filter is extracted."""
        self._decode_topic_filter()

    def decode_unsubscribe(self):
        """Decode UNSUBSCRIBE; only the first topic filter is extracted."""
        self._decode_topic_filter()

    def decode_disconnect(self):
        pass#does not need further decoding

    def decode_puback(self):
        """Decode PUBACK: first_data = textual reason.

        The reason code is the byte after the two-byte packet identifier; a
        packet without one (every MQTT 3.1.1 PUBACK) means success.
        """
        body = self._body
        reason = body[2] if len(body) > 2 else 0
        if reason == 0:
            self.first_data = "Success"
        elif reason == 0x10:
            self.first_data = "No Matching Subscribers"
        else:
            self.first_data = "Unknown Error"
        self.second_data = ""

    def decode_excess(self):
        """Decode the packet that follows the first one; None if there is none."""
        if self.is_overlap:
            excess_object = mqtt_message(self.excess)
            return excess_object.decode()
        else:
            print("error, there is no excess")

    def decode_all(self):
        """Decode every packet in the buffer.

        Returns a list of [command, first_data, second_data] triples. When the
        last packet is cut off, ``is_missing`` is set on this object and
        ``contents_size`` / ``second_data_size`` describe that packet.
        """
        decoding_list = []
        mqtt_object = mqtt_message(self.raw_message)
        while True:
            decoding_list.append(mqtt_object.decode())
            if mqtt_object.is_missing:
                # nothing reliable can follow a cut-off packet
                print("must be stuff missing")
                break
            if not mqtt_object.is_overlap:
                break
            mqtt_object = mqtt_message(mqtt_object.excess)
        self.is_missing = mqtt_object.is_missing
        self.contents_size = mqtt_object.contents_size
        self.second_data_size = mqtt_object.second_data_size
        self.number_of_messages = len(decoding_list)
        self.decoded_all = decoding_list
        return self.decoded_all

    def encode(self):
        """Encode the message chosen at construction and return the bytes.

        "forward" yields the forward marker followed by a publish; "publish"
        yields a connect followed by a publish; anything else yields a connect.
        """
        if self.command == "forward":
            self.encode_forward()
            self.is_overlap = True
            logging.debug("forwarding payload: %r", self.second_data)
            self.encode_publish(self.first_data, self.second_data)
        else:
            # remember the request now: build_header() overwrites self.command
            publishing = self.command == "publish"
            self.encode_connect()
            self.is_overlap = publishing
            if publishing:
                self.encode_publish(self.first_data, self.second_data)
            #we have no need to encode other types of messages
        return self.raw_message

    def build_header(self, command, first_body_byte=0):
        """Build ``header`` for ``command`` and return it.

        The header is the packet-type byte (no flags), the remaining length
        taken from ``contents_size``, and ``first_body_byte`` (the first byte of
        the body, 0 by default).

        Raises ValueError for a command that cannot be encoded or a length
        outside the MQTT range.
        """
        if command not in self._COMMAND_BITS:
            raise ValueError("cannot build a header for command: " + str(command))
        self.command = command
        self.command_bits = self._COMMAND_BITS[command]
        #assume no flags are used
        self.header = (
            bytearray([self.command_bits << 4])
            + self._encode_remaining_length(self.contents_size)
            + bytearray([first_body_byte])
        )
        self.header_size = len(self.header)
        return self.header

    def encode_connect(self):
        """Encode a CONNECT without credentials, using first_data as client id.

        Falls back to "Lawrence's_Broker" when first_data is empty.
        """
        #the broker never needs to send a username and password
        name = self.first_data if self.first_data else "Lawrence's_Broker"
        name_bytes = name.encode("utf-8")
        # protocol name "MQTT", level 4, clean-session flag, keep alive 60 s
        self.contents = (
            bytearray(b"\x04MQTT\x04\x02\x00<")
            + len(name_bytes).to_bytes(2, "big")
            + name_bytes
        )
        # +1 for the first body byte, which build_header() appends to the header
        self.contents_size = 1 + len(self.contents)
        self.build_header("connect")
        self.raw_array = self.header + self.contents
        self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def encode_forward(self):
        """Encode the two-byte-body marker packet sent ahead of a forwarded publish."""
        self.contents_size = 2
        self.build_header("forward")
        self.contents = bytearray(b"\x01")
        self.raw_array = self.header + self.contents
        self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def encode_publish(self,topic,payload):
        """Encode a QoS 0 PUBLISH of ``payload`` (str or bytes-like) to ``topic``.

        When ``is_overlap`` is set, the previously encoded packet is kept in
        front of the publish in ``raw_message``.

        Raises ValueError when the encoded topic exceeds 65535 bytes.
        """
        self.excess = bytearray(self.raw_message)
        self.excess_size = len(self.excess)
        topic_bytes = topic.encode("utf-8")
        if len(topic_bytes) > 0xFFFF:
            raise ValueError("topic is too long for an MQTT publish")
        if isinstance(payload, str):
            payload_bytes = payload.encode("utf-8")
        else:
            payload_bytes = bytes(payload)
        # two bytes of topic length + topic + payload
        self.contents_size = 2 + len(topic_bytes) + len(payload_bytes)
        # high byte of the topic length rides at the end of the header
        self.build_header("publish", len(topic_bytes) >> 8)
        self.contents = bytearray([len(topic_bytes) & 0xFF]) + topic_bytes + payload_bytes
        self.raw_array = self.header + self.contents
        if self.is_overlap:
            self.raw_message = self.excess + self.raw_array
        else:
            self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def qos_respsonse(self, command=False):
        """Return the acknowledgement bytes for ``command`` (default: own command).

        Only "connect" has a response; every other command returns None.
        """
        if command is False:
            command = self.command
        if command == "connect":
            # NOTE(review): a full CONNACK packet would start with 0x20; these
            # three bytes are returned unchanged because no caller shows
            # whether the packet-type byte is added elsewhere.
            return bytearray(b"\x02\x00\x00")
        #Could add here the qos responses for subscribe and publish
def forward_publish(client, topic, message):
    """Encode a forwarded publication and send it to a connected client.

    client: connected socket of the receiving client.
    topic: topic name of the publication.
    message: payload, as str or bytes-like.
    """
    publication = mqtt_message("forward", topic, message)
    encoded = publication.encode()
    print("outgoing message: "+ str(encoded))
    # sendall keeps writing until the whole packet is out; send() may stop early
    client.sendall(encoded)
def getmyip():
    """Return the address that this machine's host name resolves to."""
    return socket.gethostbyname(socket.gethostname())


# Shown once at import time so the operator knows which address to connect to.
print(getmyip())
def sendmessage(message, address=False):
    """Open a TCP connection to ``address``, send ``message`` as UTF-8 and close.

    message: text to send.
    address: (host, port) tuple of the receiver; required despite the default.

    Raises TypeError when no address is given.
    """
    if address is False:
        raise TypeError("sendmessage needs an address as a (host, port) tuple")
    encodedmessage = bytes(message, encoding="utf-8")
    # the with-block closes the socket even when connecting or sending fails
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect(address)
        print("connected to: "+str(address))
        client.sendall(encodedmessage)
    print("message sent: "+message)
def authenticate_user(username, password):
    """Return True when check_username_and_password accepts the pair, else False."""
    return True if check_username_and_password(username, password) else False
# By Fernando Campos on 31/03/2024
class ConnectionManager:
    """List of connections whose updates are serialised by a lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.connections = []

    def add_connection(self, connection):
        """Append ``connection`` to the list while holding the lock."""
        self.lock.acquire()
        try:
            self.connections.append(connection)
        finally:
            self.lock.release()

    def remove_connection(self, connection):
        """Remove ``connection`` from the list while holding the lock.

        Raises ValueError (from list.remove) when it is not in the list.
        """
        self.lock.acquire()
        try:
            self.connections.remove(connection)
        finally:
            self.lock.release()


# Single manager instance created when the module is loaded.
connection_manager = ConnectionManager()
def run_server(port=1883): #Work by Lawrence
    """Run the broker's listener in a background thread until Enter is pressed.

    port: first TCP port to try; up to 20 consecutive ports are attempted.

    Clients are served one at a time. Each decoded message is dispatched to
    on_publish, on_connect, on_subscribe or on_unsubscribe.
    """
    stop_event = threading.Event()
    max_recv = 2097152 # max file size is 2 megabytes

    def open_listener(ip, first_port):
        """Bind a listening socket; returns (socket, port) or (None, first_port)."""
        #because of reusing port issues, keep trying ports until one works
        for candidate in range(first_port, first_port + 20):
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                server.bind((ip, candidate))
                server.listen(10) #can hold a queue of 10 connections at once
            except OSError:
                server.close()
                continue
            return server, candidate
        return None, first_port

    def serve_client(client):
        """Read, reassemble and dispatch messages until the client leaves."""
        connect_message = None #the connect message holds the client's credentials
        data = client.recv(max_recv)
        while data:
            mqtt = mqtt_message(data)
            decoded_msg = mqtt.decode_all()
            # a large payload can arrive in several reads: keep appending and
            # decoding again until the last packet is complete
            while mqtt.is_missing:
                nextdata = client.recv(max_recv)
                if not nextdata:
                    break #client closed mid-packet; use what arrived
                data = data + nextdata
                print("more data concatenated")
                mqtt = mqtt_message(data)
                decoded_msg = mqtt.decode_all()
            for message in decoded_msg:
                command = message[0]
                print(command)
                if command == "publish":
                    on_publish(message)
                elif command in ("connect", "connect: with password"):
                    connect_message = message
                    on_connect(message[1], message[2], client)
                elif command in ("subscribe", "unsubscribe"):
                    if connect_message is None:
                        print(command + " received before connect; ignored")
                    elif command == "subscribe":
                        on_subscribe(connect_message[1], connect_message[2], message[1])
                    else:
                        on_unsubscribe(message[1], connect_message[1])
                elif command == "disconnect":
                    return
            data = client.recv(max_recv) #picks up the next message, if there is one.

    def connection_handler(port):
        """Accept clients one at a time until the stop event is set."""
        ip = getmyip()
        server, port = open_listener(ip, port)
        if server is None:
            print("listener could not be started. IP "+ip)
            return
        print("listener started. IP "+ip+" PORT "+str(port))
        # wake up regularly so a stop request is noticed without a new client
        server.settimeout(1.0)
        try:
            while not stop_event.is_set():
                try:
                    client, client_address = server.accept()
                except socket.timeout:
                    continue
                print("got a connection from "+str(client_address))
                client.settimeout(None)
                try:
                    serve_client(client)
                except OSError as error:
                    # one broken connection must not take the listener down
                    print("connection error: "+str(error))
                finally:
                    client.close()
        finally:
            server.close()

    joblistener = threading.Thread(target=connection_handler, args=(port,))
    joblistener.start()
    input(">< press enter to stop|")
    stop_event.set()
    print("listener ended\n")
#publishtestbytes = bytearray(b'\x10\x10\x00\x04MQTT\x04\x02\x00<\x00\x04Test0\x0e\x00\x05topicmessage')
#subscribetestbytes = bytearray(b'\x10\x10\x00\x04MQTT\x04\x02\x00<\x00\x04Test\x82\n\x00\x01\x00\x05topic\x00')
#unsubscribetestbytes = bytearray(b'\x10\x10\x00\x04MQTT\x04\x02\x00<\x00\x04Test\xa2\t\x00\x01\x00\x05topic')
def testpic(port=1883,ip="192.168.1.120",topic="obj-rec"):
    """Take one webcam picture and publish it to the broker.

    port, ip: address of the broker.
    topic: topic the picture is published to.

    The frame is saved as webcam.jpg in the working directory and the file's
    bytes are the payload. Nothing is published when no frame could be read.
    """
    client = paho.Client(paho.CallbackAPIVersion.VERSION1,"Test")
    client.connect(ip,port)
    cam = cv2.VideoCapture(0)
    try:
        result, image = cam.read()
    finally:
        cam.release() # free the camera for other programs
    if not result:
        print("could not read a picture from the webcam")
        return
    picture_path = this_file+'webcam.jpg'
    cv2.imwrite(picture_path, image)
    time.sleep(4)
    with open(picture_path,"rb") as file:
        payload = file.read()
    client.publish(topic, payload)
def testpublish(topic, message, ip="192.168.43.18", port = 1883):
    """Connect a test client with a username and password, then disconnect.

    topic and message are accepted but currently unused: the publish,
    subscribe and unsubscribe calls that would use them are disabled.
    """
    def show_message(client, userdata, msg):
        print(f"Recieved '{msg.payload.decode()}' from '{msg.topic}' topic")

    tester = paho.Client(paho.CallbackAPIVersion.VERSION1,"Test")
    # credentials must be set before connect to be part of the connect message
    tester.username_pw_set("username", "password")
    tester.connect(ip,port)
    tester.on_message = show_message
    tester.loop_start()
    tester.disconnect()
#From Johnson
#Function for on_publish
def on_publish(mqtt):
    """Handle a publish: record the current subscribers and store the payload.

    mqtt: decoded message as a list [command, topic, payload].

    Two files are written to the working directory, both named after the
    current time: "<stamp>.txt" holds the payload (or, when payload_logic
    supplies one, its replacement text) and "<stamp>_subscribers.txt" holds
    the subscriber list.
    """
    mqtt = topic_logic(mqtt)
    all_subscribed_people = list_subscribers()
    stamp = str(time.time())[5:-3]
    new_data_file = this_file + stamp + ".txt"
    # a separate name, so the payload written below no longer overwrites the list
    new_subscriber_list = this_file + stamp + "_subscribers.txt"
    write_subscribed_users(all_subscribed_people, new_subscriber_list)
    replacement = payload_logic(mqtt)
    payload = replacement if isinstance(replacement, str) else mqtt[2]
    if isinstance(payload, (bytes, bytearray)):
        with open(new_data_file, "wb") as data_file:
            data_file.write(payload)
    else:
        with open(new_data_file, "w") as data_file:
            data_file.write(str(payload))
    print("The list of subscribed people written to ", new_subscriber_list)
def list_subscribers(): #This will return the list of topic files
    """Return {topic name: [subscriber, ...]} read from the topic files.

    Each topic file holds one subscriber per line; when a line contains
    commas, only the text before the first comma is used. Blank lines are
    skipped.
    """
    # NOTE(review): the backslash makes this a Windows-style path; it matches
    # the location on_subscribe writes to.
    topic_files_list = glob.glob(this_file+"topics\\*.txt") #File path for topic
    all_subscribed_people = {} #To list the subscribed people for each topic
    for topic_file in topic_files_list:
        # file name without ".txt"; dots inside the topic name are kept
        extract_topic_name = os.path.splitext(os.path.basename(topic_file))[0]
        with open(topic_file, "r") as topics_file:
            all_subscribed_people[extract_topic_name] = [
                line.split(",")[0].strip() for line in topics_file if line.strip()
            ]
    print(all_subscribed_people)
    return all_subscribed_people
#Function to write subscribed users
def write_subscribed_users(all_subscribed_people, new_pub_sub_path):
    """Write every topic and its subscribers to a text file.

    all_subscribed_people: dict mapping topic name to a list of user names.
    new_pub_sub_path: path of the file to create or overwrite.

    Each topic produces a "Topic Name" line, a heading line, one line per
    user and a closing blank line.
    """
    # the with-block flushes and closes the file even if a write fails
    with open(new_pub_sub_path, "w") as subscribers_file:
        for topic, users in all_subscribed_people.items():
            subscribers_file.write(f"Topic Name: {topic}\n")
            subscribers_file.write("All subscribed people: \n")
            for user in users:
                subscribers_file.write(f"{user}\n")
            subscribers_file.write("\n")
#By Lawrence ... again
def topic_logic(mqtt):
    """Expand the topic of a decoded message into a list of topics.

    mqtt: list with format ["command", "topic", "message"]; the command is
        normally "publish".

    A topic written in list syntax, such as "['a', 'b']", is split into its
    items; any other topic becomes a one-item list. When "obj-rec" is among
    the topics and the message is binary, the objects recognised in it are
    appended as extra topics.

    Returns [command, list of topics, message].
    """
    topic_string = mqtt[1]
    if topic_string.startswith("["):
        #the publisher sent a string containing list syntax
        inner = topic_string[1:]
        closing = inner.find("]")
        if closing != -1:
            inner = inner[:closing]
        # drop the quote marks and the spaces that follow the commas
        pieces = (piece.replace("'", "").strip() for piece in inner.split(","))
        topic_list = [piece for piece in pieces if piece]
    else:
        topic_list = [topic_string]
    output_topics = list(topic_list)
    for topic in topic_list:
        #for each topic, we check its name and perform operations on the message
        if topic == "obj-rec" and isinstance(mqtt[2], (bytes, bytearray)):
            #perform object recognition
            output_topics = output_topics + object_recognition(mqtt[2])
        #add elif statements for topic based logic
    return [mqtt[0], output_topics, mqtt[2]]
def payload_logic(mqtt):
    """Return replacement text for an image payload, or None.

    mqtt: [command, list of topics, payload].

    A payload longer than 1000 items is treated as an image. For an image
    whose topics include "obj-rec", the result is the string form of the
    topic list from "obj-rec" onwards (the recognised objects follow it).
    In every other case the result is None.
    """
    #if the published data is an image, replace the data with list of objects
    if len(mqtt[2]) <= 1000:
        return None
    topics = list(mqtt[1])
    if "obj-rec" not in topics:
        return None # no recognition results to report
    return str(topics[topics.index("obj-rec"):])
def object_recognition(bytearray):
    """Save image bytes as obj_rec.jpg and return the objects detected in it.

    bytearray: the raw bytes of the image. The parameter name hides the
        built-in type inside this function; it is kept for existing callers.

    Returns the list produced by get_predictions for the saved file.
    """
    picture_path = this_file+"obj_rec.jpg"
    # close the file before it is read back, so every byte is on disk
    with open(picture_path,"wb") as file:
        file.write(bytearray)
    predictions_list = get_predictions(picture_path)
    return predictions_list
def get_predictions(photopath): #work by Lawrence
    """Run object detection on an image file and return the detected class names.

    photopath: path of the image to analyse.

    Uses the module-level CLIENT and model_id, so the API key comes from the
    environment. NOTE(review): a key used to be written in this function; it
    has been in the source history and should be revoked.

    Returns the class names in detection order, each listed once.
    """
    detections = CLIENT.infer(photopath, model_id=model_id)
    # each prediction is expected to carry its label under "class"
    predictions_list = [
        prediction["class"] for prediction in detections.get("predictions", [])
    ]
    predictions_list = deleteDuplicates(predictions_list)
    print("Objects Detected: " + str(predictions_list))
    return predictions_list
#Written by Semen
def deleteDuplicates(objects_list):
    """Return a new list with the first occurrence of every item, in order.

    Items are compared with ==, so unhashable items such as dicts work too.
    """
    new_objects_list = []
    for target in objects_list:
        #Only items that were not seen before are kept
        if target not in new_objects_list:
            new_objects_list.append(target)
    return new_objects_list
#Written by Semen
#Function for checking that a username and password pair exists
#Note: change the file path to yours
def check_username_and_password(username, password):
    """Return True when users.txt holds ``username`` with exactly ``password``.

    users.txt lives in the working directory and holds one "name,password"
    pair per line; everything after the first comma is the password. A
    missing file is treated as an empty user list.
    """
    try:
        userfile = open(this_file+"users.txt", "r")
    except FileNotFoundError:
        # no account has been created yet
        print("Username not found.")
        return False
    with userfile:
        for line in userfile:
            parts = line.strip().split(",") #Processing each line of the file
            if len(parts) < 2: #Need both a username and a password
                continue
            name, key = parts[0], ','.join(parts[1:])
            if name != username:
                continue
            if key == password:
                return True
            print("Incorrect password")
            return False
    print("Username not found.")
    return False
#Written by Semen
#Function to check whether a username already exists, to avoid duplicates
#Note: change the file path to yours
def check_username(username):
    """Return True when ``username`` has an entry in users.txt.

    Only lines holding both a name and a password count. A missing file is
    treated as an empty user list.
    """
    try:
        user_exists_file = open(this_file+"users.txt", "r")
    except FileNotFoundError:
        return False # no account has been created yet
    with user_exists_file:
        for line in user_exists_file:
            parts = line.strip().split(",") #Splitting each line into parts by comma
            if len(parts) >= 2 and parts[0] == username:
                return True
    return False
#Written by Semen
#Subscribe function
def on_subscribe(username, password, topic):
    """Subscribe a user to a topic, creating the account when it is new.

    username, password: credentials taken from the client's connect message.
    topic: name of the topic; the user is appended to its topic file.

    Known user with the right password: subscribed. Unknown user: account
    created in users.txt, then subscribed. Known user with a wrong password:
    refused.

    Returns True when the user was subscribed, otherwise False.
    """
    # NOTE(review): the backslash makes this a Windows-style path, and topic
    # comes from the network unchecked - confirm it cannot escape the folder.
    file_path = this_file + f"topics\\{topic}.txt"
    folder = os.path.dirname(file_path)
    if folder:
        os.makedirs(folder, exist_ok=True) # first subscription ever
    if check_username_and_password(username, password):
        with open(file_path, "a") as user_topic_file:
            user_topic_file.write(f"\n{username}")
        print("Authentification succsessful. Subscription to the topic succsessful.")
        return True
    #Adding new user to the users file + topic file
    print("Authentification failed. Creating new account.")
    if check_username(username):
        print("Username is not available.")
        return False
    with open(this_file+"users.txt", "a") as users_file:
        users_file.write(f"\n{username},{password}") # Storing username and password
    with open(file_path, "a") as user_topic_file:
        user_topic_file.write(f"\n{username}")
    print("Account created succsessfully. Subscription to the topic succsessful.")
    return True
#In case if password is empty
#To test function:
#on_subscribe("ann", "45667", "plane")
#By Anurag.
def on_unsubscribe(topic,sub_id):
    """Remove a subscriber from a topic file.

    topic: the file "<topic>.txt" is rewritten without the subscriber.
    sub_id: subscriber name; every line equal to it (ignoring surrounding
        whitespace) is dropped.

    Problems are reported with a printed message instead of an exception.
    """
    # NOTE(review): on_subscribe writes under the "topics" folder of the
    # working directory, while this path is relative - confirm the location.
    path = f"{topic}.txt"
    try:
        with open(path,"r") as f1:
            lines = f1.readlines()
        #To remove the Subscriber:
        cleaned_lines = [line.strip() for line in lines if line.strip() != sub_id]
        # Write the filtered list back to the file
        with open(path, 'w') as file:
            file.write('\n'.join(cleaned_lines))
        print("Subscriber Removed.")
    except FileNotFoundError:
        print(f"The {topic} file not found")
    except OSError as exe:
        print(f"A error occured while removing subscrober:{exe}")
#By Anurag
def on_connect(username, password, client):
    """Send the stored publications to a client that has just connected.

    username, password: credentials from the connect message; with an empty
        password only the existence of the username is checked.
    client: connected socket of the client.

    Every "/publications/*.txt" file is read as JSON, its "message" entry is
    forwarded to the client and the file is then deleted. Files that cannot
    be read are reported and left in place. Nothing happens for unknown
    credentials.
    """
    if password:
        credentials = check_username_and_password(username, password)
    else:
        credentials = check_username(username)
    if not credentials:
        return
    # NOTE(review): this pattern starts at the filesystem root and every
    # message is forwarded under the literal topic "topic" - confirm both.
    for pub_sub_file in glob.glob("/publications/*.txt"):
        try:
            with open(pub_sub_file, "r") as f:
                pub_sub_data = json.load(f)
            message = pub_sub_data["message"]
        except (OSError, ValueError, KeyError, TypeError) as error:
            print(f"could not read publication {pub_sub_file}: {error}")
            continue
        # Sends the stored message to the client that just connected.
        forward_publish(client, "topic", message)
        # The publication has been delivered, so its file is removed.
        os.remove(pub_sub_file)