# NOTE: the lines below are web-page residue from the GitHub file view that
# was pasted in with the source; kept verbatim as comments so the module parses.
# Skip to content
# Permalink
# 1394c3bbad
# Switch branches/tags
#
# Name already in use
#
# A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
# Go to file
# @ac2028
# Latest commit 1394c3b Jul 9, 2024 History
# 1 contributor
#
# Users who have contributed to this file
#
# 135 lines (107 sloc) 3.42 KB
"""
Creates multiple connections to a broker
and sends and receives messages. Supports SSL and normal connections.
Uses the loop_start and stop functions just like a single client.
Shows the number of threads used.
Uses loop(), not loop_start() as in demo1a.py, to store client data; this is suitable when dealing with multiple brokers and topics.
"""
import paho.mqtt.client as mqtt
import time
import json
import threading
import logging
# One entry per broker connection.  Create_connections() later adds the
# "client", "client_id" and "cname" keys to each entry.
clients = [
    {"broker": "192.168.1.159", "port": 1883, "name": "blank",
     "sub_topic": "test1", "pub_topic": "test1"},
    # The closing quote after "blank" was missing here, which made the whole
    # module a SyntaxError.
    {"broker": "192.168.1.65", "port": 1883, "name": "blank",
     "sub_topic": "test2", "pub_topic": "test2"},
]
nclients = len(clients)
message = "test message"  # text appended to every published payload
out_queue = []  # simple list used as a queue of received messages waiting to be printed
def on_log(client, userdata, level, buf):
    """Print a log line handed over by the MQTT client.

    Only active when assigned to ``client.on_log`` (that assignment is
    commented out in Create_connections()).

    Args:
        client: the client instance that produced the log entry (unused).
        userdata: user data passed through by the client (unused).
        level: severity of the log entry (unused).
        buf: the log text; printed as-is.
    """
    print(buf)
def on_message(client, userdata, message):
    """Queue a received message so the main loop can print it later.

    Assigned to ``client.on_message`` in Create_connections().

    Args:
        client: the client instance that received the message (unused).
        userdata: user data passed through by the client (unused).
        message: the received message; its ``payload`` bytes are decoded
            as UTF-8.
    """
    # No sleep here: the callback runs inside the network loop, so blocking
    # for a second per message would stall processing for the other clients.
    # Undecodable bytes are replaced instead of raising inside the callback.
    text = message.payload.decode("utf-8", errors="replace")
    # Queued as a (label, text) tuple -- the same shape the original
    # comma-separated assignment produced.
    out_queue.append(("message received", text))
def on_connect(client, userdata, flags, rc):
    """Mark a client as connected and subscribe it to its configured topic.

    Assigned to ``client.on_connect`` in Create_connections().

    Args:
        client: the client whose connection attempt completed.
        userdata: user data passed through by the client (unused).
        flags: connection flags supplied by the client library (unused).
        rc: connection result code; 0 is treated as success.
    """
    if rc != 0:
        print("Bad connection Retured code" ,rc)
        client.loop_stop()
        return

    client.connected_flag = True  # set flags
    # Find this client's entry in the module-level table to learn its topic.
    # The original subscribed *before* this lookup, using a not-yet-assigned
    # local (UnboundLocalError), and would then have subscribed a second time.
    for entry in clients:
        if entry.get("client") is client:
            client.subscribe(entry["sub_topic"])
            break
    else:
        # No table entry owns this client: nothing to subscribe to.
        print("No sub_topic configured for this client; not subscribing")
def on_disconnect(client, userdata, mid):
    """Clear the client's connected flag when it disconnects.

    Assigned to ``client.on_disconnect`` in Create_connections().  The
    publishing loop checks ``connected_flag`` before publishing, so the flag
    has to be reset here or a dropped client would still be published to.

    Args:
        client: the client that disconnected.
        userdata: user data passed through by the client (unused).
        mid: third positional argument supplied by the client library
            (unused).  NOTE(review): for a disconnect callback this is
            presumably the result code rather than a message id; the name is
            kept so the signature stays unchanged -- confirm against the
            paho-mqtt version in use.
    """
    client.connected_flag = False
def on_publish(client, userdata, mid):
    """Print the message id of a completed publish.

    Only active when assigned to ``client.on_publish`` (that assignment is
    commented out in Create_connections()).

    Args:
        client: the client that published (unused).
        userdata: user data passed through by the client (unused).
        mid: message id of the published message; printed.
    """
    # No sleep here: a callback that blocks for a second holds up the
    # network loop that invoked it.
    print("In on pub callback mid=" ,mid)
def Create_connections(connect_timeout=10.0):
    """Create one MQTT client per entry in ``clients`` and connect it.

    For every entry this stores the new client object and its identifiers
    back into the entry (keys "client", "client_id", "cname"), registers the
    callbacks, connects to the entry's broker/port and then services the
    client's network loop until on_connect() has set ``connected_flag``.

    Args:
        connect_timeout: maximum number of seconds to wait for each client's
            connection to be acknowledged.  Without a limit a refused
            connection (non-zero result code) would spin here forever,
            because on_connect() only sets the flag on success.
    """
    for i, entry in enumerate(clients):
        cname = "client" + str(i)
        # time.time must be *called*; int(time.time) raises TypeError.
        client_id = cname + str(int(time.time()))  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        entry["client"] = client
        entry["client_id"] = client_id
        entry["cname"] = cname
        broker = entry["broker"]
        port = entry["port"]

        # Register callbacks before connecting so none can be missed.
        #client.on_log=on_log #give detailed logging
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        #client.on_publish=on_publish
        client.on_message = on_message

        try:
            client.connect(broker, port)  # establish connection
        except Exception:
            # Best effort: report and move on to the next broker.  Unlike a
            # bare ``except:`` this still lets Ctrl-C (KeyboardInterrupt)
            # through.
            print("Connection failed to broker" ,broker)
            continue

        # Drive the network loop by hand until the connection is confirmed
        # or the timeout expires.
        deadline = time.monotonic() + connect_timeout
        while not client.connected_flag:
            if time.monotonic() >= deadline:
                print("Timed out waiting for connection to broker", broker)
                break
            client.loop(0.01)  # check for messages
            time.sleep(0.05)
# Script body: connect every configured client, then publish a numbered test
# message from each one every cycle and print whatever was received, until
# interrupted with Ctrl-C.
mqtt.Client.connected_flag = False  # create flag in class
no_threads = threading.active_count()
print("current threads=",no_threads)
print("Creating Connections",nclients,"clients")
Create_connections()
# NOTE(review): multi_loop is not defined anywhere in the visible source; it
# must be provided elsewhere for the two references below to work.  It is
# called here as multi_loop(nclients, True) and below as
# multi_loop(flag=False) -- confirm both forms match its real signature.
t = threading.Thread(target=multi_loop, args=(nclients, True))  # start multi loop
t.start()
print("All clients connected")
time.sleep(5)
count = 0
no_threads = threading.active_count()
print("current_threads=",no_threads)
print("Publishing")
Run_Flag = True
#bad example when using loop() as the while loop
#lots of sleep calls
try:
    while Run_Flag:
        # The stray "i=0:" (a SyntaxError) and the "i+=1" inside the loop
        # were removed: the for loop already supplies the index.
        for i, entry in enumerate(clients):
            client = entry["client"]
            pub_topic = entry["pub_topic"]
            counter = str(count).rjust(6, "0")  # zero-padded cycle number
            msg = "client" + str(i) + counter + "XXXXXX" + message
            if client.connected_flag:
                client.publish(pub_topic, msg)
                time.sleep(0.1)
                print("publishing client" +str(i))
        time.sleep(10)  # now print messages
        print("queue length=",len(out_queue))
        # Drain only what is queued right now; messages arriving meanwhile
        # are left for the next cycle.
        for _ in range(len(out_queue)):
            print(out_queue.pop())
        count += 1
except KeyboardInterrupt:
    print("interrupted by keyboard")
    for entry in clients:
        # Entries are config dicts; the client object lives under "client".
        # (The original called .disconnect() on the dict itself.)
        mqtt_client = entry.get("client")
        if mqtt_client is not None:
            mqtt_client.disconnect()
    multi_loop(flag=False)  # stop loop
#allow time for allthreads to stop before exiting
time.sleep(10)