cowdungcart_sensor.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import pymysql
  2. import paho.mqtt.client as mqtt
  3. import json
  4. import threading
  5. from time import sleep as sl
  6. from datetime import datetime as dt
  7. class CowdungCartSensor(object):
  8. def __init__(self):
  9. self.db = pymysql.connect(host='52.69.200.169', port=3306, user='cowdungcart', password='skyeye', database='CowdungCart', charset='utf8')
  10. self.cursor = self.db.cursor()
  11. self.client = mqtt.Client()
  12. def mqttPublish(self):
  13. #要發布的主題和內容
  14. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:b4:59:3e'
  15. water_level_command = {'command': 'water_level', 'value': 'on'}
  16. sonic_command = {'command': 'sonic', 'value': 'on'}
  17. while True:
  18. print("send mqtt message...")
  19. self.client.publish(topic, json.dumps(water_level_command))
  20. self.client.publish(topic, json.dumps(sonic_command))
  21. sl(60)
  22. def workOn(self):
  23. self = self
  24. # 當地端程式連線伺服器得到回應時,要做的動作
  25. def on_connect(client, userdata, flags, rc):
  26. print("Waiting for MQTT message...")
  27. # 將訂閱主題寫在on_connet中
  28. # 如果我們失去連線或重新連線時
  29. # 地端程式將會重新訂閱
  30. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:b4:59:3e/Log')
  31. # 當接收到從伺服器發送的訊息時要進行的動作
  32. def on_message(client, userdata, msg):
  33. print("+++++++++++++++++++++++++++++++++++++++++++++")
  34. msg = msg
  35. db = pymysql.connect(host='52.69.200.169', port=3306, user='cowdungcart', password='skyeye',
  36. database='CowdungCart', charset='utf8')
  37. cursor = db.cursor()
  38. # 轉換編碼utf-8才看得懂中文
  39. print(msg.topic + " " + msg.payload.decode('utf-8'))
  40. payload = msg.payload.decode('utf-8')
  41. current_time = dt.now()
  42. time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S")
  43. if payload:
  44. p = json.loads(payload)
  45. if p['command'] == 'sonic':
  46. sql = "insert into sonic(sonic, datetime) values('%s', '%s')" % (p['rqnn'], time)
  47. cursor.execute(sql)
  48. db.commit()
  49. elif p['command'] == 'water_level':
  50. sql = "insert into water_level(water_level, datetime) values('%s', '%s')" % (p['rqnn'], time)
  51. cursor.execute(sql)
  52. db.commit()
  53. cursor.close()
  54. db.close()
  55. print("test")
  56. def thread_job(self):
  57. # 設定連線的動作
  58. self.client.on_connect = on_connect
  59. # 設定接收訊息的動作
  60. self.client.on_message = on_message
  61. # 設定登入帳號密碼
  62. self.client.username_pw_set("aisky-client", "aiskyc")
  63. # 設定連線資訊(IP, Port, 連線時間)
  64. self.client.connect("60.250.156.234", 1883, 60)
  65. # 開始連線,執行設定的動作和處理重新連線問題
  66. # 也可以手動使用其他loop函式來進行連接
  67. self.client.loop_forever()
  68. mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
  69. mqtt_subscribe_thread.daemon = True
  70. mqtt_subscribe_thread.start()
  71. # message_thread = threading.Thread(target=on_message, args=(self,))
  72. # message_thread.daemon = True
  73. # message_thread.start()
  74. #睡1秒等執行敘跑完才跑主程式
  75. sl(1)
  76. self.mqttPublish()
  77. self.cursor.close()
  78. self.db.close()
  79. if __name__ == '__main__':
  80. cowdungcart = CowdungCartSensor()
  81. cowdungcart.workOn()