import pymysql import paho.mqtt.client as mqtt import json import threading from time import sleep as sl from datetime import datetime as dt from concurrent.futures import ThreadPoolExecutor class EdamameBrake(object): def __init__(self): self.db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye', database='Edamame', charset='utf8') self.cursor = self.db.cursor() self.client = mqtt.Client() self.zoom = 0 self.pool = ThreadPoolExecutor(250) self.D = {'GTW009001001': 0, 'GTW009001002': 0, 'GTW009001003': 0, 'GTW009002001': 0, 'GTW009002002': 0, 'GTW009002003': 0, 'GTW009002004': 0, 'GTW009002005': 0, 'GTW009002006': 0, 'GTW009002007': 0, 'GTW009002008': 0, 'GTW009002009': 0, 'GTW009002010': 0, 'GTW009002011': 0, 'GTW009002012': 0, 'GTW009002013': 0, 'GTW009002014': 0, 'GTW009002015': 0, 'GTW009002016': 0, 'GTW009002017': 0, 'GTW009002018': 0, 'GTW009002019': 0} self.lock = threading.Lock() # 判斷小兵接收回傳訊息是否為這隻程式,還是另一個manage.py項目程式 self.d = {'GTW009001001_n': 0, 'GTW009001002_n': 0, 'GTW009001003_n': 0, 'GTW009002001_n': 0, 'GTW009002002_n': 0, 'GTW009002003_n': 0, 'GTW009002004_n': 0, 'GTW009002005_n': 0, 'GTW009002006_n': 0, 'GTW009002007_n': 0, 'GTW009002008_n': 0, 'GTW009002009_n': 0, 'GTW009002010_n': 0, 'GTW009002011_n': 0, 'GTW009002012_n': 0, 'GTW009002013_n': 0, 'GTW009002014_n': 0, 'GTW009002015_n': 0, 'GTW009002016_n': 0, 'GTW009002017_n': 0, 'GTW009002018_n': 0, 'GTW009002019_n': 0} def queryMySQL(self): nodes_id = ['GTW009001001', 'GTW009001002', 'GTW009001003', 'GTW009002001', 'GTW009002002', 'GTW009002003', 'GTW009002004', 'GTW009002005', 'GTW009002006', 'GTW009002007', 'GTW009002008', 'GTW009002009', 'GTW009002010', 'GTW009002011', 'GTW009002012', 'GTW009002013', 'GTW009002014', 'GTW009002015', 'GTW009002016', 'GTW009002017', 'GTW009002018', 'GTW009002019'] while True: sl(60) # 獲取當前時間 current_time = dt.now() # 將datetime轉成字串類型,提取小時加分鐘 hr_min = '0000' + dt.strftime(current_time, "%H%M") def query_job(): db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye', database='Edamame', charset='utf8') cursor = db.cursor() for node_id in nodes_id: #找出每個MODE最新時間設定的方位,這裡使用substring_index和group_concat函數 sql = "select mode, tilt_angle, pan_angle, zoom, time1, time2, time3, time4, time5, time6, time7, time8 from jvt_o_time where sn in(select substring_index(group_concat(sn order by datetime desc),',',1) from jvt_o_time where nr='" + node_id + "' group by mode)" cursor.execute(sql) message = cursor.fetchall() # print(hr_min) # print(node_id) # print(message) #找出現在小兵所在的排程倍率值 sql = "select zoom from Orientation where nr='" + node_id + "' order by datetime desc limit 1" cursor.execute(sql) zoom = cursor.fetchone() if zoom : self.zoom = zoom[0] self.orientation(node_id, message, hr_min, current_time) cursor.close() db.close() self.pool.submit(query_job) def orientation(self, node_id, message, hr_min, current_time): def mqtt_job(self, node_id, mode, current_time, i): # 拍照的執行序函數 def photo_job(self, node_id, mode, time_num): sl(60) current_time = dt.now() time = dt.strftime(current_time, "%Y-%m-%d_%H.%M") command = {"node_id": node_id, "command": "a051", "position": str(mode), "time": str(time_num), "filename": time} self.mqttPublish(node_id, command) db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye', database='Edamame', charset='utf8') cursor = db.cursor() # 垂直移動命令 command = {"node_id": node_id, "command": "a014", "Tilt": mode[1]} self.mqttPublish(node_id, command) # 水平移動命令 command = {"node_id": node_id, "command": "a012", "Pan": mode[2]} self.mqttPublish(node_id, command) # 倍率命令 time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S") sql = "insert into Orientation(nr, mode, tilt_angle, pan_angle, zoom, datetime) values('%s', '%d', '%s', '%s', '%d', '%s')" % ( node_id, mode[0], mode[1], mode[2], mode[3], time) cursor.execute(sql) db.commit() zoom = mode[3] - int(self.zoom) command = {"node_id": node_id, "command": "a016", "Zoom": zoom} self.mqttPublish(node_id, command) # 拍照命令 time_num = 0 if i == mode[4]: time_num = 1 elif i == mode[5]: time_num = 2 elif i == mode[6]: time_num = 3 elif i == mode[7]: time_num = 4 elif i == mode[8]: time_num = 5 elif i == mode[9]: time_num = 6 elif i == mode[10]: time_num = 7 elif i == mode[11]: time_num = 8 self.pool.submit(photo_job, self, node_id, mode[0], time_num) # photo_thread = threading.Thread(target=photo_job, args=(self, node_id, mode[0], time_num)) # photo_thread.daemon = True # photo_thread.start() cursor.close() db.close() if message: for mode in message: for i in mode: #判斷是否等於現在時間 if i == hr_min: self.pool.submit(mqtt_job, self, node_id, mode, current_time, i) # mqtt_thread = threading.Thread(target=mqtt_job, args=(self, node_id, mode, current_time, i)) # mqtt_thread.daemon = True # mqtt_thread.start() def mqttPublish(self, node_id, command): # print (json.dumps(command)) #要發布的主題和內容 if node_id == 'GTW009001001': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:b7:52:9c' elif node_id == 'GTW009001002': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:c1:72:0c' elif node_id == 'GTW009001003': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:84:e9:3f' elif node_id == 'GTW009002001': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:95:00:53' elif node_id == 'GTW009002002': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e7:51:44' elif node_id == 'GTW009002003': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:ce:a5:35' elif node_id == 'GTW009002004': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:fc:9d:34' elif node_id == 'GTW009002005': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:86:00:c9' elif node_id == 'GTW009002006': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:df:4b:0f' elif node_id == 'GTW009002007': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:6a:5d:17' elif node_id == 'GTW009002008': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:d2:d0:8f' elif node_id == 'GTW009002009': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:57:3c:da' elif node_id == 'GTW009002010': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:bd:29:b1' elif node_id == 'GTW009002011': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:7c:f6:06' elif node_id == 'GTW009002012': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:59:9d:bd' elif node_id == 'GTW009002013': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e7:21:e5' elif node_id == 'GTW009002014': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:34:9e:39' elif node_id == 'GTW009002015': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e3:f1:f4' elif node_id == 'GTW009002016': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:f8:24:92' elif node_id == 'GTW009002017': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:9d:68:05' elif node_id == 'GTW009002018': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:4d:e4:34' elif node_id == 'GTW009002019': topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e7:16:28' self.client.publish(topic, json.dumps(command)) num = node_id + '_n' self.d[num] = 1 current_time = dt.now() time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S") if command['command'] != 'a053': if command['command'] == 'a051': sl(600) elif command['command'] == 'a018' or command['command'] == 'a016' or command['command'] == 'a052': sl(40) else: sl(10) if self.D[node_id] == 0: # print(json.dumps(command)) self.client.publish(topic, json.dumps(command)) if command['command'] == 'a051': sl(600) elif command['command'] == 'a018' or command['command'] == 'a016' or command['command'] == 'a052': sl(40) else: sl(10) if self.D[node_id] == 0: # print(json.dumps(command)) self.client.publish(topic, json.dumps(command)) if command['command'] == 'a051': sl(600) elif command['command'] == 'a018' or command['command'] == 'a016' or command['command'] == 'a052': sl(40) else: sl(10) if self.D[node_id] == 0: sql = "insert into status(nr, status, datetime) values('%s', '%s', '%s')" % ( node_id, 'fail', time) self.cursor.execute(sql) self.db.commit() self.d[num] = 0 self.D[node_id] = 0 def workOn(self): self = self # 當地端程式連線伺服器得到回應時,要做的動作 def on_connect(client, userdata, flags, rc): #print("Waiting for MQTT message...") # 將訂閱主題寫在on_connet中 # 如果我們失去連線或重新連線時 # 地端程式將會重新訂閱 client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:b7:52:9c/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:c1:72:0c/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:84:e9:3f/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:95:00:53/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e7:51:44/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:ce:a5:35/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:fc:9d:34/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:86:00:c9/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:df:4b:0f/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:6a:5d:17/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:d2:d0:8f/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:57:3c:da/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:bd:29:b1/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:7c:f6:06/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:59:9d:bd/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e7:21:e5/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:34:9e:39/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e3:f1:f4/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:f8:24:92/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:9d:68:05/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:4d:e4:34/Log') client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e7:16:28/Log') # 當接收到從伺服器發送的訊息時要進行的動作 def on_message(client, userdata, msg): msg = msg def message_job(self, msg): db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye', database='Edamame', charset='utf8') cursor = db.cursor() # 轉換編碼utf-8才看得懂中文 # print(msg.topic + " " + msg.payload.decode('utf-8')) payload = msg.payload.decode('utf-8') current_time = dt.now() time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S") if payload: p = json.loads(payload) if self.d[p['node_id'] + '_n'] == 1: if 'filename' in p: sql = "insert into imgSignalLog(nr, command, response, position, time, size_a, size_b, filename, datetime) \ values('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')" % ( p['node_id'], p['command'], p['rqnn'], p['position'], p['time'], p['a'], p['b'], p['filename'], time) cursor.execute(sql) else: if 'tilt' in p: tilt_angle = p['tilt'] else: tilt_angle = 'NULL' if 'pan' in p: pan_angle = p['pan'] else: pan_angle = 'NULL' if 'zoom' in p: zoom = p['zoom'] else: zoom = 'NULL' if 'position' in p: position = p['position'] else: position = 'NULL' sql = "insert into signalLog(nr, command, response, tilt_angle, pan_angle, zoom, position, datetime) \ values('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')" % ( p['node_id'], p['command'], p['rqnn'], tilt_angle, pan_angle, zoom, position, time) # self.lock.acquire() cursor.execute(sql) # self.lock.release() db.commit() self.D[p['node_id']] = 1 self.d[p['node_id'] + '_n'] = 0 cursor.close() db.close() self.pool.submit(message_job, self, msg) # message_thread = threading.Thread(target=message_job, args=(self, msg)) # message_thread.daemon = True # message_thread.start() def thread_job(self): # 設定連線的動作 self.client.on_connect = on_connect # 設定接收訊息的動作 self.client.on_message = on_message # 設定登入帳號密碼 self.client.username_pw_set("aisky-client", "aiskyc") # 設定連線資訊(IP, Port, 連線時間) self.client.connect("60.250.156.234", 1883, 60) # 開始連線,執行設定的動作和處理重新連線問題 # 也可以手動使用其他loop函式來進行連接 self.client.loop_forever() mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,)) mqtt_subscribe_thread.daemon = True mqtt_subscribe_thread.start() #睡1秒等執行敘跑完才跑主程式 sl(1) self.queryMySQL() self.cursor.close() self.db.close() self.pool.shutdown(wait=True) if __name__ == '__main__': edamame = EdamameBrake() try: edamame.workOn() except KeyboardInterrupt: edamame.pool.shutdown(wait=True) edamame.cursor.close() edamame.db.close()