123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- import pymysql
- import paho.mqtt.client as mqtt
- import json
- import threading
- from time import sleep as sl
- from datetime import datetime as dt
- from concurrent.futures import ThreadPoolExecutor
class EdamameBrake(object):
    """Run the camera schedule for the Edamame gateways over MQTT.

    Once a minute the schedule table is read from MySQL.  Every gateway whose
    schedule contains the current time is sent tilt, pan and zoom commands
    and, one minute later, a photo command.  Replies arriving on the
    gateways' ``/Log`` topics are written to the log tables and acknowledge
    the command that is currently pending for that gateway.
    """

    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration that is not checked in.
    _DB_CONFIG = {
        'host': '52.69.200.169',
        'port': 3306,
        'user': 'edamame',
        'password': 'skyeye',
        'database': 'Edamame',
        'charset': 'utf8',
    }

    _TOPIC_PREFIX = 'AISKY/AppleFarm/MK-G/'

    # (gateway id, MAC address used in its MQTT topic), in polling order.
    _NODES = (
        ('GTW009001001', 'b8:27:eb:b7:52:9c'),
        ('GTW009001002', 'b8:27:eb:c1:72:0c'),
        ('GTW009001003', 'b8:27:eb:84:e9:3f'),
        ('GTW009002001', 'b8:27:eb:95:00:53'),
        ('GTW009002002', 'b8:27:eb:e7:51:44'),
        ('GTW009002003', 'b8:27:eb:ce:a5:35'),
        ('GTW009002004', 'b8:27:eb:fc:9d:34'),
        ('GTW009002005', 'b8:27:eb:86:00:c9'),
        ('GTW009002006', 'b8:27:eb:df:4b:0f'),
        ('GTW009002007', 'b8:27:eb:6a:5d:17'),
        ('GTW009002008', 'b8:27:eb:d2:d0:8f'),
        ('GTW009002009', 'b8:27:eb:57:3c:da'),
        ('GTW009002010', 'b8:27:eb:bd:29:b1'),
        ('GTW009002011', 'b8:27:eb:7c:f6:06'),
        ('GTW009002012', 'b8:27:eb:59:9d:bd'),
        ('GTW009002013', 'b8:27:eb:e7:21:e5'),
        ('GTW009002014', 'b8:27:eb:34:9e:39'),
        ('GTW009002015', 'b8:27:eb:e3:f1:f4'),
        ('GTW009002016', 'b8:27:eb:f8:24:92'),
        ('GTW009002017', 'b8:27:eb:9d:68:05'),
        ('GTW009002018', 'b8:27:eb:4d:e4:34'),
        ('GTW009002019', 'b8:27:eb:e7:16:28'),
    )

    # Gateway ids in polling order.
    NODE_IDS = tuple(pair[0] for pair in _NODES)

    # Gateway id -> command topic.  Replies arrive on the same topic + '/Log'.
    NODE_TOPICS = dict(
        (pair[0], 'AISKY/AppleFarm/MK-G/' + pair[1]) for pair in _NODES)

    # Seconds to wait for a reply before re-sending, keyed by command code.
    _ACK_WAIT_SECONDS = {'a051': 600, 'a018': 40, 'a016': 40, 'a052': 40}
    _DEFAULT_ACK_WAIT = 10
    # A command is published at most this many times before it is recorded
    # as failed.
    _MAX_PUBLISH_ATTEMPTS = 3

    # Schedule rows are (mode, tilt_angle, pan_angle, zoom, time1..time8).
    _FIRST_TIME_COLUMN = 4
    _TIME_COLUMN_COUNT = 8

    def __init__(self):
        """Open the instance-level DB connection and create shared state."""
        self.db = pymysql.connect(**self._DB_CONFIG)
        self.cursor = self.db.cursor()
        self.client = mqtt.Client()
        # Zoom of the most recently polled gateway.  Kept for backward
        # compatibility; jobs receive their zoom explicitly instead.
        self.zoom = 0
        self.pool = ThreadPoolExecutor(250)
        # Per gateway: 1 once a reply to the pending command was logged.
        self.D = {node_id: 0 for node_id in self.NODE_IDS}
        self.lock = threading.Lock()
        # Per gateway: 1 while this program is waiting for a reply.  Used to
        # tell replies to this program apart from replies to the other
        # (manage.py) program.
        self.d = {node_id + '_n': 0 for node_id in self.NODE_IDS}

    def _connect(self):
        """Return a new MySQL connection (one per job; never shared)."""
        return pymysql.connect(**self._DB_CONFIG)

    def _execute_write(self, sql, params):
        """Run one parameterized statement on a fresh connection and commit.

        The connection and cursor are always closed, even when the statement
        raises.
        """
        db = self._connect()
        try:
            cursor = db.cursor()
            try:
                cursor.execute(sql, params)
                db.commit()
            finally:
                cursor.close()
        finally:
            db.close()

    def queryMySQL(self):
        """Poll the schedule once a minute, forever.

        Each iteration submits one pooled job that reads the schedule of
        every gateway and dispatches the commands due in the current minute.
        This method never returns normally.
        """
        # NOTE(review): the loop sleeps a fixed 60 s, so wake-ups drift
        # slowly relative to wall-clock minute boundaries.
        while True:
            sl(60)
            current_time = dt.now()
            # Schedule times are compared as '0000' + HHMM strings.
            hr_min = '0000' + dt.strftime(current_time, "%H%M")
            # Pass the values as arguments so the job does not depend on
            # loop variables that are rebound on the next iteration.
            self.pool.submit(self._query_job, hr_min, current_time)

    def _query_job(self, hr_min, current_time):
        """Read every gateway's schedule and dispatch the commands now due."""
        # Latest row per mode: substring_index/group_concat pick the newest
        # sn within each mode group.
        schedule_sql = (
            "select mode, tilt_angle, pan_angle, zoom, time1, time2, time3, "
            "time4, time5, time6, time7, time8 from jvt_o_time where sn in("
            "select substring_index(group_concat(sn order by datetime desc),"
            "',',1) from jvt_o_time where nr=%s group by mode)")
        # Zoom value of the gateway's most recent recorded orientation.
        zoom_sql = ("select zoom from Orientation where nr=%s "
                    "order by datetime desc limit 1")
        db = self._connect()
        try:
            cursor = db.cursor()
            try:
                for node_id in self.NODE_IDS:
                    cursor.execute(schedule_sql, (node_id,))
                    message = cursor.fetchall()
                    cursor.execute(zoom_sql, (node_id,))
                    row = cursor.fetchone()
                    if row:
                        self.zoom = row[0]
                    # With no recorded orientation assume zoom 0 instead of
                    # reusing whatever the previous gateway left behind.
                    if row and row[0] is not None:
                        current_zoom = row[0]
                    else:
                        current_zoom = 0
                    self.orientation(node_id, message, hr_min, current_time,
                                     current_zoom)
            finally:
                cursor.close()
        finally:
            db.close()

    @classmethod
    def _time_slot(cls, mode, hr_min):
        """Return the 1-based time slot of ``mode`` equal to ``hr_min``.

        Only the time1..time8 columns are inspected.  Returns 0 when none of
        them matches.
        """
        first = cls._FIRST_TIME_COLUMN
        slots = mode[first:first + cls._TIME_COLUMN_COUNT]
        for number, scheduled in enumerate(slots, 1):
            if scheduled == hr_min:
                return number
        return 0

    def orientation(self, node_id, message, hr_min, current_time, zoom=None):
        """Dispatch a move-and-photo job for every schedule row due now.

        Args:
            node_id: gateway id.
            message: schedule rows, each
                ``(mode, tilt_angle, pan_angle, zoom, time1..time8)``.
            hr_min: current time as a ``'0000' + HHMM`` string.
            current_time: datetime stored with the orientation record.
            zoom: the gateway's current zoom; defaults to ``self.zoom``.
        """
        if zoom is None:
            zoom = self.zoom
        if not message:
            return
        for mode in message:
            time_num = self._time_slot(mode, hr_min)
            if time_num:
                # One job per row, even if the same time is listed twice.
                self.pool.submit(self._mqtt_job, node_id, mode, current_time,
                                 time_num, zoom)

    def _mqtt_job(self, node_id, mode, current_time, time_num, zoom):
        """Move one gateway to a scheduled position, then queue its photo."""
        # Vertical move command.
        self.mqttPublish(
            node_id,
            {"node_id": node_id, "command": "a014", "Tilt": mode[1]})
        # Horizontal move command.
        self.mqttPublish(
            node_id,
            {"node_id": node_id, "command": "a012", "Pan": mode[2]})
        # Record the new orientation before the zoom command is sent.
        timestamp = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S")
        self._execute_write(
            "insert into Orientation(nr, mode, tilt_angle, pan_angle, zoom, "
            "datetime) values(%s, %s, %s, %s, %s, %s)",
            (node_id, mode[0], mode[1], mode[2], mode[3], timestamp))
        # Zoom command: the value sent is the difference between the
        # scheduled zoom and the gateway's current zoom.
        self.mqttPublish(
            node_id,
            {"node_id": node_id, "command": "a016",
             "Zoom": mode[3] - int(zoom)})
        # Photo command, run as its own job.
        self.pool.submit(self._photo_job, node_id, mode[0], time_num)

    def _photo_job(self, node_id, mode, time_num):
        """Wait a minute, then ask the gateway to take a photo."""
        sl(60)
        filename = dt.strftime(dt.now(), "%Y-%m-%d_%H.%M")
        command = {"node_id": node_id, "command": "a051",
                   "position": str(mode), "time": str(time_num),
                   "filename": filename}
        self.mqttPublish(node_id, command)

    def mqttPublish(self, node_id, command):
        """Publish ``command`` to a gateway and wait for its reply.

        Except for command ``a053`` (published once, no waiting), the command
        is published up to three times.  After each publish the calling
        thread sleeps for the command's wait time (600 s for ``a051``, 40 s
        for ``a018``/``a016``/``a052``, 10 s otherwise) and stops as soon as
        a reply has been logged.  If no reply arrives, a ``fail`` row is
        written to the ``status`` table.

        Raises:
            ValueError: if ``node_id`` is not a known gateway.
        """
        try:
            topic = self.NODE_TOPICS[node_id]
        except KeyError:
            raise ValueError('unknown node_id: %r' % (node_id,))
        payload = json.dumps(command)
        pending_key = node_id + '_n'

        self.client.publish(topic, payload)
        self.d[pending_key] = 1
        # The failure row carries the time of the first publish.
        sent_at = dt.strftime(dt.now(), "%Y-%m-%d %H:%M:%S")

        code = command['command']
        if code != 'a053':
            wait = self._ACK_WAIT_SECONDS.get(code, self._DEFAULT_ACK_WAIT)
            for attempt in range(1, self._MAX_PUBLISH_ATTEMPTS + 1):
                sl(wait)
                if self.D[node_id] != 0:
                    break
                if attempt < self._MAX_PUBLISH_ATTEMPTS:
                    self.client.publish(topic, payload)
            else:
                # Written on a fresh connection: this method runs on many
                # pool threads and a connection must not be shared.
                self._execute_write(
                    "insert into status(nr, status, datetime) "
                    "values(%s, %s, %s)",
                    (node_id, 'fail', sent_at))
        # NOTE(review): the original indentation was lost; these resets are
        # assumed to run for every command, including a053 -- confirm.
        self.d[pending_key] = 0
        self.D[node_id] = 0

    def _on_connect(self, client, userdata, flags, rc):
        """Subscribe to every gateway's reply topic.

        Subscribing here means the subscriptions are renewed whenever the
        client reconnects.
        """
        for node_id in self.NODE_IDS:
            client.subscribe(self.NODE_TOPICS[node_id] + '/Log')

    def _on_message(self, client, userdata, msg):
        """Hand an incoming message to the pool so the MQTT loop never blocks."""
        self.pool.submit(self._message_job, msg)

    def _message_job(self, msg):
        """Log a gateway reply and mark the pending command as acknowledged.

        Messages that are empty, not a JSON object, or from a gateway this
        program is not currently waiting on are ignored without touching the
        database.
        """
        try:
            payload = msg.payload.decode('utf-8')
            if not payload:
                return
            p = json.loads(payload)
        except ValueError:
            # Undecodable bytes or malformed JSON.
            return
        if not isinstance(p, dict):
            return
        node_id = p.get('node_id')
        pending_key = '{}_n'.format(node_id)
        # Only replies to commands sent by this program are logged here.
        if self.d.get(pending_key) != 1:
            return

        received_at = dt.strftime(dt.now(), "%Y-%m-%d %H:%M:%S")
        if 'filename' in p:
            sql = ("insert into imgSignalLog(nr, command, response, position, "
                   "time, size_a, size_b, filename, datetime) "
                   "values(%s, %s, %s, %s, %s, %s, %s, %s, %s)")
            values = (p['node_id'], p['command'], p['rqnn'], p['position'],
                      p['time'], p['a'], p['b'], p['filename'])
        else:
            # NOTE(review): absent fields are stored as the text 'NULL', not
            # SQL NULL, exactly as before -- confirm that is intended.
            sql = ("insert into signalLog(nr, command, response, tilt_angle, "
                   "pan_angle, zoom, position, datetime) "
                   "values(%s, %s, %s, %s, %s, %s, %s, %s)")
            values = (p['node_id'], p['command'], p['rqnn'],
                      p.get('tilt', 'NULL'), p.get('pan', 'NULL'),
                      p.get('zoom', 'NULL'), p.get('position', 'NULL'))
        # Values are sent as text, as before, but bound as parameters so
        # payload content can no longer alter the statement.
        params = tuple(str(value) for value in values) + (received_at,)
        self._execute_write(sql, params)

        # NOTE(review): the original indentation was lost; the flags are
        # assumed to change only after the reply was logged -- confirm.
        self.D[node_id] = 1
        self.d[pending_key] = 0

    def _mqtt_loop(self):
        """Connect to the broker and process network traffic until stopped."""
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        # Broker login.
        self.client.username_pw_set("aisky-client", "aiskyc")
        # Broker address, port and keep-alive in seconds.
        self.client.connect("60.250.156.234", 1883, 60)
        # Blocks; also handles reconnecting.
        self.client.loop_forever()

    def workOn(self):
        """Start the MQTT listener thread, then run the scheduling loop.

        The cleanup at the end is reached only if ``queryMySQL`` returns.
        """
        mqtt_subscribe_thread = threading.Thread(target=self._mqtt_loop)
        mqtt_subscribe_thread.daemon = True
        mqtt_subscribe_thread.start()
        # Give the listener a second to start before scheduling begins.
        sl(1)
        self.queryMySQL()
        self.cursor.close()
        self.db.close()
        self.pool.shutdown(wait=True)
def main():
    """Run the scheduler until it is interrupted from the keyboard."""
    edamame = EdamameBrake()
    try:
        edamame.workOn()
    except KeyboardInterrupt:
        # On Ctrl-C let queued jobs finish, then release the DB handles.
        edamame.pool.shutdown(wait=True)
        edamame.cursor.close()
        edamame.db.close()


if __name__ == '__main__':
    main()
-
|