mqtt_thread3.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import paho.mqtt.client as mqtt
  2. import json
  3. import time
  4. import pymysql
  5. from datetime import datetime
  6. #訂閱與主題
  7. mqtt_pub_topic = "p"
  8. mqtt_sub_topic = "p1"
  9. #定義貨櫃名稱
  10. container_counters = {
  11. "Container_1": {"main": 0, "backup": 0},
  12. "Container_2": {"main": 0, "backup": 0},
  13. "Container_3": {"main": 0, "backup": 0},
  14. "Container_4": {"main": 0, "backup": 0},
  15. "Container_5": {"main": 0, "backup": 0},
  16. "Container_6": {"main": 0, "backup": 0},
  17. "Container_7": {"main": 0, "backup": 0},
  18. # ... 添加更多貨櫃
  19. }
  20. received_containers = set()
  21. # 初始化MQTT客戶端
  22. client = mqtt.Client()
  23. # 設置使用者名稱和密碼
  24. client.username_pw_set("plant-server", "53743001")
  25. #上傳名子與狀態到資料表
  26. def upload_status_to_database(device_id, status):
  27. db_connection = pymysql.connect(
  28. host="192.168.50.57",
  29. user="plant",
  30. password="g53743001",
  31. database="plant_test"
  32. )
  33. cursor = db_connection.cursor()
  34. insert_query = "INSERT INTO `all_status` (`Name`, `Status`) VALUES (%s,%s)"
  35. cursor.execute(insert_query, (device_id, status))
  36. db_connection.commit()
  37. cursor.close()
  38. db_connection.close()
  39. print("已將狀態上傳到資料庫")
  40. #上傳動作記錄到資料表(main)
  41. def upload_action_code_to_database(device_id, status,action_code):
  42. db_connection = pymysql.connect(
  43. host="192.168.50.57",
  44. user="plant",
  45. password="g53743001",
  46. database="plant_test"
  47. )
  48. cursor = db_connection.cursor()
  49. insert_query = "INSERT INTO `main_status` (`Name`, `Status`,`Action_code`) VALUES (%s,%s,%s)"
  50. cursor.execute(insert_query, (device_id, status, action_code))
  51. db_connection.commit()
  52. cursor.close()
  53. db_connection.close()
  54. print("已將狀態上傳到資料庫")
  55. #上傳動作記錄到資料表(main)
  56. def upload_action_code_to_database(device_id, status,action_code):
  57. db_connection = pymysql.connect(
  58. host="192.168.50.57",
  59. user="plant",
  60. password="g53743001",
  61. database="plant_test"
  62. )
  63. cursor = db_connection.cursor()
  64. insert_query = "INSERT INTO `backup_status` (`Name`, `Status`,`Action_code`) VALUES (%s,%s,%s)"
  65. cursor.execute(insert_query, (device_id, status, action_code))
  66. db_connection.commit()
  67. cursor.close()
  68. db_connection.close()
  69. print("已將狀態上傳到資料庫")
  70. #搜索資料表內的內容
  71. def select_action_code_by_database(device_name):
  72. db_connection = pymysql.connect(
  73. host="192.168.50.57",
  74. user="plant",
  75. password="g53743001",
  76. database="plant_test"
  77. )
  78. cursor = db_connection.cursor()
  79. select_query = "SELECT `Time`,`Name`,`Action_code` FROM `main_status` WHERE `Name` LIKE %s ORDER BY `Time` DESC LIMIT 1"
  80. cursor.execute(select_query, (f'%{device_name}%',))
  81. results = cursor.fetchall()
  82. if results:
  83. for row in results:
  84. time = row[0]
  85. name = row[1]
  86. action_code = row[2]
  87. print(f"Time: {time}, Device Name: {name}, Action Code: {action_code}")
  88. else:
  89. print(f"No results found for the device name containing: {device_name}")
  90. cursor.close()
  91. db_connection.close()
  92. # 連接回調函數
  93. def on_connect(client, userdata, flags, rc):
  94. print("已連接,結果代碼:" + str(rc))
  95. # 訂閱主要和備用主題
  96. client.subscribe(mqtt_sub_topic)
  97. container_location_number = None
  98. # 訊息回調函數
  99. def on_message(client, userdata, msg):
  100. global received_containers
  101. print("收到消息:" + msg.payload.decode())
  102. # 解析消息数据
  103. data = json.loads(msg.payload)
  104. container_location_number = data.get("container_location_number")
  105. print(container_location_number)
  106. received_containers.add(container_location_number)
  107. # 設置連接和訊息回調函數
  108. client.on_connect = on_connect
  109. client.on_message = on_message
  110. # 連接到MQTT代理
  111. client.connect("192.168.50.185", 1883, 60)
  112. # 循環等待訊息
  113. client.loop_start()
  114. # 發送訊息函數
  115. def send_message(topic):
  116. localtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  117. current_time = datetime.now()
  118. #print("Current Time:", current_time)
  119. payload = {
  120. 'device_id': 1,
  121. 'localtime': str(current_time),
  122. 'command': 'system_info'
  123. }
  124. print('發送訊息:')
  125. print(json.dumps(payload))
  126. print('------------------')
  127. client.publish(topic, json.dumps(payload))
  128. received_container_main = False
  129. received_container_backup = False
  130. main_topic_counter = 0
  131. backup_topic_counter = 0
  132. failure_threshold = 6
  133. # 主迴圈
  134. while True:
  135. # 發送訊息到主要主題和備用主題
  136. send_message(mqtt_pub_topic)
  137. # 等待一段時間,以便檢查回復
  138. time.sleep(10)
  139. for container_name, counters in container_counters.items():
  140. if f"{container_name}_main" in received_containers and f"{container_name}_backup" in received_containers:
  141. print(f"同時收到{container_name}主要和{container_name}備用回復,執行主要動作:123456")
  142. # 執行主要動作...
  143. action_code ="123456"
  144. upload_action_code_to_database(f"{container_name}_main", 'OK',action_code)
  145. counters["main"] = 0 # 重置主要計數器
  146. counters["backup"] = 0 # 重置備用計數器
  147. elif f"{container_name}_main" in received_containers:
  148. print(f"僅收到{container_name}主要回復,執行主要動作:123")
  149. # 執行主要動作...
  150. select_action_code_by_database(f"{container_name}_backup")
  151. action_code ="123"
  152. #upload_action_code_to_database(f"{container_name}_main", 'OK',action_code)
  153. counters["main"] = 0 # 重置主要計數器
  154. elif f"{container_name}_backup" in received_containers:
  155. print(f"僅收到{container_name}備用回復,執行備用動作:456")
  156. # 執行備用動作...
  157. select_action_code_by_database(f"{container_name}_main")
  158. action_code ="456"
  159. #upload_action_code_to_database(f"{container_name}_backup", 'OK',action_code)
  160. #print(f"{container_name}_backup")
  161. counters["backup"] = 0 # 重置備用計數器
  162. else:
  163. print(f"{container_name}主要和{container_name}備用均未收到回覆")
  164. counters["main"] += 1 # 增加主要計數器
  165. counters["backup"] += 1 # 增加備用計數器
  166. print(f"Error-{container_name}主要計數器:", counters["main"])
  167. print(f"Error-{container_name}備用計數器:", counters["backup"])
  168. if counters["main"] >= failure_threshold:
  169. print(f"超過{failure_threshold}次,{container_name}主要連線失敗")
  170. counters["main"] = 0 # 重置主要計數器
  171. # 執行錯誤處理...
  172. #upload_status_to_database(container_name+"_main","ERROR")
  173. if counters["backup"] >= failure_threshold:
  174. print(f"超過{failure_threshold}次,{container_name}備用連線失敗")
  175. counters["backup"] = 0 # 重置備用計數器
  176. # 執行錯誤處理...
  177. #upload_status_to_database(container_name+"_backup","ERROR")
  178. #SELECT `Time`,`Name`,`Action_code` FROM `main_status` WHERE `Name` LIKE '%Container_1_backup%' ORDER BY `Time` DESC LIMIT 1