123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import pymysql
- import paho.mqtt.client as mqtt
- import json
- import threading
- from time import sleep as sl
- from datetime import datetime as dt
- from concurrent.futures import ThreadPoolExecutor
- class MQTT(object):
- # def __init__(self, username, password, ip, port, time, sub_topic):
- # self.client = mqtt.Client()
- # self.pool = ThreadPoolExecutor(10)
- # self.username = username
- # self.password = password
- # self.ip = ip
- # self.port = port
- # self.time = time
- # self.sub_topic = sub_topic
- # self.D = {}
- # for i in range(0, len(self.sub_topic)):
- # self.D[self.sub_topic] = 0
- # self.res = 0
- def __init__(self):
- self.res = 0
- # 當地端程式連線伺服器得到回應時,要做的動作
- def on_connect(client, userdata, flags, rc):
- # print("Waiting for MQTT message...")
- # 將訂閱主題寫在on_connet中
- # 如果我們失去連線或重新連線時
- # 地端程式將會重新訂閱
- # 判斷是否為元組,有多個訂閱主題
- # if isinstance(self.sub_topic, tuple):
- # for i in range(0, len(self.sub_topic)):
- # client.subscribe(self.sub_topic[i])
- # client.subscribe(self.sub_topic)
- client.subscribe('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e/Log')
- # 當接收到從伺服器發送的訊息時要進行的動作
- def on_message(client, userdata, msg):
- msg = msg
- payload = json.loads(msg.payload.decode('utf-8'))
- self.res = payload
- print('self.res = payload :', payload)
- # if payload:
- # self.D[self.sub_topic] = 1
- # self.res = payload
- self.client = mqtt.Client()
- self.command = {'tank-number':'1', 'command':'tank_motor_status', 'value':'on'}
- def thread_job(self):
- self.client.on_connect = on_connect
- # 設定接收訊息的動作
- self.client.on_message = on_message
- # 設定登入帳號密碼
- self.client.username_pw_set("aisky-client", "aiskyc")
- # self.client.username_pw_set(self.username, self.password)
- # 設定連線資訊(IP, Port, 連線時間)
- self.client.connect("60.250.156.234", 1883, 60)
- # self.client.connect(self.ip, self.port, self.time)
- # 開始連線,執行設定的動作和處理重新連線問題
- # 也可以手動使用其他loop函式來進行連接
- self.client.loop_forever()
- mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
- mqtt_subscribe_thread.daemon = True
- mqtt_subscribe_thread.start()
- # 睡1秒等執行敘跑完才跑主程式
- sl(1)
- # while True:
- # pass
- def mqttPublish(self, topic, command):
- self.client.publish(topic, json.dumps(command))
- print('json.dumps(command):', json.dumps(command))
- # print('done')
- return True
- # while True:
- # if self.D[self.sub_topic] == 1:
- # return self.res
- # def workOn(self):
- # self = self
- # def thread_job(self):
- # # 設定連線的動作
- # self.client.on_connect = on_connect
- # # 設定接收訊息的動作
- # self.client.on_message = on_message
- # # 設定登入帳號密碼
- # self.client.username_pw_set("aisky-client", "aiskyc")
- # # self.client.username_pw_set(self.username, self.password)
- # # 設定連線資訊(IP, Port, 連線時間)
- # self.client.connect("60.250.156.234", 1883, 60)
- # # self.client.connect(self.ip, self.port, self.time)
- # # 開始連線,執行設定的動作和處理重新連線問題
- # # 也可以手動使用其他loop函式來進行連接
- # self.client.loop_forever()
- #
- # mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
- # mqtt_subscribe_thread.daemon = True
- # mqtt_subscribe_thread.start()
- # # 睡1秒等執行敘跑完才跑主程式
- # sl(1)
- # return "success"
- # self.queryMySQL()
- # self.cursor.close()
- # self.db.close()
- # self.pool.shutdown(wait=True)
- # self.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e', self.command)
- # while True:
- # pass
- if __name__ == '__main__':
- mqtt = MQTT()
- try:
- # mqtt.workOn()
- mqtt.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e', mqtt.command)
- print('mqttPublish_test')
- while True:
- pass
- except KeyboardInterrupt:
- mqtt.pool.shutdown(wait=True)
- mqtt.cursor.close()
- mqtt.db.close()
|