mqtt.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import pymysql
  2. import paho.mqtt.client as mqtt
  3. import json
  4. import threading
  5. from time import sleep as sl
  6. from datetime import datetime as dt
  7. from concurrent.futures import ThreadPoolExecutor
  8. class MQTT(object):
  9. # def __init__(self, username, password, ip, port, time, sub_topic):
  10. # self.client = mqtt.Client()
  11. # self.pool = ThreadPoolExecutor(10)
  12. # self.username = username
  13. # self.password = password
  14. # self.ip = ip
  15. # self.port = port
  16. # self.time = time
  17. # self.sub_topic = sub_topic
  18. # self.D = {}
  19. # for i in range(0, len(self.sub_topic)):
  20. # self.D[self.sub_topic] = 0
  21. # self.res = 0
  22. def __init__(self):
  23. self.res = 0
  24. # 當地端程式連線伺服器得到回應時,要做的動作
  25. def on_connect(client, userdata, flags, rc):
  26. # print("Waiting for MQTT message...")
  27. # 將訂閱主題寫在on_connet中
  28. # 如果我們失去連線或重新連線時
  29. # 地端程式將會重新訂閱
  30. # 判斷是否為元組,有多個訂閱主題
  31. # if isinstance(self.sub_topic, tuple):
  32. # for i in range(0, len(self.sub_topic)):
  33. # client.subscribe(self.sub_topic[i])
  34. # client.subscribe(self.sub_topic)
  35. client.subscribe('AISKY/Coffee/MK-G/b8:27:eb:7e:24:78/Log')
  36. # 當接收到從伺服器發送的訊息時要進行的動作
  37. def on_message(client, userdata, msg):
  38. msg = msg
  39. payload = json.loads(msg.payload.decode('utf-8'))
  40. self.res = payload
  41. print('self.res = payload :', payload) # check! 11/16 從硬體發出這邊接收到的訊息
  42. # if payload:
  43. # self.D[self.sub_topic] = 1
  44. # self.res = payload
  45. self.client = mqtt.Client()
  46. # self.command = {'tank-number':'1', 'command':'tank_motor_status', 'value':'on'}
  47. self.command = {'tank-number':'', 'command':'coffee_mqtt_restart', 'value':''}
  48. def thread_job(self):
  49. # 設定連線設定動作
  50. self.client.on_connect = on_connect
  51. # 設定接收訊息的動作
  52. self.client.on_message = on_message
  53. # 設定登入帳號密碼
  54. self.client.username_pw_set("aisky-client", "aiskyc")
  55. # self.client.username_pw_set(self.username, self.password)
  56. # 設定連線資訊(IP, Port, 連線時間)
  57. self.client.connect("60.250.156.234", 1883, 60)
  58. # self.client.connect(self.ip, self.port, self.time)
  59. # 開始連線,執行設定的動作和處理重新連線問題
  60. # 也可以手動使用其他loop函式來進行連接
  61. self.client.loop_forever()
  62. mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
  63. mqtt_subscribe_thread.daemon = True
  64. mqtt_subscribe_thread.start()
  65. # 睡1秒等執行敘跑完才跑主程式
  66. sl(1)
  67. # while True:
  68. # pass
  69. def mqttPublish(self, topic, command):
  70. self.client.publish(topic, json.dumps(command))
  71. print('json.dumps(command):', json.dumps(command))
  72. # print('done')
  73. return True
  74. # while True:
  75. # if self.D[self.sub_topic] == 1:
  76. # return self.res
  77. # def workOn(self):
  78. # self = self
  79. # def thread_job(self):
  80. # # 設定連線的動作
  81. # self.client.on_connect = on_connect
  82. # # 設定接收訊息的動作
  83. # self.client.on_message = on_message
  84. # # 設定登入帳號密碼
  85. # self.client.username_pw_set("aisky-client", "aiskyc")
  86. # # self.client.username_pw_set(self.username, self.password)
  87. # # 設定連線資訊(IP, Port, 連線時間)
  88. # self.client.connect("60.250.156.234", 1883, 60)
  89. # # self.client.connect(self.ip, self.port, self.time)
  90. # # 開始連線,執行設定的動作和處理重新連線問題
  91. # # 也可以手動使用其他loop函式來進行連接
  92. # self.client.loop_forever()
  93. #
  94. # mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
  95. # mqtt_subscribe_thread.daemon = True
  96. # mqtt_subscribe_thread.start()
  97. # # 睡1秒等執行敘跑完才跑主程式
  98. # sl(1)
  99. # return "success"
  100. # self.queryMySQL()
  101. # self.cursor.close()
  102. # self.db.close()
  103. # self.pool.shutdown(wait=True)
  104. # self.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:7e:24:78', self.command)
  105. # while True:
  106. # pass
  107. if __name__ == '__main__':
  108. mqtt = MQTT()
  109. try:
  110. # mqtt.workOn()
  111. mqtt.mqttPublish('AISKY/Coffee/MK-G/b8:27:eb:7e:24:78', mqtt.command)
  112. print('mqttPublish_test')
  113. while True:
  114. pass
  115. except KeyboardInterrupt:
  116. mqtt.pool.shutdown(wait=True)
  117. mqtt.cursor.close()
  118. mqtt.db.close()