"""Poll plant containers over MQTT and record their reply status in MySQL.

Every cycle the script publishes a ``system_info`` command, waits ten
seconds, and then checks which containers answered on their main and/or
backup channel.  Replies are identified by the ``container_location_number``
field of the JSON payload (e.g. ``"Container_1_main"``).
"""
import json
import threading
import time
from contextlib import closing
from datetime import datetime

import paho.mqtt.client as mqtt
import pymysql

# Topic the poll command is published to, and topic the replies arrive on.
mqtt_pub_topic = "p"
mqtt_sub_topic = "p1"

# Consecutive-miss counters per container, one for each channel.
# Add more containers by extending the range (or adding keys by hand).
container_counters = {
    f"Container_{index}": {"main": 0, "backup": 0} for index in range(1, 8)
}

# Identifiers received since the last poll.  Written by the MQTT network
# thread (on_message) and read by the main loop, hence the lock.
received_containers = set()
received_lock = threading.Lock()

# NOTE(review): credentials are hard-coded in source; consider moving them
# to environment variables or a config file that is not committed.
DB_CONFIG = {
    "host": "192.168.50.57",
    "user": "plant",
    "password": "g53743001",
    "database": "plant_test",
}

# Tables that upload_action_code_to_database is allowed to write to.  The
# table name is interpolated into the SQL text, so it must be whitelisted.
ACTION_TABLES = ("main_status", "backup_status")

# Initialise the MQTT client.  paho-mqtt 2.x requires an explicit callback
# API version; the callbacks below use the v1 signatures.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()

# Set the broker user name and password.
client.username_pw_set("plant-server", "53743001")


def _open_connection():
    """Open a new connection to the plant database using DB_CONFIG."""
    return pymysql.connect(**DB_CONFIG)


def _execute_insert(insert_query, params):
    """Run one parameterised INSERT and commit it.

    The cursor and the connection are closed even when the statement or the
    commit raises; the exception itself is propagated to the caller.
    """
    with closing(_open_connection()) as db_connection:
        with closing(db_connection.cursor()) as cursor:
            cursor.execute(insert_query, params)
        db_connection.commit()
    print("已將狀態上傳到資料庫")


def upload_status_to_database(device_id, status):
    """Insert a (name, status) row into the ``all_status`` table.

    Args:
        device_id: Value stored in the ``Name`` column.
        status: Value stored in the ``Status`` column.
    """
    insert_query = "INSERT INTO `all_status` (`Name`, `Status`) VALUES (%s,%s)"
    _execute_insert(insert_query, (device_id, status))


def upload_action_code_to_database(device_id, status, action_code,
                                   table="main_status"):
    """Insert an action record into ``main_status`` or ``backup_status``.

    The file used to define this function twice (once per table); the second
    definition shadowed the first, so ``main_status`` was never written even
    though ``select_action_code_by_database`` reads from it.  Both variants
    are merged here and the target is chosen with ``table``.

    Args:
        device_id: Value stored in the ``Name`` column.
        status: Value stored in the ``Status`` column.
        action_code: Value stored in the ``Action_code`` column.
        table: Target table; must be one of ACTION_TABLES.

    Raises:
        ValueError: If ``table`` is not a whitelisted table name.
    """
    if table not in ACTION_TABLES:
        raise ValueError(f"unsupported action table: {table!r}")
    insert_query = (
        f"INSERT INTO `{table}` (`Name`, `Status`,`Action_code`) "
        "VALUES (%s,%s,%s)"
    )
    _execute_insert(insert_query, (device_id, status, action_code))


def select_action_code_by_database(device_name):
    """Print the most recent ``main_status`` row whose name contains a string.

    Equivalent query for ``Container_1_backup``:
    SELECT `Time`,`Name`,`Action_code` FROM `main_status`
    WHERE `Name` LIKE '%Container_1_backup%' ORDER BY `Time` DESC LIMIT 1

    Args:
        device_name: Substring matched against the ``Name`` column.
    """
    select_query = (
        "SELECT `Time`,`Name`,`Action_code` FROM `main_status` "
        "WHERE `Name` LIKE %s ORDER BY `Time` DESC LIMIT 1"
    )
    with closing(_open_connection()) as db_connection:
        with closing(db_connection.cursor()) as cursor:
            cursor.execute(select_query, (f'%{device_name}%',))
            results = cursor.fetchall()

    if not results:
        print(f"No results found for the device name containing: {device_name}")
        return
    # Local names deliberately avoid shadowing the ``time`` module.
    for row_time, name, action_code in results:
        print(f"Time: {row_time}, Device Name: {name}, Action Code: {action_code}")


def on_connect(client, userdata, flags, rc):
    """Connection callback: report the result code and subscribe to replies."""
    print("已連接,結果代碼:" + str(rc))
    client.subscribe(mqtt_sub_topic)


# Unused at module level (on_message binds its own local of the same name);
# kept so the module's global names stay unchanged.
container_location_number = None


def on_message(client, userdata, msg):
    """Message callback: record which container sent the reply.

    Payloads that are not UTF-8, not JSON, not a JSON object, or that carry
    no string ``container_location_number`` are reported and ignored instead
    of raising inside the MQTT network thread.
    """
    try:
        text = msg.payload.decode()
    except UnicodeDecodeError:
        print("Ignoring MQTT message: payload is not valid UTF-8")
        return
    print("收到消息:" + text)

    # Parse the message data.
    try:
        data = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        print("Ignoring MQTT message: payload is not valid JSON")
        return
    if not isinstance(data, dict):
        print("Ignoring MQTT message: payload is not a JSON object")
        return

    container_location_number = data.get("container_location_number")
    print(container_location_number)
    # Only string identifiers can ever match the "<name>_main"/"<name>_backup"
    # lookups done by the main loop; anything else (None, lists, ...) is noise.
    if not isinstance(container_location_number, str):
        return
    with received_lock:
        received_containers.add(container_location_number)


# Register the connection and message callbacks.
client.on_connect = on_connect
client.on_message = on_message

# Connect to the MQTT broker.
client.connect("192.168.50.185", 1883, 60)

# Process network traffic in the background while the main loop polls.
client.loop_start()


def send_message(topic):
    """Publish the ``system_info`` poll command to ``topic``.

    The payload carries a fixed ``device_id`` of 1 and the current local
    time as a string.
    """
    payload = {
        'device_id': 1,
        'localtime': str(datetime.now()),
        'command': 'system_info'
    }
    encoded = json.dumps(payload)
    print('發送訊息:')
    print(encoded)
    print('------------------')
    client.publish(topic, encoded)


def evaluate_container(container_name, counters, received):
    """Act on one container's replies for the current polling cycle.

    Args:
        container_name: Key of the container, e.g. ``"Container_1"``.
        counters: That container's ``{"main": int, "backup": int}`` miss
            counters; updated in place.
        received: Identifiers that replied during this cycle.
    """
    main_id = f"{container_name}_main"
    backup_id = f"{container_name}_backup"
    main_replied = main_id in received
    backup_replied = backup_id in received

    if main_replied and backup_replied:
        print(f"同時收到{container_name}主要和{container_name}備用回復,執行主要動作:123456")
        # Run the main action and record it.
        action_code = "123456"
        upload_action_code_to_database(main_id, 'OK', action_code)
        counters["main"] = 0
        counters["backup"] = 0
    elif main_replied:
        print(f"僅收到{container_name}主要回復,執行主要動作:123")
        # Show the last recorded action of the silent backup channel.
        select_action_code_by_database(backup_id)
        # Upload currently disabled:
        # upload_action_code_to_database(main_id, 'OK', "123")
        counters["main"] = 0
    elif backup_replied:
        print(f"僅收到{container_name}備用回復,執行備用動作:456")
        # Show the last recorded action of the silent main channel.
        select_action_code_by_database(main_id)
        # Upload currently disabled:
        # upload_action_code_to_database(backup_id, 'OK', "456")
        counters["backup"] = 0
    else:
        # NOTE(review): a miss is only counted when BOTH channels are silent;
        # a single silent channel never advances its counter.  Confirm that
        # this is the intended policy.
        print(f"{container_name}主要和{container_name}備用均未收到回覆")
        counters["main"] += 1
        counters["backup"] += 1
        print(f"Error-{container_name}主要計數器:", counters["main"])
        print(f"Error-{container_name}備用計數器:", counters["backup"])

    if counters["main"] >= failure_threshold:
        print(f"超過{failure_threshold}次,{container_name}主要連線失敗")
        counters["main"] = 0
        # Error handling currently disabled:
        # upload_status_to_database(container_name + "_main", "ERROR")
    if counters["backup"] >= failure_threshold:
        print(f"超過{failure_threshold}次,{container_name}備用連線失敗")
        counters["backup"] = 0
        # Error handling currently disabled:
        # upload_status_to_database(container_name + "_backup", "ERROR")


# Unused flags and counters, kept so the module's global names stay unchanged.
received_container_main = False
received_container_backup = False
main_topic_counter = 0
backup_topic_counter = 0

# Number of consecutive fully-missed cycles before a channel is reported as
# failed.
failure_threshold = 6

# Main loop: poll, wait for replies, evaluate every container.
while True:
    # Forget replies from earlier cycles; otherwise a container that answered
    # once would count as "received" forever and outages would go unnoticed.
    with received_lock:
        received_containers.clear()

    send_message(mqtt_pub_topic)

    # Give the containers time to answer.
    time.sleep(10)

    # Evaluate a snapshot so late replies cannot change the set mid-cycle.
    with received_lock:
        replies = frozenset(received_containers)

    for container_name, counters in container_counters.items():
        evaluate_container(container_name, counters, replies)