import pymysql import paho.mqtt.client as mqtt import json import threading from time import sleep as sl from datetime import datetime as dt from concurrent.futures import ThreadPoolExecutor class MQTT(object): # def __init__(self, username, password, ip, port, time, sub_topic): # self.client = mqtt.Client() # self.pool = ThreadPoolExecutor(10) # self.username = username # self.password = password # self.ip = ip # self.port = port # self.time = time # self.sub_topic = sub_topic # self.D = {} # for i in range(0, len(self.sub_topic)): # self.D[self.sub_topic] = 0 # self.res = 0 def __init__(self): self.res = 0 # 當地端程式連線伺服器得到回應時,要做的動作 def on_connect(client, userdata, flags, rc): # print("Waiting for MQTT message...") # 將訂閱主題寫在on_connet中 # 如果我們失去連線或重新連線時 # 地端程式將會重新訂閱 # 判斷是否為元組,有多個訂閱主題 # if isinstance(self.sub_topic, tuple): # for i in range(0, len(self.sub_topic)): # client.subscribe(self.sub_topic[i]) # client.subscribe(self.sub_topic) client.subscribe('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e/Log') # 當接收到從伺服器發送的訊息時要進行的動作 def on_message(client, userdata, msg): msg = msg payload = json.loads(msg.payload.decode('utf-8')) self.res = payload print('self.res = payload :', payload) # if payload: # self.D[self.sub_topic] = 1 # self.res = payload self.client = mqtt.Client() self.command = {'tank-number':'1', 'command':'tank_motor_status', 'value':'on'} def thread_job(self): self.client.on_connect = on_connect # 設定接收訊息的動作 self.client.on_message = on_message # 設定登入帳號密碼 self.client.username_pw_set("aisky-client", "aiskyc") # self.client.username_pw_set(self.username, self.password) # 設定連線資訊(IP, Port, 連線時間) self.client.connect("60.250.156.234", 1883, 60) # self.client.connect(self.ip, self.port, self.time) # 開始連線,執行設定的動作和處理重新連線問題 # 也可以手動使用其他loop函式來進行連接 self.client.loop_forever() mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,)) mqtt_subscribe_thread.daemon = True mqtt_subscribe_thread.start() # 睡1秒等執行敘跑完才跑主程式 sl(1) # while True: # pass def mqttPublish(self, topic, command): self.client.publish(topic, json.dumps(command)) print('json.dumps(command):', json.dumps(command)) # print('done') return True # while True: # if self.D[self.sub_topic] == 1: # return self.res # def workOn(self): # self = self # def thread_job(self): # # 設定連線的動作 # self.client.on_connect = on_connect # # 設定接收訊息的動作 # self.client.on_message = on_message # # 設定登入帳號密碼 # self.client.username_pw_set("aisky-client", "aiskyc") # # self.client.username_pw_set(self.username, self.password) # # 設定連線資訊(IP, Port, 連線時間) # self.client.connect("60.250.156.234", 1883, 60) # # self.client.connect(self.ip, self.port, self.time) # # 開始連線,執行設定的動作和處理重新連線問題 # # 也可以手動使用其他loop函式來進行連接 # self.client.loop_forever() # # mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,)) # mqtt_subscribe_thread.daemon = True # mqtt_subscribe_thread.start() # # 睡1秒等執行敘跑完才跑主程式 # sl(1) # return "success" # self.queryMySQL() # self.cursor.close() # self.db.close() # self.pool.shutdown(wait=True) # self.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e', self.command) # while True: # pass if __name__ == '__main__': mqtt = MQTT() try: # mqtt.workOn() mqtt.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e', mqtt.command) print('mqttPublish_test') while True: pass except KeyboardInterrupt: mqtt.pool.shutdown(wait=True) mqtt.cursor.close() mqtt.db.close()