import pymysql import rospy import paho.mqtt.client as mqtt from std_msgs.msg import String from geometry_msgs.msg import PoseStamped, PoseWithCovarianceStamped from move_base_msgs.msg import MoveBaseActionResult import json import threading from time import sleep as sl from datetime import datetime as dt from concurrent.futures import ThreadPoolExecutor class CowdungCartSchedule(object): def __init__(self): self.db = pymysql.connect(host='52.69.200.169', port=3306, user='cowdungcart', password='skyeye', database='CowdungCart', charset='utf8') self.cursor = self.db.cursor() self.client = mqtt.Client() self.goalStatus = "Got new plan" self.pool = ThreadPoolExecutor(10) def queryMySQL(self): while True: sl(60) current_time = dt.now() time = dt.strftime(current_time, "%H:%M") def query_job(time): #重新連線資料庫,即時更新 db = pymysql.connect(host='52.69.200.169', port=3306, user='cowdungcart', password='skyeye', database='CowdungCart', charset='utf8') cursor = db.cursor() #搜尋排程時間 sql = "select time1, time2, time3, time4, time5, time6, time7 from schedule order by datetime desc limit 1" cursor.execute(sql) schedules = cursor.fetchall()[0] for schedule in schedules: print(schedule) #將排程時間與現在時間做比對 if schedule == time: #搜尋目標點座標 sql = "select goal_change_x, goal_change_y from goal" cursor.execute(sql) goals = cursor.fetchall() #循環遍歷目標點 for goal in goals: self.goalPublish(goal) while True: if self.goalStatus == 'Goal reached.': self.goalStatus = "Got new plan" break # 重新連線資料庫,即時更新 db = pymysql.connect(host='52.69.200.169', port=3306, user='cowdungcart', password='skyeye', database='CowdungCart', charset='utf8') cursor = db.cursor() # 開始監測感測器的值 # 超音波糞便高度 sonic_sql = "select sonic from sonic order by datetime desc limit 1" cursor.execute(sonic_sql) sonic = cursor.fetchall()[0] # 水位 water_level_sql = "select water_level from water_level order by datetime desc limit 1" cursor.execute(water_level_sql) water_level = cursor.fetchall()[0][0] cursor.close() db.close() sl(35) print("----------------------------------------------------") print(water_level) if water_level == 'top' or sonic == '': # 記住當下車子的位置 currentPose = (self.currentPoseX, self.currentPoseY) # 發送基站前的目標點 basePose = (-0.585351440576172, -0.5214363581840141) self.goalPublish(basePose) # 如果到達基站跳出循環 while True: if self.goalStatus == 'Goal reached.': self.goalStatus = "Got new plan" break # MQTT pass # 回到原來紀錄位置 self.goalPublish(currentPose) # 如果到達之前記錄位置跳出循環 while True: if self.goalStatus == 'Goal reached.': self.goalStatus = "Got new plan" break # 往原來的目標點前進 self.goalPublish(goal) # 如果到達原來目標點跳出循環 while True: if self.goalStatus == 'Goal reached.': break cursor.close() db.close() self.pool.submit(query_job, time) def goalPublish(self, goal): # 定义发布器pub,发布的题目是chatter,消息类型是String,然后queue_size是在订阅者接受消息不够快的时候保留的消息的数量,如果对qos要求低的话可以设为0,不设置的话会出个报警,不过不会出错 # pub = rospy.Publisher('chatter', String, queue_size=10) pub = rospy.Publisher('move_base_simple/goal', PoseStamped, queue_size=10) # 接下来这里初始化了一个名为talker的节点,节点的名字不能以‘ / ’开头,因为之后会自动给你加一个。anonymous参数在为True的时候会在原本节点名字的后面加一串随机数,来保证可以同时开启多个同样的节点,如果为false的话就只能开一个 #rospy.init_node('goalPublish', anonymous=True) # 这里初始化一个Rate对象,通过后面的sleep()可以设定循环的频率 rate = rospy.Rate(10) # 10hz ps = PoseStamped() ps.header.seq = 0 ps.header.stamp.secs = 0 ps.header.stamp.nsecs = 0 ps.header.frame_id = 'map' ps.pose.position.x = float(goal[0]) ps.pose.position.y = float(goal[1]) ps.pose.position.z = 0.0 ps.pose.orientation.x = 0.0 ps.pose.orientation.y = 0.0 ps.pose.orientation.z = 0.2 ps.pose.orientation.w = 0.1 # rospy.loginfo(ps) # pub.publish(ps) # while not rospy.is_shutdown(): # # hello_str = "hello world %s" % rospy.get_time() # rospy.loginfo(ps) # #这里把hello_str发布出去了 # pub.publish(ps) # rate.sleep() i = 0 while i < 2: rospy.loginfo(ps) pub.publish(ps) rate.sleep() i += 1 def mqttPublish(self, command): print (json.dumps(command)) #要發布的主題和內容 topic = 'AISKY/AppleFarm/MK-G/b8:27:eb:b4:59:3e' self.client.publish(topic, json.dumps(command)) def workOn(self): #ROS結果回調函數 def result_callback(data): rospy.loginfo(rospy.get_caller_id() + "I heard %s", data) self.goalStatus = data.status.text print(self.goalStatus) #ROS目標位置回調函數 def pose_callback(data): # rospy.loginfo(rospy.get_caller_id() + "I heard %s", data) print(data.pose.pose.position.x, data.pose.pose.position.y, sep=',') self.currentPoseX = data.pose.pose.position.x self.currentPoseY = data.pose.pose.position.y #ROS訂閱 def listener(): rospy.init_node('goalAction', anonymous=False) # 这里初始化了一个订阅器,订阅了move_base/result这个主题,接收的消息类型为MoveBaseActionResult,每次接收到消息,就会开一个新线程来呼叫callback,每个订阅器只能有一个callback,另外似乎可以做到通过一个订阅器订阅多个主题或多个类型不同的消息,但我认为应该尽量避免这么做,尤其是涉及时间敏感度比较高的主题的时候,消息时间同步这类的事情很难处理 rospy.Subscriber("move_base/result", MoveBaseActionResult, result_callback) #初始化目標點位置訂閱器,獲取車子當下座標位置 rospy.Subscriber("amcl_pose", PoseWithCovarianceStamped, pose_callback) # spin() simply keeps python from exiting until this node is stopped # spin()的功能是让程序在手动停止前一直循环 # rospy.spin() # MQTT當地端程式連線伺服器得到回應時,要做的動作 def on_connect(client, userdata, flags, rc): print("Waiting for MQTT message...") # 將訂閱主題寫在on_connet中 # 如果我們失去連線或重新連線時 # 地端程式將會重新訂閱 client.subscribe('AISKY/AppleFarm/MK-G/b8:27:eb:b4:59:3e/Log') # MQTT當接收到從伺服器發送的訊息時要進行的動作 def on_message(client, userdata, msg): msg = msg def message_job(msg): db = pymysql.connect(host='52.69.200.169', port=3306, user='cowdungcart', password='skyeye', database='CowdungCart', charset='utf8') cursor = db.cursor() # 轉換編碼utf-8才看得懂中文 print(msg.topic + " " + msg.payload.decode('utf-8')) payload = msg.payload.decode('utf-8') current_time = dt.now() time = dt.strftime(current_time, "%Y-%m-%d %H:%M:%S") if payload: p = json.loads(payload) if p['command'] == 'sonic': sql = "insert into actuator(sonic, datetime) values('%s', '%s')" % (p['rqnn'], time) cursor.execute(sql) db.commit() else: pass cursor.close() db.close() self.pool.submit(message_job, msg) # message_thread = threading.Thread(target=message_job, args=(msg,)) # message_thread.daemon = True # message_thread.start() def thread_job(self): # 設定連線的動作 self.client.on_connect = on_connect # 設定接收訊息的動作 self.client.on_message = on_message # 設定登入帳號密碼 self.client.username_pw_set("aisky-client", "aiskyc") # 設定連線資訊(IP, Port, 連線時間) self.client.connect("60.250.156.234", 1883, 60) # 開始連線,執行設定的動作和處理重新連線問題 # 也可以手動使用其他loop函式來進行連接 self.client.loop_forever() #創建執行序監聽MQTT訂閱消息 mqtt_subscribe_thread = threading.Thread(target=thread_job, args=(self,)) mqtt_subscribe_thread.daemon = True mqtt_subscribe_thread.start() #睡1秒等執行敘跑完才跑主程式 sl(1) #創建ROS訂閱 listener() self.queryMySQL() self.cursor.close() self.db.close() self.pool.shutdown(wait=True) if __name__ == '__main__': cowdungcart = CowdungCartSchedule() try: cowdungcart.workOn() except KeyboardInterrupt: cowdungcart.pool.shutdown(wait=True) cowdungcart.cursor.close() cowdungcart.db.close()