Skip to content
Permalink
56ccb32122
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
391 lines (377 sloc) 14.8 KB
import socket
import threading
import cv2
import paho
class mqtt_message:  # copied from broker page
    """Encode or decode MQTT control packets for the simple client in this file.

    Construct with a command string (plus up to two data strings) to build a
    packet with ``encode()``, or with ``bytes``/``bytearray`` to parse received
    data with ``decode()`` / ``decode_all()``.

    Layout convention used throughout the class: ``header`` holds the command
    byte, the remaining-length bytes and the first byte of the variable header
    (the high byte of a length or packet identifier, normally 0x00);
    ``contents`` holds every byte after that up to the end of the packet.
    """

    # High nibble of the first byte -> (command name reported by decode(),
    # name of the method that decodes the contents, or None when nothing
    # further is decoded).  Value 9 is reported as "forward", as before.
    _DECODERS = {
        1: ("connect", "decode_connect"),
        2: ("connack", None),
        3: ("publish", "decode_publish"),
        4: ("puback", None),
        8: ("subscribe", "decode_subscribe"),
        9: ("forward", "decode_forward"),
        10: ("unsubscribe", "decode_unsubscribe"),
        14: ("disconnect", "decode_disconnect"),
    }
    # Command name -> high nibble written by build_header().
    _COMMAND_BITS = {
        "connect": 1,
        "publish": 3,
        "forward": 9,
        "subscribe": 8,
        "unsubscribe": 10,
        "disconnect": 14,
    }
    # MQTT 3.1.1 requires the low nibble 0b0010 on these two packet types.
    _RESERVED_FLAGS = {"subscribe": 0x02, "unsubscribe": 0x02}

    def __init__(self, argument1, argument2="", argument3=""):
        """Prepare a message for encoding (str) or decoding (bytes/bytearray).

        Args:
            argument1: command name to encode, or raw received bytes to decode.
            argument2: first data item when encoding (client name or topic).
            argument3: second data item when encoding (payload).
        """
        # Every attribute starts with a real value so that decoded lists never
        # contain leftover placeholders.
        self.raw_message = b""  # when encoding: the full output, possibly several packets
        self.raw_array = bytearray()  # mutable copy of the (last) packet
        self.raw_size = 0
        self.is_overlap = False
        self.is_decoded = False
        self.excess = bytearray()  # data which is not part of the first message
        self.excess_size = 0
        self.header = bytearray()  # command byte and length information
        self.header_size = 0
        self.contents = bytearray()  # data such as topic and message
        self.contents_size = 0
        self.command_bits = 0
        self.command = ""  # connect, publish, subscribe...
        self.flag_list = []
        self.first_data_raw = bytearray()
        self.second_data_raw = bytearray()
        self.first_data_size = 0
        self.second_data_size = 0
        self.first_data = ""
        self.second_data = ""
        self.decoded = []
        self.decoded_all = []  # one entry per packet found by decode_all()
        self.number_of_messages = 0

        if isinstance(argument1, str):
            self.is_decoded = True
            self.command = argument1
            self.first_data = argument2
            self.second_data = argument3
        elif isinstance(argument1, (bytes, bytearray)):
            self.raw_message = argument1
            self.raw_array = bytearray(argument1)
            self.raw_size = len(self.raw_array)
            self.is_decoded = False
        else:
            print("unknown argument type (only accepts bytes, bytearrays and strings)")

    @staticmethod
    def _text(raw):
        """Turn raw field bytes into text without raising on non-UTF-8 data.

        Undecodable bytes become U+FFFD; the untouched bytes stay available in
        ``first_data_raw`` / ``second_data_raw``.
        """
        return bytes(raw).decode("utf-8", errors="replace")

    def _read_remaining_length(self):
        """Parse the variable-length "remaining length" that follows byte 0.

        Returns:
            ``(remaining_length, index_of_first_byte_after_it)``, or ``None``
            when the field is truncated or longer than four bytes.
        """
        size = 0
        multiplier = 1
        for position in range(1, min(self.raw_size, 5)):
            byte = self.raw_array[position]
            size += (byte & 0x7F) * multiplier
            if byte < 0x80:  # top bit clear: this was the last length byte
                return size, position + 1
            multiplier *= 128
        return None

    def _read_field(self, position):
        """Read a field prefixed by a 2-byte big-endian length from contents.

        Returns:
            ``(field_bytes, index_just_past_the_field)``.
        """
        start = position + 2
        size = int.from_bytes(self.contents[position:start], "big")
        end = start + size
        return self.contents[start:end], end

    def decode(self):
        """Decode the first packet in the raw data.

        Identifies the command, splits header / contents / excess, and hands
        the contents to the command-specific decoder.

        Returns:
            ``[command, first_data, second_data]`` for the first packet.
        """
        # got help from http://www.steves-internet-guide.com/mqtt-protocol-messages-overview/
        self.is_overlap = False
        self.number_of_messages = 1
        length_info = self._read_remaining_length()
        if length_info is None:
            print("part of message is missing")
            self.decoded = [self.command, self.first_data, self.second_data]
            return self.decoded
        remaining, length_end = length_info

        first_byte = self.raw_array[0]
        self.command_bits = first_byte >> 4
        # The four flags are the low nibble, most significant bit first.
        self.flag_list = [bool(first_byte & mask) for mask in (0x08, 0x04, 0x02, 0x01)]

        packet_end = length_end + remaining
        if remaining > 0:
            # The first variable-header byte is kept with the header.
            self.header = self.raw_array[:length_end + 1]
            self.contents = self.raw_array[length_end + 1:packet_end]
        else:
            self.header = self.raw_array[:length_end]
            self.contents = bytearray()
        self.header_size = len(self.header)
        self.contents_size = len(self.contents)

        print(str(packet_end)+" "+str(self.raw_size))
        if packet_end < self.raw_size:
            self.excess = self.raw_array[packet_end:]
            self.excess_size = len(self.excess)
            # The shortest packet (e.g. disconnect) is two bytes long.
            self.is_overlap = self.excess_size >= 2
        elif packet_end > self.raw_size:
            print("part of message is missing")

        # the contents are decoded differently for each command
        entry = self._DECODERS.get(self.command_bits)
        if entry is None:
            print("unknown command")
        else:
            self.command, decoder_name = entry
            if decoder_name is not None:
                try:
                    getattr(self, decoder_name)()
                except IndexError:
                    # A truncated packet ran out of bytes while being parsed.
                    print("part of message is missing")
        self.decoded = [self.command, self.first_data, self.second_data]
        return self.decoded

    def decode_connect(self):
        """Decode a connect packet by walking its fields in order.

        Without a username the command is "connect" and ``first_data`` is the
        client name.  With a username the command is "connect: with password",
        ``first_data`` is the username and ``second_data`` the password.
        """
        body = self.contents
        position = 1 + body[0]  # body[0] is the protocol-name length; skip the name
        position += 1  # protocol level
        connect_flags = body[position]
        position += 3  # the flags byte and the two keep-alive bytes
        client_id, position = self._read_field(position)
        if connect_flags & 0x04:  # will flag: skip will topic and will message
            _, position = self._read_field(position)
            _, position = self._read_field(position)
        if connect_flags & 0x80:  # username flag
            self.command = "connect: with password"
            self.first_data_raw, position = self._read_field(position)
            if connect_flags & 0x40:  # password flag
                self.second_data_raw, position = self._read_field(position)
            else:
                self.second_data_raw = bytearray()
        else:
            self.command = "connect"
            self.first_data_raw = client_id
            self.second_data_raw = bytearray()
        self.first_data_size = len(self.first_data_raw)
        self.second_data_size = len(self.second_data_raw)
        self.first_data = self._text(self.first_data_raw)
        self.second_data = self._text(self.second_data_raw)

    def decode_publish(self):
        """Decode a publish packet: ``first_data`` is the topic, ``second_data`` the payload."""
        # The high byte of the topic length is the last header byte.
        topic_size = (self.header[-1] << 8) | self.contents[0]
        topic_end = 1 + topic_size
        self.first_data_size = topic_size
        self.first_data_raw = self.contents[1:topic_end]
        self.first_data = self._text(self.first_data_raw)
        # With QoS 1 or 2 (flag bits 2 and 1) a 2-byte packet id precedes the payload.
        payload_start = topic_end + 2 if any(self.flag_list[1:3]) else topic_end
        self.second_data_raw = self.contents[payload_start:]
        self.second_data_size = len(self.second_data_raw)
        self.second_data = self._text(self.second_data_raw)

    def decode_forward(self):
        """Does not need further decoding."""

    def _decode_topic_filter(self):
        """Shared by subscribe/unsubscribe: read the first topic after the packet id."""
        # contents[0] is the low byte of the packet id; the topic field follows.
        self.first_data_raw, _ = self._read_field(1)
        self.first_data_size = len(self.first_data_raw)
        self.first_data = self._text(self.first_data_raw)
        self.second_data = ""

    def decode_subscribe(self):
        """Decode a subscribe packet: ``first_data`` is the (first) topic."""
        self._decode_topic_filter()

    def decode_unsubscribe(self):
        """Decode an unsubscribe packet: ``first_data`` is the (first) topic."""
        self._decode_topic_filter()

    def decode_disconnect(self):
        """Does not need further decoding."""

    def decode_puback(self):
        """Translate the first contents byte into a status string.

        NOTE(review): decode() does not call this; it is kept for callers that
        invoke it directly. Which byte carries the status depends on the
        protocol version in use - confirm before relying on it.
        """
        if self.contents[0] == 0:
            self.first_data = "Success"
        elif self.contents[0] == 0x10:
            self.first_data = "No Matching Subscribers"
        else:
            self.first_data = "Unknown Error"
        self.second_data = ""

    def decode_excess(self):
        """Decode the packet that follows the first one, if decode() found any.

        Returns:
            The decoded list of the next packet, or ``None`` when there is no excess.
        """
        if self.is_overlap:
            excess_object = mqtt_message(self.excess)
            return excess_object.decode()
        else:
            print("error, there is no excess")

    def decode_all(self):
        """Decode every back-to-back packet in the raw data.

        A single socket read often contains several packets; decode() only
        handles the first, so this keeps decoding the excess until none is left.

        Returns:
            A list with one ``[command, first_data, second_data]`` per packet.
        """
        decoding_list = []
        mqtt_object = mqtt_message(self.raw_message)
        count = 0
        while True:
            decoding_list.append(mqtt_object.decode())
            print(str(mqtt_object.raw_array))
            count = count + 1
            if mqtt_object.is_overlap:
                mqtt_object = mqtt_message(mqtt_object.excess)
            else:
                break
        self.number_of_messages = count
        self.decoded_all = decoding_list
        return self.decoded_all

    def encode(self):
        """Build the packet(s) for ``self.command`` and return the raw bytes.

        "forward" produces the forward packet followed by a publish packet;
        "publish", "subscribe" and "unsubscribe" produce that single packet;
        any other command produces a connect packet (the original fallback).
        """
        # build_header() overwrites self.command, so remember what was asked for.
        requested = self.command
        if requested == "forward":
            self.encode_forward()
            self.is_overlap = True  # the publish is appended to the forward packet
            self.encode_publish(self.first_data, self.second_data)
        elif requested == "publish":
            self.is_overlap = False
            self.encode_publish(self.first_data, self.second_data)
        elif requested == "subscribe":
            self.is_overlap = False
            self.encode_subscribe()
        elif requested == "unsubscribe":
            self.is_overlap = False
            self.encode_unsubscribe()
        else:
            self.is_overlap = False
            self.encode_connect()
        return self.raw_message

    def build_header(self, command):
        """Build ``self.header`` for ``command`` from ``self.contents_size``.

        The header is the command byte, the variable-length remaining length
        and one 0x00 byte.  That trailing byte is part of the packet body on
        the wire, so the encoded length is ``contents_size + 1``.

        Returns:
            The header as a bytearray.
        """
        self.command = command
        self.command_bits = self._COMMAND_BITS.get(command, self.command_bits)
        first_byte = (self.command_bits << 4) | self._RESERVED_FLAGS.get(command, 0)
        remaining = self.contents_size + 1
        length_bytes = bytearray()
        while True:
            digit = remaining % 128
            remaining //= 128
            if remaining:
                digit |= 0x80  # more length bytes follow
            length_bytes.append(digit)
            if not remaining:
                break
        self.header = bytearray([first_byte]) + length_bytes + b"\x00"
        self.header_size = len(self.header)
        return self.header

    def _assemble(self, command, contents, lead_byte=0):
        """Store contents, build the matching header and join them in raw_array.

        ``lead_byte`` replaces the header's trailing 0x00 when the high byte of
        the first length field is not zero.
        """
        self.contents = bytearray(contents)
        self.contents_size = len(self.contents)
        self.build_header(command)
        self.header[-1] = lead_byte
        self.raw_array = self.header + self.contents

    def _publish_single(self):
        """Make the packet in raw_array the whole output message."""
        self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def encode_connect(self):
        """Encode a connect packet without username/password.

        The client name is ``first_data``; when that is empty or False the
        default name "Lawrence's_Broker" is used.  ``second_data`` is cleared.
        """
        self.second_data = ""
        # can't be bothered, my broker won't need to send username and password
        name = self.first_data or "Lawrence's_Broker"
        name_bytes = name.encode("utf-8")
        # protocol name "MQTT", level 4, flags 0x02, keep-alive 60 ("<"), then the name
        contents = b"\x04MQTT\x04\x02\x00<" + len(name_bytes).to_bytes(2, "big") + name_bytes
        self._assemble("connect", contents)
        self._publish_single()

    def encode_forward(self):
        """Encode the fixed two-byte "forward" packet (contents 0x01 0x00)."""
        self._assemble("forward", b"\x01\x00")
        self._publish_single()

    def encode_publish(self, topic, payload):
        """Encode a publish packet.

        When ``is_overlap`` is set, the packet is appended to the message that
        was already encoded; otherwise it replaces it.

        Args:
            topic: topic name (str).
            payload: message body, either str or bytes-like.
        """
        self.excess = self.raw_message
        self.excess_size = self.raw_size
        topic_bytes = topic.encode("utf-8")
        if isinstance(payload, str):
            payload_bytes = payload.encode("utf-8")
        else:
            payload_bytes = bytes(payload)
        topic_size = len(topic_bytes)
        self._assemble(
            "publish",
            bytes([topic_size & 0xFF]) + topic_bytes + payload_bytes,
            lead_byte=topic_size >> 8,
        )
        if self.is_overlap:
            self.raw_message = self.excess + self.raw_array
        else:
            self.raw_message = self.raw_array
        self.raw_size = len(self.raw_message)

    def encode_subscribe(self):
        """Encode a subscribe packet for the topic in ``first_data`` (packet id 1, QoS 0)."""
        topic_bytes = self.first_data.encode("utf-8")
        contents = b"\x01" + len(topic_bytes).to_bytes(2, "big") + topic_bytes + b"\x00"
        self._assemble("subscribe", contents)
        self._publish_single()

    def encode_unsubscribe(self):
        """Encode an unsubscribe packet for the topic in ``first_data`` (packet id 1)."""
        topic_bytes = self.first_data.encode("utf-8")
        contents = b"\x01" + len(topic_bytes).to_bytes(2, "big") + topic_bytes
        self._assemble("unsubscribe", contents)
        self._publish_single()

    def qos_respsonse(self, command=False):
        """Return the reply bytes for ``command`` (defaults to ``self.command``).

        Only "connect" has a reply; every other command returns ``None``.
        """
        if command is False:
            command = self.command
        if command == "connect":
            # NOTE(review): a CONNACK normally starts with 0x20 (a space when
            # printed); check whether that leading byte was lost here.
            return bytearray(b"\x02\x00\x00")
        # Could add here the qos responses for subscribe and publish
#got help from https://builtin.com/data-science/python-socket
def run_client(ip="192.168.1.120", port=1883):  # Work by Lawrence
    """Connect to a broker, print what it sends, and stop when enter is pressed.

    A background thread opens the connection, sends the test packets via
    ``test()`` and then decodes every chunk it receives, passing publish
    messages to ``client_on_publish()``.  The thread ends when the user presses
    enter, when the peer closes the connection, or when a disconnect message
    is decoded.

    Args:
        ip: address of the broker to connect to.
        port: TCP port of the broker.
    """
    # An Event (rather than a plain flag) lets the listener notice the stop
    # request between receive timeouts.
    stop = threading.Event()

    def client_loop(ip, port):
        """Receive and decode data until stopped, disconnected or closed."""
        print(ip)
        print(port)
        # The with-block closes the socket on every exit path, errors included.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as cs:
            cs.connect((ip, port))
            test(cs)
            print("connection established")
            # Wake up regularly so a stop request is not stuck behind recv().
            cs.settimeout(1.0)
            while not stop.is_set():
                try:
                    data = cs.recv(2097152)  # max file size is 2 megabytes
                except socket.timeout:
                    continue
                if not data:
                    # An empty read means the peer closed the connection.
                    return
                try:
                    mqtt = mqtt_message(data)
                    print(mqtt.raw_message)
                    decoded_msg = mqtt.decode_all()
                except Exception as error:
                    # Thread boundary: report malformed data and keep listening.
                    print("unusual data detected: " + str(error))
                    continue
                print(decoded_msg)
                for message in decoded_msg:
                    print(message[0])
                    if message[0] == "publish":
                        client_on_publish(message)
                    elif message[0] == "disconnect":
                        return

    joblistener = threading.Thread(target=client_loop, args=(ip, port))
    joblistener.start()
    input(">< press enter to stop|")
    stop.set()
    # Give the listener a moment to see the stop request and close its socket.
    joblistener.join(timeout=5.0)
    print("listener ended\n")
def getmyip():
    """Return the address that this machine's host name resolves to."""
    return socket.gethostbyname(socket.gethostname())
# Runs when the module is loaded: show the address returned by getmyip().
print(getmyip())
def client_on_publish(mqtt):
    """Handle one decoded publish message; currently it is only printed.

    Args:
        mqtt: the decoded message as passed in by the receive loop.
    """
    # write stuff here like saving results to a file
    print(mqtt)
def test(client_socket, topic="topic", message="message"):
    """Encode four sample messages and send them over ``client_socket`` in one write.

    Args:
        client_socket: connected socket the bytes are sent on.
        topic: topic used for the publish, subscribe and unsubscribe messages.
        message: second data item given to every message object.
    """
    # Built in the same order as before: publish, connect, subscribe, unsubscribe.
    samples = {
        "publish": mqtt_message("publish", topic, message),
        "connect": mqtt_message("connect", "client_script", message),
        "subscribe": mqtt_message("subscribe", topic, message),
        "unsubscribe": mqtt_message("unsubscribe", topic, message),
    }
    for sample in samples.values():
        sample.encode()

    # The connect message goes first on the wire.  The message objects are
    # always truthy, so each one is appended unconditionally.
    encoded_message = bytearray(b"")
    for key in ("connect", "publish", "subscribe", "unsubscribe"):
        encoded_message = encoded_message + samples[key].raw_message

    print("message sent: " + str(encoded_message))
    client_socket.send(encoded_message)
def sendmessage(message, address=False):
    """Open a TCP connection to ``address`` and send ``message`` as UTF-8 text.

    Args:
        message: text to send.
        address: ``(host, port)`` tuple of the receiver.

    Raises:
        TypeError: if no address is given.
    """
    if not address:
        # Fail before a socket is created instead of inside connect().
        raise TypeError("address must be a (host, port) tuple")
    # The with-block closes the socket even when connect or send fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect(address)
        print("connected to: "+str(address))
        encodedmessage = bytes(message, encoding="utf-8")
        # sendall keeps writing until the whole message has been sent.
        client.sendall(encodedmessage)
    print("message sent: "+message)