123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- import paho.mqtt.client as mqtt
- import json
- import time
- import pymysql
- from datetime import datetime
# MQTT topics: polls are published on one topic and replies are read from
# the other.
mqtt_pub_topic = "p"
mqtt_sub_topic = "p1"

# Missed-reply counters for the "main" and "backup" device of every monitored
# container (Container_1 .. Container_7). Raise the upper bound of the range
# to monitor more containers.
container_counters = {
    f"Container_{index}": {"main": 0, "backup": 0}
    for index in range(1, 8)
}

# Device identifiers collected by the message callback.
received_containers = set()

# MQTT client used for both publishing polls and receiving replies.
# NOTE(review): the broker credentials are hard-coded in source; consider
# loading them from configuration that is kept out of version control.
client = mqtt.Client()
client.username_pw_set(username="plant-server", password="53743001")
def upload_status_to_database(device_id, status):
    """Insert one (name, status) row into the `all_status` table.

    Args:
        device_id: Device name stored in the `Name` column.
        status: Status text stored in the `Status` column.

    Raises:
        pymysql.MySQLError: If connecting, inserting or committing fails.
            The connection is closed before the error propagates.
    """
    db_connection = pymysql.connect(
        host="192.168.50.57",
        user="plant",
        password="g53743001",
        database="plant_test"
    )
    try:
        # The cursor context manager closes the cursor even when execute()
        # raises; the finally clause does the same for the connection.
        with db_connection.cursor() as cursor:
            insert_query = "INSERT INTO `all_status` (`Name`, `Status`) VALUES (%s,%s)"
            cursor.execute(insert_query, (device_id, status))
        db_connection.commit()
    finally:
        db_connection.close()
    print("已將狀態上傳到資料庫")
-
# Tables that accept action-code rows. A table name cannot be bound as a SQL
# parameter, so it is restricted to this fixed set before being interpolated.
_ACTION_CODE_TABLES = ("main_status", "backup_status")


def upload_action_code_to_database(device_id, status, action_code,
                                   table="main_status"):
    """Insert one (name, status, action code) row into an action-code table.

    This function used to be defined twice under the same name (once for
    `main_status`, once for `backup_status`); the second definition replaced
    the first, so every call wrote to `backup_status`. The default target is
    now `main_status`; pass ``table="backup_status"`` or call
    ``upload_backup_action_code_to_database`` for the backup table.

    Args:
        device_id: Device name stored in the `Name` column.
        status: Status text stored in the `Status` column.
        action_code: Value stored in the `Action_code` column.
        table: Target table, either "main_status" or "backup_status".

    Raises:
        ValueError: If *table* is not one of the supported tables.
        pymysql.MySQLError: If connecting, inserting or committing fails.
            The connection is closed before the error propagates.
    """
    if table not in _ACTION_CODE_TABLES:
        raise ValueError(f"unsupported action-code table: {table!r}")

    db_connection = pymysql.connect(
        host="192.168.50.57",
        user="plant",
        password="g53743001",
        database="plant_test"
    )
    try:
        with db_connection.cursor() as cursor:
            # Only the whitelisted table name is interpolated; the row values
            # are still passed as bound parameters.
            insert_query = (
                f"INSERT INTO `{table}` (`Name`, `Status`,`Action_code`) "
                "VALUES (%s,%s,%s)"
            )
            cursor.execute(insert_query, (device_id, status, action_code))
        db_connection.commit()
    finally:
        db_connection.close()
    print("已將狀態上傳到資料庫")


def upload_backup_action_code_to_database(device_id, status, action_code):
    """Insert one (name, status, action code) row into `backup_status`."""
    upload_action_code_to_database(
        device_id, status, action_code, table="backup_status"
    )
-
-
def select_action_code_by_database(device_name):
    """Print and return the newest `main_status` row matching *device_name*.

    The lookup is a substring match (``LIKE %device_name%``) ordered by
    `Time` descending and limited to one row.
    NOTE(review): `_` and `%` inside *device_name* act as LIKE wildcards, so
    the match is looser than a plain substring search.

    Args:
        device_name: Text that the `Name` column must contain.

    Returns:
        The ``(Time, Name, Action_code)`` row, or ``None`` when no row
        matches.

    Raises:
        pymysql.MySQLError: If connecting or querying fails. The connection
            is closed before the error propagates.
    """
    db_connection = pymysql.connect(
        host="192.168.50.57",
        user="plant",
        password="g53743001",
        database="plant_test"
    )
    try:
        with db_connection.cursor() as cursor:
            select_query = "SELECT `Time`,`Name`,`Action_code` FROM `main_status` WHERE `Name` LIKE %s ORDER BY `Time` DESC LIMIT 1"
            cursor.execute(select_query, (f'%{device_name}%',))
            # The query is limited to one row, so fetchone() is sufficient.
            row = cursor.fetchone()
    finally:
        db_connection.close()

    if row is None:
        print(f"No results found for the device name containing: {device_name}")
        return None

    # Named recorded_at rather than time so the imported time module is not
    # shadowed inside this function.
    recorded_at, name, action_code = row
    print(f"Time: {recorded_at}, Device Name: {name}, Action Code: {action_code}")
    return row
def on_connect(client, userdata, flags, rc):
    """Connect callback: log the result code, then subscribe to replies.

    Args:
        client: The MQTT client that connected; used to subscribe.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code, printed as-is.
    """
    print(f"已連接,結果代碼:{rc!s}")
    # Subscribe to the reply topic.
    client.subscribe(mqtt_sub_topic)
# Module-level placeholder. on_message assigns a local variable of the same
# name, so this global always stays None.
container_location_number = None


def on_message(client, userdata, msg):
    """Message callback: record which device sent a reply.

    The payload is expected to be a UTF-8 JSON object with a string
    `container_location_number` field; that value is added to the
    module-level `received_containers` set. Payloads that cannot be decoded,
    are not valid JSON, are not JSON objects, or carry no usable identifier
    are reported and ignored, so one bad message cannot raise inside the
    MQTT network thread.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only `msg.payload` (bytes) is read.
    """
    try:
        text = msg.payload.decode()
    except UnicodeDecodeError as error:
        print(f"Ignoring undecodable MQTT payload: {error}")
        return
    print("收到消息:" + text)

    try:
        data = json.loads(text)
    except json.JSONDecodeError as error:
        print(f"Ignoring MQTT payload that is not valid JSON: {error}")
        return
    if not isinstance(data, dict):
        print("Ignoring MQTT payload that is not a JSON object")
        return

    container_location_number = data.get("container_location_number")
    print(container_location_number)

    # Identifiers are matched against strings elsewhere; a missing or
    # non-string value could never match and an unhashable one would raise.
    if not isinstance(container_location_number, str):
        print("Ignoring MQTT payload without a container_location_number string")
        return

    # The set is mutated in place, so no `global` declaration is needed.
    received_containers.add(container_location_number)
# Attach the message and connect callbacks before connecting.
client.on_message = on_message
client.on_connect = on_connect

# Connect to the MQTT broker (port 1883, 60 second keepalive).
client.connect("192.168.50.185", port=1883, keepalive=60)

# Start the client's network loop so messages are received while the main
# script keeps running.
client.loop_start()
def send_message(topic):
    """Publish a `system_info` poll to *topic* and echo it to stdout.

    The JSON payload carries a fixed `device_id` of 1, the current local
    time as a string, and the command name.

    Args:
        topic: MQTT topic the poll is published to.
    """
    payload = {
        'device_id': 1,
        'localtime': str(datetime.now()),
        'command': 'system_info'
    }
    # Serialise once so the printed text is exactly what gets published.
    message = json.dumps(payload)
    print('發送訊息:')
    print(message)
    print('------------------')
    client.publish(topic, message)
-
# Legacy module-level state. The polling loop below does not read these
# names; they are kept in case code outside this section does.
received_container_main = False
received_container_backup = False
main_topic_counter = 0
backup_topic_counter = 0

# Number of consecutive missed replies after which a device link is reported
# as failed.
failure_threshold = 6

# (counter key, label used in log messages) for the two devices of a container.
_MAIN_ROLE = ("main", "主要")
_BACKUP_ROLE = ("backup", "備用")


def _count_missed_replies(container_name, counters, missed_roles):
    """Count one missed reply per role and report links that hit the threshold.

    Args:
        container_name: Container whose devices did not reply.
        counters: That container's {"main": int, "backup": int} counters;
            updated in place.
        missed_roles: Iterable of (counter key, log label) pairs for the
            devices that stayed silent this cycle.
    """
    for role, _label in missed_roles:
        counters[role] += 1
    for role, label in missed_roles:
        print(f"Error-{container_name}{label}計數器:", counters[role])
    for role, label in missed_roles:
        if counters[role] >= failure_threshold:
            print(f"超過{failure_threshold}次,{container_name}{label}連線失敗")
            counters[role] = 0  # start counting towards the next report
            # NOTE: uploading an "ERROR" status for the failed device is
            # currently disabled (see upload_status_to_database).


# Main loop: poll, wait for replies, then evaluate every container.
try:
    while True:
        send_message(mqtt_pub_topic)
        # Give the devices time to answer the poll.
        time.sleep(10)

        # Consume the replies gathered so far. Without this, a device that
        # answered once would count as alive forever and the failure
        # counters could never advance. Only the identifiers just copied are
        # removed, so a reply arriving in between is kept for the next cycle.
        replies = received_containers.copy()
        received_containers.difference_update(replies)

        for container_name, counters in container_counters.items():
            main_replied = f"{container_name}_main" in replies
            backup_replied = f"{container_name}_backup" in replies

            if main_replied and backup_replied:
                print(f"同時收到{container_name}主要和{container_name}備用回復,執行主要動作:123456")
                action_code = "123456"
                upload_action_code_to_database(f"{container_name}_main", 'OK', action_code)
                counters["main"] = 0
                counters["backup"] = 0
            elif main_replied:
                print(f"僅收到{container_name}主要回復,執行主要動作:123")
                select_action_code_by_database(f"{container_name}_backup")
                # NOTE: uploading this action code is currently disabled.
                action_code = "123"
                counters["main"] = 0
                # The backup device stayed silent, so it counts as a miss.
                _count_missed_replies(container_name, counters, (_BACKUP_ROLE,))
            elif backup_replied:
                print(f"僅收到{container_name}備用回復,執行備用動作:456")
                select_action_code_by_database(f"{container_name}_main")
                # NOTE: uploading this action code is currently disabled.
                action_code = "456"
                counters["backup"] = 0
                # The main device stayed silent, so it counts as a miss.
                _count_missed_replies(container_name, counters, (_MAIN_ROLE,))
            else:
                print(f"{container_name}主要和{container_name}備用均未收到回覆")
                _count_missed_replies(
                    container_name, counters, (_MAIN_ROLE, _BACKUP_ROLE)
                )
finally:
    # Stop the network loop and disconnect from the broker when the loop
    # ends (Ctrl+C or an unhandled error).
    client.loop_stop()
    client.disconnect()
-
- #SELECT `Time`,`Name`,`Action_code` FROM `main_status` WHERE `Name` LIKE '%Container_1_backup%' ORDER BY `Time` DESC LIMIT 1
-
-
|