Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
"""
Creates multiple connections to a broker
and sends and receives messages.
Uses one thread per client.
Shows the number of threads used.
"""
import paho.mqtt.client as mqtt
import time
import json
import threading
import logging
logging.basicConfig(level=logging.INFO)

# One entry per MQTT connection. Create_connections() later adds the
# "client", "client_id" and "cname" keys to each entry.
clients = [
    {"broker": "192.168.1.159", "port": 1883, "name": "blank",
     "sub_topic": "test1", "pub_topic": "test1"},
    {"broker": "192.168.1.65", "port": 1883, "name": "blank",
     "sub_topic": "test2", "pub_topic": "test2"},
]
nclients = len(clients)
message = "test message"
def Connect(client,broker,port,keepalive,run_forever=False,delay=5):
    """Connect ``client`` to ``broker``, retrying on failure.

    Waits ``delay`` seconds before every attempt (including the first) and
    then calls ``client.connect(broker, port, keepalive)``.

    Args:
        client: object exposing ``connect(broker, port, keepalive)``.
        broker: broker host name or address.
        port: broker TCP port.
        keepalive: keepalive interval passed through to ``client.connect``.
        run_forever: when true, keep retrying indefinitely; otherwise give
            up after 3 failed attempts.
        delay: seconds to sleep before each attempt.

    Returns:
        0 once a connection attempt succeeds, -1 if the retry limit is hit.

    Side effects:
        Sets ``client.bad_connection_flag`` to True after a failed attempt
        and back to False after a successful one, so a transient failure
        does not leave the flag stuck once the connection is established.
    """
    max_failures = 3
    badcount = 0  # failed connection attempts so far
    while True:
        logging.info("conneting to the broker "+str(broker))
        print("Attempts ",str(badcount))
        time.sleep(delay)
        try:
            client.connect(broker, port, keepalive)
        except Exception as exc:
            # Best effort: any failure to connect counts as a bad attempt.
            client.bad_connection_flag = True
            badcount += 1
            logging.info("connection failed %s: %s", badcount, exc)
            if badcount >= max_failures and not run_forever:
                return -1  # give up
        else:
            client.bad_connection_flag = False
            return 0
###end connecting
def wait_for(client,msgType,period=1,wait_time=10,running_loop=False):
    """Poll ``client`` until the event named by ``msgType`` is flagged.

    Recognised ``msgType`` values and the client attributes they watch:
    "CONNACK" (``connected_flag``), "SUBACK" (``suback_flag``),
    "MESSAGE" (``message_received_flag``) and "PUBACK" (``puback_flag``).
    A flag is only consulted when the matching callback (``on_connect``,
    ``on_subscribe``, ``on_message``, ``on_publish``) is set on the client.
    Flags that do not exist on the client are treated as False.

    Args:
        client: the MQTT client to poll.
        msgType: name of the event to wait for.
        period: seconds to sleep between polls.
        wait_time: number of polls allowed before giving up, so the total
            wait is roughly ``period * wait_time`` seconds.
        running_loop: True when the caller already runs the network loop
            (loop_start or loop_forever); otherwise ``client.loop`` is
            called here on every poll to process network traffic.

    Returns:
        True if the event was flagged; False on timeout, or for "CONNACK"
        when ``client.bad_connection_flag`` is set.
    """
    # msgType -> (callback attribute that must be set, flag attribute to test)
    watched = {
        "CONNACK": ("on_connect", "connected_flag"),
        "SUBACK": ("on_subscribe", "suback_flag"),
        "MESSAGE": ("on_message", "message_received_flag"),
        "PUBACK": ("on_publish", "puback_flag"),
    }
    client.running_loop = running_loop
    callback_name, flag_name = watched.get(msgType, (None, None))
    wcount = 0
    while True:
        logging.info("waiting"+msgType)
        if callback_name and getattr(client, callback_name, None):
            if getattr(client, flag_name, False):
                return True
            if msgType == "CONNACK" and getattr(client, "bad_connection_flag", False):
                return False
        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
def client_loop(client,broker,port,keep_alive=60,loop_function=None,loop_delay=1,run_forever=False):
    """Run the network loop for one client until it is told to stop.

    While ``client.run_flag`` is true the loop (re)connects through
    ``Connect`` whenever ``client.connected_flag`` is false, waits for the
    CONNACK with ``wait_for``, services the network with ``client.loop`` and,
    when connected, calls ``loop_function(client, loop_delay)`` if one was
    supplied. The loop ends when ``run_flag`` is cleared (by the caller, or
    here when connecting fails or no CONNACK arrives) or when
    ``client.bad_connection_flag`` is set. On exit the client is
    disconnected if it is still connected.

    Args:
        client: the MQTT client to drive.
        broker: broker host name or address.
        port: broker TCP port.
        keep_alive: keepalive interval passed to ``Connect``.
        loop_function: optional callable invoked as
            ``loop_function(client, loop_delay)`` on every pass while the
            client is connected.
        loop_delay: value handed to ``loop_function``.
        run_forever: passed to ``Connect``; when true it retries
            indefinitely instead of giving up.
    """
    client.run_flag = True
    client.broker = broker
    client.reconnect_delay_set(min_delay=1, max_delay=12)
    while client.run_flag:  # loop until asked to stop
        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to ",broker)
            # Connect returns -1 when it gives up, 0 on success.
            if Connect(client, broker, port, keep_alive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = False  # break, no CONNACK
            else:  # connection fails
                client.run_flag = False  # break
                print("quitting loop for broker ",broker)
        client.loop(0.01)
        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)
    time.sleep(1)
    print("Disonnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False
def on_log(client,userdata,level,buf):
    """Log callback: print the log text ``buf`` to stdout.

    ``client``, ``userdata`` and ``level`` are accepted to match the
    callback signature but are not used.
    """
    print(buf)
def on_message(client,userdata,message):
    """Message callback: print the received payload decoded as UTF-8.

    Bytes that are not valid UTF-8 are shown as replacement characters
    instead of raising from inside the callback. The callback does not
    sleep, so it does not hold up the thread that is servicing the
    network loop.
    """
    text = message.payload.decode("utf-8", errors="replace")
    print("message received", text)
def on_connect(client,userdata,flags,rc):
    """Connect callback: record the result and subscribe on success.

    When ``rc`` is 0 the client's ``connected_flag`` is set and the client
    subscribes to the non-empty ``sub_topic`` of every ``clients`` entry
    that belongs to it. Any other ``rc`` is reported, flagged through
    ``bad_connection_flag`` (which ``wait_for`` checks while waiting for the
    CONNACK) and the client's background loop is stopped.
    """
    if rc != 0:
        print("Bad connection Returned code=", rc)
        client.bad_connection_flag = True
        client.loop_stop()
        return
    client.connected_flag = True  # set flags
    for entry in clients:
        # .get(): entries whose client has not been created yet have no
        # "client" key.
        if entry.get("client") is client and entry["sub_topic"] != "":
            client.subscribe(entry["sub_topic"])
def on_disconnect(client,userdata,mid):
    """Disconnect callback: clear the client's ``connected_flag``.

    NOTE(review): the third argument is presumably the disconnect result
    code rather than a message id; the parameter name is kept unchanged.
    """
    client.connected_flag = False  # set the flag
def on_publish(client,userdata,mid):
    """Publish callback: print the message id ``mid`` that was passed in.

    The callback does not sleep, so it does not hold up the thread that is
    servicing the network loop.
    """
    print("in on_pub callback mid=" ,mid)
def pub(client, loop_delay):
    """Placeholder loop function; currently does nothing.

    Create_connections passes it to client_loop, which calls it as
    ``pub(client, loop_delay)`` on each pass while the client is connected.
    """
    #print("in publish")
    return None
def Create_connections():
    """Create one MQTT client and one worker thread per ``clients`` entry.

    For each entry a client with a time-stamped id is created and stored
    back into the entry under "client", together with "client_id" and
    "cname". The connect, disconnect and message callbacks are attached,
    and a thread running ``client_loop`` (keepalive 60, ``pub`` as the loop
    function) is started and appended to the module-level ``threads`` list.
    """
    for index, entry in enumerate(clients):
        cname = "client" + str(index)
        client_id = cname + str(int(time.time()))  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        entry["client"] = client
        entry["client_id"] = client_id
        entry["cname"] = cname
        # Assign the callback functions themselves; do not call them.
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_message = on_message
        worker = threading.Thread(
            target=client_loop,
            args=(client, entry["broker"], entry["port"], 60, pub),
        )
        threads.append(worker)
        worker.start()
# Class-level defaults so every client instance starts with both flags False.
mqtt.Client.connected_flag = False  # create flag in class
mqtt.Client.bad_connection_flag = False
threads = []  # worker threads started by Create_connections()

print("Creating connections")
no_threads = threading.active_count()
print("current threads =", no_threads)
print("Publishing")
Create_connections()
print("All clients connected")
no_threads = threading.active_count()
print("current threads =", no_threads)
print("start main loop")
try:
    # Every 10 seconds report the thread count and any disconnected broker.
    while True:
        time.sleep(10)
        no_threads = threading.active_count()
        print("current threads =", no_threads)
        for c in clients:
            if not c["client"].connected_flag:
                print("broker",c["broker"],"is disconnected")
except KeyboardInterrupt:
    print("ending")
    # client_loop polls ``run_flag``; clearing it asks each worker to stop.
    for c in clients:
        c["client"].run_flag = False
    # Give each worker up to 10 seconds to disconnect and finish.
    for worker in threads:
        worker.join(timeout=10)