Forráskód Böngészése

上傳檔案到 ''

"2023/08/30 MQTT偵測程式碼"
fatwolf 1 éve
szülő
commit
067c473913
1 módosított fájl, 208 hozzáadás és 0 törlés
  1. 208 0
      mqtt_thread3.py

+ 208 - 0
mqtt_thread3.py

@@ -0,0 +1,208 @@
+import paho.mqtt.client as mqtt
+import json
+import time
+import pymysql
+from datetime import datetime
+
+#訂閱與主題
+mqtt_pub_topic = "p"
+mqtt_sub_topic = "p1"
+
+#定義貨櫃名稱
+container_counters = {
+    "Container_1": {"main": 0, "backup": 0},
+    "Container_2": {"main": 0, "backup": 0},
+    "Container_3": {"main": 0, "backup": 0},
+    "Container_4": {"main": 0, "backup": 0},
+    "Container_5": {"main": 0, "backup": 0},
+    "Container_6": {"main": 0, "backup": 0},
+    "Container_7": {"main": 0, "backup": 0},
+    # ... 添加更多貨櫃
+}
+
+received_containers = set() 
+
+# 初始化MQTT客戶端
+client = mqtt.Client()
+
+# 設置使用者名稱和密碼
+client.username_pw_set("plant-server", "53743001")
+
+#上傳名子與狀態到資料表
+def upload_status_to_database(device_id, status):
+    db_connection = pymysql.connect(
+        host="192.168.50.57",
+        user="plant",
+        password="g53743001",
+        database="plant_test"
+    )
+    cursor = db_connection.cursor()
+    insert_query = "INSERT INTO `all_status` (`Name`, `Status`) VALUES (%s,%s)"
+    cursor.execute(insert_query, (device_id, status))
+    db_connection.commit()
+    cursor.close()
+    db_connection.close()
+    print("已將狀態上傳到資料庫")
+    
+#上傳動作記錄到資料表(main)
+def upload_action_code_to_database(device_id, status,action_code):
+    db_connection = pymysql.connect(
+        host="192.168.50.57",
+        user="plant",
+        password="g53743001",
+        database="plant_test"
+    )
+    cursor = db_connection.cursor()
+    insert_query = "INSERT INTO `main_status` (`Name`, `Status`,`Action_code`) VALUES (%s,%s,%s)"
+    cursor.execute(insert_query, (device_id, status, action_code))
+    db_connection.commit()
+    cursor.close()
+    db_connection.close()
+    print("已將狀態上傳到資料庫")
+    
+#上傳動作記錄到資料表(main)
+def upload_action_code_to_database(device_id, status,action_code):
+    db_connection = pymysql.connect(
+        host="192.168.50.57",
+        user="plant",
+        password="g53743001",
+        database="plant_test"
+    )
+    cursor = db_connection.cursor()
+    insert_query = "INSERT INTO `backup_status` (`Name`, `Status`,`Action_code`) VALUES (%s,%s,%s)"
+    cursor.execute(insert_query, (device_id, status, action_code))
+    db_connection.commit()
+    cursor.close()
+    db_connection.close()
+    print("已將狀態上傳到資料庫")
+    
+    
+#搜索資料表內的內容
+def select_action_code_by_database(device_name):
+    db_connection = pymysql.connect(
+        host="192.168.50.57",
+        user="plant",
+        password="g53743001",
+        database="plant_test"
+    )
+    cursor = db_connection.cursor()
+    select_query = "SELECT `Time`,`Name`,`Action_code` FROM `main_status` WHERE `Name` LIKE %s ORDER BY `Time` DESC LIMIT 1"
+    cursor.execute(select_query, (f'%{device_name}%',))
+    
+    results = cursor.fetchall()
+    if results:
+        for row in results:
+            time = row[0]
+            name = row[1]
+            action_code = row[2]
+            print(f"Time: {time}, Device Name: {name}, Action Code: {action_code}")
+    else:
+        print(f"No results found for the device name containing: {device_name}")
+    
+    cursor.close()
+    db_connection.close()
+
+# 連接回調函數
+def on_connect(client, userdata, flags, rc):
+    print("已連接,結果代碼:" + str(rc))
+    # 訂閱主要和備用主題
+    client.subscribe(mqtt_sub_topic)
+
+container_location_number = None
+# 訊息回調函數
+def on_message(client, userdata, msg):
+    global received_containers
+    print("收到消息:" + msg.payload.decode())
+    # 解析消息数据
+    data = json.loads(msg.payload)
+    container_location_number = data.get("container_location_number")
+    print(container_location_number)
+    
+    received_containers.add(container_location_number)
+
+
+# 設置連接和訊息回調函數
+client.on_connect = on_connect
+client.on_message = on_message
+
+# 連接到MQTT代理
+client.connect("192.168.50.185", 1883, 60)
+
+# 循環等待訊息
+client.loop_start()
+
+# 發送訊息函數
+def send_message(topic):
+    localtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+    current_time = datetime.now()
+    #print("Current Time:", current_time)
+    payload = {
+        'device_id': 1,
+        'localtime': str(current_time),
+        'command': 'system_info'
+    }
+    print('發送訊息:')
+    print(json.dumps(payload))
+    print('------------------')
+    client.publish(topic, json.dumps(payload))
+    
+received_container_main = False
+received_container_backup = False
+main_topic_counter = 0
+backup_topic_counter = 0
+failure_threshold = 6  
+
+# 主迴圈
+while True:
+    # 發送訊息到主要主題和備用主題
+    send_message(mqtt_pub_topic)    
+
+    # 等待一段時間,以便檢查回復
+    time.sleep(10)  
+    
+    
+    for container_name, counters in container_counters.items():
+        if f"{container_name}_main" in received_containers and f"{container_name}_backup" in received_containers:
+            print(f"同時收到{container_name}主要和{container_name}備用回復,執行主要動作:123456")
+            # 執行主要動作...
+            action_code ="123456"
+            upload_action_code_to_database(f"{container_name}_main", 'OK',action_code)
+            counters["main"] = 0  # 重置主要計數器
+            counters["backup"] = 0  # 重置備用計數器
+        elif f"{container_name}_main" in received_containers:
+            print(f"僅收到{container_name}主要回復,執行主要動作:123")
+            # 執行主要動作...
+            select_action_code_by_database(f"{container_name}_backup")
+            action_code ="123"
+            #upload_action_code_to_database(f"{container_name}_main", 'OK',action_code)
+            counters["main"] = 0  # 重置主要計數器
+        elif f"{container_name}_backup" in received_containers:
+            print(f"僅收到{container_name}備用回復,執行備用動作:456")
+            # 執行備用動作...
+            select_action_code_by_database(f"{container_name}_main")
+            action_code ="456"
+            #upload_action_code_to_database(f"{container_name}_backup", 'OK',action_code)
+            #print(f"{container_name}_backup")
+            counters["backup"] = 0  # 重置備用計數器
+        else:
+            print(f"{container_name}主要和{container_name}備用均未收到回覆")
+            counters["main"] += 1  # 增加主要計數器
+            counters["backup"] += 1  # 增加備用計數器
+            print(f"Error-{container_name}主要計數器:", counters["main"])
+            print(f"Error-{container_name}備用計數器:", counters["backup"])
+            if counters["main"] >= failure_threshold:
+                print(f"超過{failure_threshold}次,{container_name}主要連線失敗")
+                counters["main"] = 0  # 重置主要計數器
+                # 執行錯誤處理...
+                #upload_status_to_database(container_name+"_main","ERROR")
+            if counters["backup"] >= failure_threshold:
+                print(f"超過{failure_threshold}次,{container_name}備用連線失敗")
+                counters["backup"] = 0  # 重置備用計數器
+                # 執行錯誤處理...
+                #upload_status_to_database(container_name+"_backup","ERROR")
+    
+#SELECT `Time`,`Name`,`Action_code` FROM `main_status` WHERE `Name` LIKE '%Container_1_backup%' ORDER BY `Time` DESC LIMIT 1
+    
+
+
+