"""MQTT bridge for the cow-dung cart sensors.

Periodically publishes ``water_level`` and ``sonic`` commands to the device
topic and stores the readings the device reports on its ``/Log`` topic into
the ``water_level`` and ``sonic`` MySQL tables.
"""
import json
import threading
from datetime import datetime as dt
from time import sleep as sl

import paho.mqtt.client as mqtt
import pymysql


class CowdungCartSensor(object):
    """Publish sensor commands over MQTT and persist the replies in MySQL."""

    # NOTE(review): hosts and credentials are hard-coded in source control;
    # consider loading them from configuration or the environment instead.
    _DB_SETTINGS = {
        'host': '52.69.200.169',
        'port': 3306,
        'user': 'cowdungcart',
        'password': 'skyeye',
        'database': 'CowdungCart',
        'charset': 'utf8',
    }

    # Topic the commands are published to; replies arrive on "<topic>/Log".
    _COMMAND_TOPIC = 'AISKY/AppleFarm/MK-G/b8:27:eb:b4:59:3e'
    _LOG_TOPIC = _COMMAND_TOPIC + '/Log'

    # Seconds to wait between two rounds of published commands.
    _PUBLISH_INTERVAL_SECONDS = 60

    # Parameterized INSERT statement for each command we know how to store.
    # Table and column names are constants; only the values are bound.
    _INSERT_STATEMENTS = {
        'sonic': "insert into sonic(sonic, datetime) values(%s, %s)",
        'water_level': (
            "insert into water_level(water_level, datetime) values(%s, %s)"
        ),
    }

    def __init__(self):
        """Open the instance-level DB connection and create the MQTT client."""
        self.db = self._open_db()
        self.cursor = self.db.cursor()
        self.client = mqtt.Client()

    def _open_db(self):
        """Return a new MySQL connection built from ``_DB_SETTINGS``."""
        return pymysql.connect(**self._DB_SETTINGS)

    def mqttPublish(self):
        """Publish both sensor commands once per interval, forever.

        This method never returns normally; it only ends when an exception
        (for example ``KeyboardInterrupt``) propagates out of the loop.
        """
        # The command payloads never change, so serialize them only once.
        messages = [
            json.dumps({'command': 'water_level', 'value': 'on'}),
            json.dumps({'command': 'sonic', 'value': 'on'}),
        ]
        while True:
            print("send mqtt message...")
            for message in messages:
                self.client.publish(self._COMMAND_TOPIC, message)
            sl(self._PUBLISH_INTERVAL_SECONDS)

    def _store_reading(self, payload):
        """Insert one decoded MQTT payload into the matching table.

        Args:
            payload: The message body as text; expected to be a JSON object
                with a ``command`` key (``sonic`` or ``water_level``) and an
                ``rqnn`` key holding the reading.

        Returns:
            True when a row was inserted and committed, False when the
            payload was skipped (empty, malformed, unknown command, missing
            reading) or the database reported an error.
        """
        if not payload:
            return False

        try:
            reading = json.loads(payload)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError
            print("Ignoring MQTT message with invalid JSON: %s" % exc)
            return False
        if not isinstance(reading, dict):
            print("Ignoring MQTT message that is not a JSON object")
            return False

        command = reading.get('command')
        if not isinstance(command, str):
            return False
        statement = self._INSERT_STATEMENTS.get(command)
        if statement is None:
            # Commands other than sonic / water_level are not stored.
            return False
        if 'rqnn' not in reading:
            print("Ignoring '%s' message without an 'rqnn' value" % command)
            return False

        timestamp = dt.now().strftime("%Y-%m-%d %H:%M:%S")
        # The value is bound as text, matching the quoted literal that the
        # statement used before it was parameterized.
        params = (str(reading['rqnn']), timestamp)

        # Connect only once we know there is a row to write, and always
        # release the connection, even when the insert fails.
        try:
            db = self._open_db()
        except pymysql.MySQLError as exc:
            print("Could not connect to the database: %s" % exc)
            return False
        try:
            with db.cursor() as cursor:
                cursor.execute(statement, params)
            db.commit()
        except pymysql.MySQLError as exc:
            print("Could not store '%s' reading: %s" % (command, exc))
            return False
        finally:
            db.close()
        return True

    def workOn(self):
        """Start the MQTT subscriber thread, then publish commands forever.

        The subscriber runs in a daemon thread so it ends with the process.
        The instance-level cursor and connection are closed when the publish
        loop is interrupted.
        """

        def on_connect(client, userdata, flags, rc):
            """Subscribe to the log topic whenever the broker accepts us."""
            print("Waiting for MQTT message...")
            # Subscribing inside on_connect means the subscription is
            # renewed automatically after a lost connection is restored.
            client.subscribe(self._LOG_TOPIC)

        def on_message(client, userdata, msg):
            """Print an incoming message and store the reading it carries."""
            print("+++++++++++++++++++++++++++++++++++++++++++++")
            # Decode as UTF-8 so non-ASCII text in the payload is readable.
            try:
                payload = msg.payload.decode('utf-8')
            except UnicodeDecodeError as exc:
                print("Ignoring MQTT message that is not UTF-8: %s" % exc)
                return
            print(msg.topic + " " + payload)
            self._store_reading(payload)
            print("test")

        def thread_job():
            """Configure the client, connect and run the network loop."""
            self.client.on_connect = on_connect
            self.client.on_message = on_message
            # Broker login.
            self.client.username_pw_set("aisky-client", "aiskyc")
            # Broker address, port and keep-alive interval in seconds.
            self.client.connect("60.250.156.234", 1883, 60)
            # Blocks, dispatching callbacks and handling reconnects.
            self.client.loop_forever()

        mqtt_subscribe_thread = threading.Thread(target=thread_job)
        mqtt_subscribe_thread.daemon = True
        mqtt_subscribe_thread.start()

        # Give the subscriber a second to start before publishing.
        sl(1)
        try:
            self.mqttPublish()
        finally:
            # mqttPublish only ends via an exception, so this is the only
            # place where the instance-level resources can be released.
            self.cursor.close()
            self.db.close()


if __name__ == '__main__':
    cowdungcart = CowdungCartSensor()
    cowdungcart.workOn()