123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import pymysql
- import paho.mqtt.client as mqtt
- import json
- import threading
- from time import sleep as sl
- from datetime import datetime as dt
class CowdungCartSensor(object):
    """Bridge between the cart's MQTT topics and the MySQL database.

    The instance periodically publishes "turn on" commands for the water
    level and sonic sensors, subscribes to the device's ``/Log`` topic, and
    stores each reported reading in the ``sonic`` or ``water_level`` table.
    """

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a config file instead.
    DB_SETTINGS = {
        'host': '52.69.200.169',
        'port': 3306,
        'user': 'cowdungcart',
        'password': 'skyeye',
        'database': 'CowdungCart',
        'charset': 'utf8',
    }

    # Topic the commands are published to; replies arrive on "<topic>/Log".
    COMMAND_TOPIC = 'AISKY/AppleFarm/MK-G/b8:27:eb:b4:59:3e'
    LOG_TOPIC = COMMAND_TOPIC + '/Log'

    # Maps the "command" field of an incoming message to its parameterized
    # insert statement (placeholders are filled in by the DB driver).
    _INSERT_STATEMENTS = {
        'sonic': "insert into sonic(sonic, datetime) values(%s, %s)",
        'water_level':
            "insert into water_level(water_level, datetime) values(%s, %s)",
    }

    def __init__(self):
        """Open the long-lived DB connection/cursor and create the MQTT client."""
        # These handles are only closed by workOn(); readings are written
        # through a separate short-lived connection per message.
        self.db = pymysql.connect(**self.DB_SETTINGS)
        self.cursor = self.db.cursor()
        self.client = mqtt.Client()

    def mqttPublish(self):
        """Publish both sensor "on" commands every 60 seconds, forever.

        This method never returns normally; it only ends when an exception
        (for example ``KeyboardInterrupt``) interrupts the loop.
        """
        # The payloads never change, so serialize them once up front.
        water_level_message = json.dumps(
            {'command': 'water_level', 'value': 'on'})
        sonic_message = json.dumps({'command': 'sonic', 'value': 'on'})
        while True:
            print("send mqtt message...")
            self.client.publish(self.COMMAND_TOPIC, water_level_message)
            self.client.publish(self.COMMAND_TOPIC, sonic_message)
            sl(60)

    @staticmethod
    def _build_insert(payload, timestamp):
        """Translate a decoded message payload into ``(sql, params)``.

        Args:
            payload: JSON text received from the broker.
            timestamp: Already formatted time string stored with the reading.

        Returns:
            A ``(sql, params)`` tuple ready for ``cursor.execute``, or
            ``None`` when the payload is empty, is not a JSON object, names
            an unknown command, or carries no ``rqnn`` value.
        """
        if not payload:
            return None
        try:
            data = json.loads(payload)
        except ValueError as err:  # json.JSONDecodeError is a ValueError
            print("ignoring malformed MQTT payload: %s" % err)
            return None
        if not isinstance(data, dict):
            return None
        command = data.get('command')
        if not isinstance(command, str):
            return None
        sql = CowdungCartSensor._INSERT_STATEMENTS.get(command)
        if sql is None or 'rqnn' not in data:
            return None
        # str() keeps the value stored as text, as the original
        # "'%s'" formatting did, while the driver handles the escaping.
        return sql, (str(data['rqnn']), timestamp)

    def _store(self, sql, params):
        """Run one insert on a fresh connection and always release it."""
        db = pymysql.connect(**self.DB_SETTINGS)
        try:
            with db.cursor() as cursor:
                cursor.execute(sql, params)
            db.commit()
        finally:
            db.close()

    def _on_connect(self, client, userdata, flags, rc):
        """Subscribe to the log topic whenever the broker acknowledges us.

        Subscribing here (instead of once at start-up) means the
        subscription is renewed after every reconnect.
        """
        print("Waiting for MQTT message...")
        client.subscribe(self.LOG_TOPIC)

    def _on_message(self, client, userdata, msg):
        """Print an incoming message and store the reading it carries."""
        print("+++++++++++++++++++++++++++++++++++++++++++++")
        try:
            # Decode as UTF-8 so that non-ASCII text is readable.
            payload = msg.payload.decode('utf-8')
        except UnicodeDecodeError as err:
            print("ignoring undecodable MQTT payload: %s" % err)
            return
        print(msg.topic + " " + payload)
        timestamp = dt.now().strftime("%Y-%m-%d %H:%M:%S")
        statement = self._build_insert(payload, timestamp)
        if statement is None:
            return
        sql, params = statement
        try:
            self._store(sql, params)
        except pymysql.MySQLError as err:
            # One failed insert must not take down the subscriber loop.
            print("failed to store MQTT reading: %s" % err)

    def _run_subscriber(self):
        """Log in, connect to the broker and process traffic until stopped."""
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.username_pw_set("aisky-client", "aiskyc")
        # Broker address, port and keep-alive interval in seconds.
        self.client.connect("60.250.156.234", 1883, 60)
        # loop_forever() blocks and also takes care of reconnecting.
        self.client.loop_forever()

    def workOn(self):
        """Start the subscriber thread, then publish commands until stopped.

        The subscriber runs in a daemon thread so it does not keep the
        process alive; the calling thread is used for the publish loop.
        The connection and cursor opened in ``__init__`` are closed when the
        publish loop ends.
        """
        # NOTE(review): debug marker kept from the original; the source's
        # indentation was lost, so its placement at this level is assumed.
        print("test")
        mqtt_subscribe_thread = threading.Thread(target=self._run_subscriber)
        mqtt_subscribe_thread.daemon = True
        mqtt_subscribe_thread.start()
        # Give the subscriber a second to connect before publishing.
        sl(1)
        try:
            self.mqttPublish()
        finally:
            # mqttPublish() only ends via an exception, so the cleanup has
            # to live in a finally block to ever run.
            self.cursor.close()
            self.db.close()
if __name__ == '__main__':
    # Script entry point: build the sensor bridge and hand control to its
    # work loop.
    CowdungCartSensor().workOn()
|