|
@@ -1,136 +1,136 @@
|
|
-import pymysql
|
|
|
|
-import paho.mqtt.client as mqtt
|
|
|
|
-import json
|
|
|
|
-import threading
|
|
|
|
-from time import sleep as sl
|
|
|
|
-from datetime import datetime as dt
|
|
|
|
-from concurrent.futures import ThreadPoolExecutor
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-class MQTT(object):
|
|
|
|
- # def __init__(self, username, password, ip, port, time, sub_topic):
|
|
|
|
- # self.client = mqtt.Client()
|
|
|
|
- # self.pool = ThreadPoolExecutor(10)
|
|
|
|
- # self.username = username
|
|
|
|
- # self.password = password
|
|
|
|
- # self.ip = ip
|
|
|
|
- # self.port = port
|
|
|
|
- # self.time = time
|
|
|
|
- # self.sub_topic = sub_topic
|
|
|
|
- # self.D = {}
|
|
|
|
- # for i in range(0, len(self.sub_topic)):
|
|
|
|
- # self.D[self.sub_topic] = 0
|
|
|
|
- # self.res = 0
|
|
|
|
- def __init__(self):
|
|
|
|
- self.res = 0
|
|
|
|
- # 當地端程式連線伺服器得到回應時,要做的動作
|
|
|
|
- def on_connect(client, userdata, flags, rc):
|
|
|
|
- # print("Waiting for MQTT message...")
|
|
|
|
- # 將訂閱主題寫在on_connet中
|
|
|
|
- # 如果我們失去連線或重新連線時
|
|
|
|
- # 地端程式將會重新訂閱
|
|
|
|
- # 判斷是否為元組,有多個訂閱主題
|
|
|
|
- # if isinstance(self.sub_topic, tuple):
|
|
|
|
- # for i in range(0, len(self.sub_topic)):
|
|
|
|
- # client.subscribe(self.sub_topic[i])
|
|
|
|
- # client.subscribe(self.sub_topic)
|
|
|
|
- client.subscribe('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e/Log')
|
|
|
|
-
|
|
|
|
- # 當接收到從伺服器發送的訊息時要進行的動作
|
|
|
|
- def on_message(client, userdata, msg):
|
|
|
|
- msg = msg
|
|
|
|
-
|
|
|
|
- payload = json.loads(msg.payload.decode('utf-8'))
|
|
|
|
- self.res = payload
|
|
|
|
- print('self.res = payload :', payload) # check! 11/16 從硬體發出這邊接收到的訊息
|
|
|
|
- # if payload:
|
|
|
|
- # self.D[self.sub_topic] = 1
|
|
|
|
- # self.res = payload
|
|
|
|
-
|
|
|
|
- self.client = mqtt.Client()
|
|
|
|
- # self.command = {'tank-number':'1', 'command':'tank_motor_status', 'value':'on'}
|
|
|
|
- self.command = {'tank-number':'', 'command':'coffee_mqtt_restart', 'value':''}
|
|
|
|
-
|
|
|
|
- def thread_job(self):
|
|
|
|
- # 設定連線設定動作
|
|
|
|
- self.client.on_connect = on_connect
|
|
|
|
- # 設定接收訊息的動作
|
|
|
|
- self.client.on_message = on_message
|
|
|
|
- # 設定登入帳號密碼
|
|
|
|
- self.client.username_pw_set("aisky-client", "aiskyc")
|
|
|
|
- # self.client.username_pw_set(self.username, self.password)
|
|
|
|
- # 設定連線資訊(IP, Port, 連線時間)
|
|
|
|
- self.client.connect("60.250.156.234", 1883, 60)
|
|
|
|
- # self.client.connect(self.ip, self.port, self.time)
|
|
|
|
- # 開始連線,執行設定的動作和處理重新連線問題
|
|
|
|
- # 也可以手動使用其他loop函式來進行連接
|
|
|
|
- self.client.loop_forever()
|
|
|
|
-
|
|
|
|
- mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
|
|
|
|
- mqtt_subscribe_thread.daemon = True
|
|
|
|
- mqtt_subscribe_thread.start()
|
|
|
|
- # 睡1秒等執行敘跑完才跑主程式
|
|
|
|
- sl(1)
|
|
|
|
- # while True:
|
|
|
|
- # pass
|
|
|
|
-
|
|
|
|
- def mqttPublish(self, topic, command):
|
|
|
|
- self.client.publish(topic, json.dumps(command))
|
|
|
|
- print('json.dumps(command):', json.dumps(command))
|
|
|
|
- # print('done')
|
|
|
|
- return True
|
|
|
|
- # while True:
|
|
|
|
- # if self.D[self.sub_topic] == 1:
|
|
|
|
- # return self.res
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- # def workOn(self):
|
|
|
|
- # self = self
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- # def thread_job(self):
|
|
|
|
- # # 設定連線的動作
|
|
|
|
- # self.client.on_connect = on_connect
|
|
|
|
- # # 設定接收訊息的動作
|
|
|
|
- # self.client.on_message = on_message
|
|
|
|
- # # 設定登入帳號密碼
|
|
|
|
- # self.client.username_pw_set("aisky-client", "aiskyc")
|
|
|
|
- # # self.client.username_pw_set(self.username, self.password)
|
|
|
|
- # # 設定連線資訊(IP, Port, 連線時間)
|
|
|
|
- # self.client.connect("60.250.156.234", 1883, 60)
|
|
|
|
- # # self.client.connect(self.ip, self.port, self.time)
|
|
|
|
- # # 開始連線,執行設定的動作和處理重新連線問題
|
|
|
|
- # # 也可以手動使用其他loop函式來進行連接
|
|
|
|
- # self.client.loop_forever()
|
|
|
|
- #
|
|
|
|
- # mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
|
|
|
|
- # mqtt_subscribe_thread.daemon = True
|
|
|
|
- # mqtt_subscribe_thread.start()
|
|
|
|
- # # 睡1秒等執行敘跑完才跑主程式
|
|
|
|
- # sl(1)
|
|
|
|
- # return "success"
|
|
|
|
- # self.queryMySQL()
|
|
|
|
- # self.cursor.close()
|
|
|
|
- # self.db.close()
|
|
|
|
- # self.pool.shutdown(wait=True)
|
|
|
|
- # self.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e', self.command)
|
|
|
|
- # while True:
|
|
|
|
- # pass
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-if __name__ == '__main__':
|
|
|
|
- mqtt = MQTT()
|
|
|
|
- try:
|
|
|
|
- # mqtt.workOn()
|
|
|
|
- mqtt.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:b4:59:3e', mqtt.command)
|
|
|
|
- print('mqttPublish_test')
|
|
|
|
- while True:
|
|
|
|
- pass
|
|
|
|
- except KeyboardInterrupt:
|
|
|
|
- mqtt.pool.shutdown(wait=True)
|
|
|
|
- mqtt.cursor.close()
|
|
|
|
|
|
+import pymysql
|
|
|
|
+import paho.mqtt.client as mqtt
|
|
|
|
+import json
|
|
|
|
+import threading
|
|
|
|
+from time import sleep as sl
|
|
|
|
+from datetime import datetime as dt
|
|
|
|
+from concurrent.futures import ThreadPoolExecutor
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class MQTT(object):
|
|
|
|
+ # def __init__(self, username, password, ip, port, time, sub_topic):
|
|
|
|
+ # self.client = mqtt.Client()
|
|
|
|
+ # self.pool = ThreadPoolExecutor(10)
|
|
|
|
+ # self.username = username
|
|
|
|
+ # self.password = password
|
|
|
|
+ # self.ip = ip
|
|
|
|
+ # self.port = port
|
|
|
|
+ # self.time = time
|
|
|
|
+ # self.sub_topic = sub_topic
|
|
|
|
+ # self.D = {}
|
|
|
|
+ # for i in range(0, len(self.sub_topic)):
|
|
|
|
+ # self.D[self.sub_topic] = 0
|
|
|
|
+ # self.res = 0
|
|
|
|
+ def __init__(self):
|
|
|
|
+ self.res = 0
|
|
|
|
+ # 當地端程式連線伺服器得到回應時,要做的動作
|
|
|
|
+ def on_connect(client, userdata, flags, rc):
|
|
|
|
+ # print("Waiting for MQTT message...")
|
|
|
|
+ # 將訂閱主題寫在on_connet中
|
|
|
|
+ # 如果我們失去連線或重新連線時
|
|
|
|
+ # 地端程式將會重新訂閱
|
|
|
|
+ # 判斷是否為元組,有多個訂閱主題
|
|
|
|
+ # if isinstance(self.sub_topic, tuple):
|
|
|
|
+ # for i in range(0, len(self.sub_topic)):
|
|
|
|
+ # client.subscribe(self.sub_topic[i])
|
|
|
|
+ # client.subscribe(self.sub_topic)
|
|
|
|
+ client.subscribe('AISKY/Coffee/MK-G/b8:27:eb:7e:24:78/Log')
|
|
|
|
+
|
|
|
|
+ # 當接收到從伺服器發送的訊息時要進行的動作
|
|
|
|
+ def on_message(client, userdata, msg):
|
|
|
|
+ msg = msg
|
|
|
|
+
|
|
|
|
+ payload = json.loads(msg.payload.decode('utf-8'))
|
|
|
|
+ self.res = payload
|
|
|
|
+ print('self.res = payload :', payload) # check! 11/16 從硬體發出這邊接收到的訊息
|
|
|
|
+ # if payload:
|
|
|
|
+ # self.D[self.sub_topic] = 1
|
|
|
|
+ # self.res = payload
|
|
|
|
+
|
|
|
|
+ self.client = mqtt.Client()
|
|
|
|
+ # self.command = {'tank-number':'1', 'command':'tank_motor_status', 'value':'on'}
|
|
|
|
+ self.command = {'tank-number':'', 'command':'coffee_mqtt_restart', 'value':''}
|
|
|
|
+
|
|
|
|
+ def thread_job(self):
|
|
|
|
+ # 設定連線設定動作
|
|
|
|
+ self.client.on_connect = on_connect
|
|
|
|
+ # 設定接收訊息的動作
|
|
|
|
+ self.client.on_message = on_message
|
|
|
|
+ # 設定登入帳號密碼
|
|
|
|
+ self.client.username_pw_set("aisky-client", "aiskyc")
|
|
|
|
+ # self.client.username_pw_set(self.username, self.password)
|
|
|
|
+ # 設定連線資訊(IP, Port, 連線時間)
|
|
|
|
+ self.client.connect("60.250.156.234", 1883, 60)
|
|
|
|
+ # self.client.connect(self.ip, self.port, self.time)
|
|
|
|
+ # 開始連線,執行設定的動作和處理重新連線問題
|
|
|
|
+ # 也可以手動使用其他loop函式來進行連接
|
|
|
|
+ self.client.loop_forever()
|
|
|
|
+
|
|
|
|
+ mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
|
|
|
|
+ mqtt_subscribe_thread.daemon = True
|
|
|
|
+ mqtt_subscribe_thread.start()
|
|
|
|
+ # 睡1秒等執行敘跑完才跑主程式
|
|
|
|
+ sl(1)
|
|
|
|
+ # while True:
|
|
|
|
+ # pass
|
|
|
|
+
|
|
|
|
+ def mqttPublish(self, topic, command):
|
|
|
|
+ self.client.publish(topic, json.dumps(command))
|
|
|
|
+ print('json.dumps(command):', json.dumps(command))
|
|
|
|
+ # print('done')
|
|
|
|
+ return True
|
|
|
|
+ # while True:
|
|
|
|
+ # if self.D[self.sub_topic] == 1:
|
|
|
|
+ # return self.res
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ # def workOn(self):
|
|
|
|
+ # self = self
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ # def thread_job(self):
|
|
|
|
+ # # 設定連線的動作
|
|
|
|
+ # self.client.on_connect = on_connect
|
|
|
|
+ # # 設定接收訊息的動作
|
|
|
|
+ # self.client.on_message = on_message
|
|
|
|
+ # # 設定登入帳號密碼
|
|
|
|
+ # self.client.username_pw_set("aisky-client", "aiskyc")
|
|
|
|
+ # # self.client.username_pw_set(self.username, self.password)
|
|
|
|
+ # # 設定連線資訊(IP, Port, 連線時間)
|
|
|
|
+ # self.client.connect("60.250.156.234", 1883, 60)
|
|
|
|
+ # # self.client.connect(self.ip, self.port, self.time)
|
|
|
|
+ # # 開始連線,執行設定的動作和處理重新連線問題
|
|
|
|
+ # # 也可以手動使用其他loop函式來進行連接
|
|
|
|
+ # self.client.loop_forever()
|
|
|
|
+ #
|
|
|
|
+ # mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
|
|
|
|
+ # mqtt_subscribe_thread.daemon = True
|
|
|
|
+ # mqtt_subscribe_thread.start()
|
|
|
|
+ # # 睡1秒等執行敘跑完才跑主程式
|
|
|
|
+ # sl(1)
|
|
|
|
+ # return "success"
|
|
|
|
+ # self.queryMySQL()
|
|
|
|
+ # self.cursor.close()
|
|
|
|
+ # self.db.close()
|
|
|
|
+ # self.pool.shutdown(wait=True)
|
|
|
|
+ # self.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:7e:24:78', self.command)
|
|
|
|
+ # while True:
|
|
|
|
+ # pass
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
+ mqtt = MQTT()
|
|
|
|
+ try:
|
|
|
|
+ # mqtt.workOn()
|
|
|
|
+ mqtt.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:7e:24:78', mqtt.command)
|
|
|
|
+ print('mqttPublish_test')
|
|
|
|
+ while True:
|
|
|
|
+ pass
|
|
|
|
+ except KeyboardInterrupt:
|
|
|
|
+ mqtt.pool.shutdown(wait=True)
|
|
|
|
+ mqtt.cursor.close()
|
|
mqtt.db.close()
|
|
mqtt.db.close()
|