edamame_brake_test.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. import pymysql
  2. import paho.mqtt.client as mqtt
  3. import json
  4. import threading
  5. from time import sleep as sl
  6. from datetime import datetime as dt
  7. from concurrent.futures import ThreadPoolExecutor
  8. class EdamameBrake(object):
  9. def __init__(self):
  10. self.db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye', database='Edamame', charset='utf8')
  11. self.cursor = self.db.cursor()
  12. self.client = mqtt.Client()
  13. self.zoom = 0
  14. self.pool = ThreadPoolExecutor(250)
  15. self.D = {'GTW009001001': 0, 'GTW009001002': 0, 'GTW009001003': 0, 'GTW009002001': 0, 'GTW009002002': 0,
  16. 'GTW009002003': 0, 'GTW009002004': 0, 'GTW009002005': 0, 'GTW009002006': 0, 'GTW009002007': 0,
  17. 'GTW009002008': 0, 'GTW009002009': 0, 'GTW009002010': 0, 'GTW009002011': 0, 'GTW009002012': 0,
  18. 'GTW009002013': 0, 'GTW009002014': 0, 'GTW009002015': 0, 'GTW009002016': 0,
  19. 'GTW009002017': 0, 'GTW009002018': 0, 'GTW009002019': 0}
  20. self.lock = threading.Lock()
  21. # 判斷小兵接收回傳訊息是否為這隻程式,還是另一個manage.py項目程式
  22. self.d = {'GTW009001001_n': 0, 'GTW009001002_n': 0, 'GTW009001003_n': 0, 'GTW009002001_n': 0, 'GTW009002002_n': 0,
  23. 'GTW009002003_n': 0, 'GTW009002004_n': 0,
  24. 'GTW009002005_n': 0, 'GTW009002006_n': 0, 'GTW009002007_n': 0, 'GTW009002008_n': 0, 'GTW009002009_n': 0,
  25. 'GTW009002010_n': 0, 'GTW009002011_n': 0,
  26. 'GTW009002012_n': 0, 'GTW009002013_n': 0, 'GTW009002014_n': 0, 'GTW009002015_n': 0, 'GTW009002016_n': 0,
  27. 'GTW009002017_n': 0, 'GTW009002018_n': 0, 'GTW009002019_n': 0}
  28. def queryMySQL(self):
  29. nodes_id = ['GTW009001001', 'GTW009001002', 'GTW009001003', 'GTW009002001', 'GTW009002002', 'GTW009002003', 'GTW009002004', 'GTW009002005', 'GTW009002006',
  30. 'GTW009002007', 'GTW009002008', 'GTW009002009', 'GTW009002010', 'GTW009002011', 'GTW009002012', 'GTW009002013', 'GTW009002014', 'GTW009002015',
  31. 'GTW009002016', 'GTW009002017', 'GTW009002018', 'GTW009002019']
  32. while True:
  33. sl(60)
  34. # 獲取當前時間
  35. current_time = dt.now()
  36. # 將datetime轉成字串類型,提取小時加分鐘
  37. hr_min = '0000' + dt.strftime(current_time, "%H%M")
  38. def query_job():
  39. db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye',
  40. database='Edamame', charset='utf8')
  41. cursor = db.cursor()
  42. for node_id in nodes_id:
  43. #找出每個MODE最新時間設定的方位,這裡使用substring_index和group_concat函數
  44. sql = "select mode, tilt_angle, pan_angle, zoom, time1, time2, time3, time4, time5, time6, time7, time8 from jvt_o_time where sn in(select substring_index(group_concat(sn order by datetime desc),',',1) from jvt_o_time where nr='" + node_id + "' group by mode)"
  45. cursor.execute(sql)
  46. message = cursor.fetchall()
  47. # print(hr_min)
  48. # print(node_id)
  49. # print(message)
  50. #找出現在小兵所在的排程倍率值
  51. sql = "select zoom from Orientation where nr='" + node_id + "' order by datetime desc limit 1"
  52. cursor.execute(sql)
  53. zoom = cursor.fetchone()
  54. if zoom :
  55. self.zoom = zoom[0]
  56. self.orientation(node_id, message, hr_min, current_time)
  57. cursor.close()
  58. db.close()
  59. self.pool.submit(query_job)
  60. def orientation(self, node_id, message, hr_min, current_time):
  61. def mqtt_job(self, node_id, mode, current_time, i):
  62. # 拍照的執行序函數
  63. def photo_job(self, node_id, mode, time_num):
  64. sl(60)
  65. current_time = dt.now()
  66. time = dt.strftime(current_time, "%Y-%m-%d_%H.%M")
  67. command = {"node_id": node_id, "command": "a051", "position": str(mode), "time": str(time_num),
  68. "filename": time}
  69. self.mqttPublish(node_id, command)
  70. db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye',
  71. database='Edamame', charset='utf8')
  72. cursor = db.cursor()
  73. # 垂直移動命令
  74. command = {"node_id": node_id, "command": "a014", "Tilt": mode[1]}
  75. self.mqttPublish(node_id, command)
  76. # 水平移動命令
  77. command = {"node_id": node_id, "command": "a012", "Pan": mode[2]}
  78. self.mqttPublish(node_id, command)
  79. # 倍率命令
  80. time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S")
  81. sql = "insert into Orientation(nr, mode, tilt_angle, pan_angle, zoom, datetime) values('%s', '%d', '%s', '%s', '%d', '%s')" % (
  82. node_id, mode[0], mode[1], mode[2], mode[3], time)
  83. cursor.execute(sql)
  84. db.commit()
  85. zoom = mode[3] - int(self.zoom)
  86. command = {"node_id": node_id, "command": "a016", "Zoom": zoom}
  87. self.mqttPublish(node_id, command)
  88. # 拍照命令
  89. time_num = 0
  90. if i == mode[4]:
  91. time_num = 1
  92. elif i == mode[5]:
  93. time_num = 2
  94. elif i == mode[6]:
  95. time_num = 3
  96. elif i == mode[7]:
  97. time_num = 4
  98. elif i == mode[8]:
  99. time_num = 5
  100. elif i == mode[9]:
  101. time_num = 6
  102. elif i == mode[10]:
  103. time_num = 7
  104. elif i == mode[11]:
  105. time_num = 8
  106. self.pool.submit(photo_job, self, node_id, mode[0], time_num)
  107. # photo_thread = threading.Thread(target=photo_job, args=(self, node_id, mode[0], time_num))
  108. # photo_thread.daemon = True
  109. # photo_thread.start()
  110. cursor.close()
  111. db.close()
  112. if message:
  113. for mode in message:
  114. for i in mode:
  115. #判斷是否等於現在時間
  116. if i == hr_min:
  117. self.pool.submit(mqtt_job, self, node_id, mode, current_time, i)
  118. # mqtt_thread = threading.Thread(target=mqtt_job, args=(self, node_id, mode, current_time, i))
  119. # mqtt_thread.daemon = True
  120. # mqtt_thread.start()
  121. def mqttPublish(self, node_id, command):
  122. # print (json.dumps(command))
  123. #要發布的主題和內容
  124. if node_id == 'GTW009001001':
  125. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:b7:52:9c'
  126. elif node_id == 'GTW009001002':
  127. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:c1:72:0c'
  128. elif node_id == 'GTW009001003':
  129. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:84:e9:3f'
  130. elif node_id == 'GTW009002001':
  131. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:95:00:53'
  132. elif node_id == 'GTW009002002':
  133. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e7:51:44'
  134. elif node_id == 'GTW009002003':
  135. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:ce:a5:35'
  136. elif node_id == 'GTW009002004':
  137. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:fc:9d:34'
  138. elif node_id == 'GTW009002005':
  139. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:86:00:c9'
  140. elif node_id == 'GTW009002006':
  141. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:df:4b:0f'
  142. elif node_id == 'GTW009002007':
  143. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:6a:5d:17'
  144. elif node_id == 'GTW009002008':
  145. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:d2:d0:8f'
  146. elif node_id == 'GTW009002009':
  147. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:57:3c:da'
  148. elif node_id == 'GTW009002010':
  149. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:bd:29:b1'
  150. elif node_id == 'GTW009002011':
  151. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:7c:f6:06'
  152. elif node_id == 'GTW009002012':
  153. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:59:9d:bd'
  154. elif node_id == 'GTW009002013':
  155. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e7:21:e5'
  156. elif node_id == 'GTW009002014':
  157. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:34:9e:39'
  158. elif node_id == 'GTW009002015':
  159. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e3:f1:f4'
  160. elif node_id == 'GTW009002016':
  161. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:f8:24:92'
  162. elif node_id == 'GTW009002017':
  163. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:9d:68:05'
  164. elif node_id == 'GTW009002018':
  165. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:4d:e4:34'
  166. elif node_id == 'GTW009002019':
  167. topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:e7:16:28'
  168. self.client.publish(topic, json.dumps(command))
  169. num = node_id + '_n'
  170. self.d[num] = 1
  171. current_time = dt.now()
  172. time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S")
  173. if command['command'] != 'a053':
  174. if command['command'] == 'a051':
  175. sl(600)
  176. elif command['command'] == 'a018' or command['command'] == 'a016' or command['command'] == 'a052':
  177. sl(40)
  178. else:
  179. sl(10)
  180. if self.D[node_id] == 0:
  181. # print(json.dumps(command))
  182. self.client.publish(topic, json.dumps(command))
  183. if command['command'] == 'a051':
  184. sl(600)
  185. elif command['command'] == 'a018' or command['command'] == 'a016' or command['command'] == 'a052':
  186. sl(40)
  187. else:
  188. sl(10)
  189. if self.D[node_id] == 0:
  190. # print(json.dumps(command))
  191. self.client.publish(topic, json.dumps(command))
  192. if command['command'] == 'a051':
  193. sl(600)
  194. elif command['command'] == 'a018' or command['command'] == 'a016' or command['command'] == 'a052':
  195. sl(40)
  196. else:
  197. sl(10)
  198. if self.D[node_id] == 0:
  199. sql = "insert into status(nr, status, datetime) values('%s', '%s', '%s')" % (
  200. node_id, 'fail', time)
  201. self.cursor.execute(sql)
  202. self.db.commit()
  203. self.d[num] = 0
  204. self.D[node_id] = 0
  205. def workOn(self):
  206. self = self
  207. # 當地端程式連線伺服器得到回應時,要做的動作
  208. def on_connect(client, userdata, flags, rc):
  209. #print("Waiting for MQTT message...")
  210. # 將訂閱主題寫在on_connet中
  211. # 如果我們失去連線或重新連線時
  212. # 地端程式將會重新訂閱
  213. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:b7:52:9c/Log')
  214. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:c1:72:0c/Log')
  215. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:84:e9:3f/Log')
  216. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:95:00:53/Log')
  217. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e7:51:44/Log')
  218. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:ce:a5:35/Log')
  219. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:fc:9d:34/Log')
  220. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:86:00:c9/Log')
  221. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:df:4b:0f/Log')
  222. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:6a:5d:17/Log')
  223. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:d2:d0:8f/Log')
  224. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:57:3c:da/Log')
  225. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:bd:29:b1/Log')
  226. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:7c:f6:06/Log')
  227. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:59:9d:bd/Log')
  228. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e7:21:e5/Log')
  229. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:34:9e:39/Log')
  230. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e3:f1:f4/Log')
  231. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:f8:24:92/Log')
  232. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:9d:68:05/Log')
  233. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:4d:e4:34/Log')
  234. client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:e7:16:28/Log')
  235. # 當接收到從伺服器發送的訊息時要進行的動作
  236. def on_message(client, userdata, msg):
  237. msg = msg
  238. def message_job(self, msg):
  239. db = pymysql.connect(host='52.69.200.169', port=3306, user='edamame', password='skyeye',
  240. database='Edamame', charset='utf8')
  241. cursor = db.cursor()
  242. # 轉換編碼utf-8才看得懂中文
  243. # print(msg.topic + " " + msg.payload.decode('utf-8'))
  244. payload = msg.payload.decode('utf-8')
  245. current_time = dt.now()
  246. time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S")
  247. if payload:
  248. p = json.loads(payload)
  249. if self.d[p['node_id'] + '_n'] == 1:
  250. if 'filename' in p:
  251. sql = "insert into imgSignalLog(nr, command, response, position, time, size_a, size_b, filename, datetime) \
  252. values('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')" % (
  253. p['node_id'], p['command'], p['rqnn'], p['position'], p['time'], p['a'], p['b'],
  254. p['filename'],
  255. time)
  256. cursor.execute(sql)
  257. else:
  258. if 'tilt' in p:
  259. tilt_angle = p['tilt']
  260. else:
  261. tilt_angle = 'NULL'
  262. if 'pan' in p:
  263. pan_angle = p['pan']
  264. else:
  265. pan_angle = 'NULL'
  266. if 'zoom' in p:
  267. zoom = p['zoom']
  268. else:
  269. zoom = 'NULL'
  270. if 'position' in p:
  271. position = p['position']
  272. else:
  273. position = 'NULL'
  274. sql = "insert into signalLog(nr, command, response, tilt_angle, pan_angle, zoom, position, datetime) \
  275. values('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')" % (
  276. p['node_id'], p['command'], p['rqnn'], tilt_angle, pan_angle, zoom, position, time)
  277. # self.lock.acquire()
  278. cursor.execute(sql)
  279. # self.lock.release()
  280. db.commit()
  281. self.D[p['node_id']] = 1
  282. self.d[p['node_id'] + '_n'] = 0
  283. cursor.close()
  284. db.close()
  285. self.pool.submit(message_job, self, msg)
  286. # message_thread = threading.Thread(target=message_job, args=(self, msg))
  287. # message_thread.daemon = True
  288. # message_thread.start()
  289. def thread_job(self):
  290. # 設定連線的動作
  291. self.client.on_connect = on_connect
  292. # 設定接收訊息的動作
  293. self.client.on_message = on_message
  294. # 設定登入帳號密碼
  295. self.client.username_pw_set("aisky-client", "aiskyc")
  296. # 設定連線資訊(IP, Port, 連線時間)
  297. self.client.connect("60.250.156.234", 1883, 60)
  298. # 開始連線,執行設定的動作和處理重新連線問題
  299. # 也可以手動使用其他loop函式來進行連接
  300. self.client.loop_forever()
  301. mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,))
  302. mqtt_subscribe_thread.daemon = True
  303. mqtt_subscribe_thread.start()
  304. #睡1秒等執行敘跑完才跑主程式
  305. sl(1)
  306. self.queryMySQL()
  307. self.cursor.close()
  308. self.db.close()
  309. self.pool.shutdown(wait=True)
  310. if __name__ == '__main__':
  311. edamame = EdamameBrake()
  312. try:
  313. edamame.workOn()
  314. except KeyboardInterrupt:
  315. edamame.pool.shutdown(wait=True)
  316. edamame.cursor.close()
  317. edamame.db.close()