#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
Table-lookup Q-learning used to auto-tune PID gains (kp, ki, kd) for a
servo-tracking loop on a Raspberry Pi. Adapted from the 1-D treasure-hunt
example on Morvan Zhou's tutorial page:
https://morvanzhou.github.io/tutorials/
"""

import numpy as np
import pandas as pd
import time
import smbus
import math
import sympy
from sympy import asin, cos, sin, acos, tan, atan
# from DFRobot_RaspberryPi_A02YYUW import DFRobot_A02_Distance as Board
import paho.mqtt.client as mqtt
import json

np.random.seed(2)  # reproducible

# Eight states: indices 0-3 grade the rise time, 4-5 the overshoot, and
# 6-7 the settling time. The labels after '0<=overshoot<0.33' were garbled
# in the source; the last three are reconstructed from how the indices are
# used in train2()/train3().
N_STATES = ['raise_time<1', '1<=raise_time<2', '2<=raise_time<4', 'raise_time>4',
            '0<=overshoot<0.33', '0.33<=overshoot',
            'settling_time<5.5', 'settling_time>5.5']
goal = 320  # goal (target coordinate)
ACTIONS = ['kp+1', 'kp+0.1', 'kp+0.01', 'kp+0', 'kp-0.01', 'kp-0.1', 'kp-1',
           'ki+0.1', 'ki+0.01', 'ki+0', 'ki-0.01', 'ki-0.1',
           'kd+0.1', 'kd+0', 'kd-0.1']  # available actions
EPSILON = 0.9      # greedy policy
ALPHA = 0.1        # learning rate
GAMMA = 0.9        # discount factor
MAX_EPISODES = 1   # maximum episodes
FRESH_TIME = 0.1   # fresh time for one move

kp = 0.0
ki = 0.0
kd = 0.0
count = 50
x = 0
jsonmsg = ""
pwm_1 = 1600
S_ = ""
p_prev_error = 0
p_integral = 0


def on_connect(client, userdata, flags, rc):
    print("Connected with result code " + str(rc))
    client.subscribe("b8:27:eb:eb:21:13/Log", qos=2)


# Action taken whenever a message arrives from the broker
def on_message(client, userdata, msg):
    # decode as UTF-8 so non-ASCII text is readable
    global jsonmsg
    global x
    payload = msg.payload.decode('utf-8')
    jsonmsg = json.loads(payload)
    x = int(jsonmsg['x'])


class PCA9685:
    # Registers/etc.
    __SUBADR1 = 0x02
    __SUBADR2 = 0x03
    __SUBADR3 = 0x04
    __MODE1 = 0x00
    __PRESCALE = 0xFE
    __LED0_ON_L = 0x06
    __LED0_ON_H = 0x07
    __LED0_OFF_L = 0x08
    __LED0_OFF_H = 0x09
    __ALLLED_ON_L = 0xFA
    __ALLLED_ON_H = 0xFB
    __ALLLED_OFF_L = 0xFC
    __ALLLED_OFF_H = 0xFD

    def __init__(self, address=0x60, debug=False):
        self.bus = smbus.SMBus(1)
        self.address = address
        self.debug = debug
        if self.debug:
            print("Resetting PCA9685")
        self.write(self.__MODE1, 0x00)

    def write(self, reg, value):
        "Writes an 8-bit value to the specified register/address"
        self.bus.write_byte_data(self.address, reg, value)
        if self.debug:
            print("I2C: Write 0x%02X to register 0x%02X" % (value, reg))

    def read(self, reg):
        "Read an unsigned byte from the I2C device"
        result = self.bus.read_byte_data(self.address, reg)
        if self.debug:
            print("I2C: Device 0x%02X returned 0x%02X from reg 0x%02X" % (self.address, result & 0xFF, reg))
        return result

    def setPWMFreq(self, freq):
        "Sets the PWM frequency"
        prescaleval = 25000000.0    # 25 MHz internal oscillator
        prescaleval /= 4096.0       # 12-bit counter
        prescaleval /= float(freq)
        prescaleval -= 1.0
        if self.debug:
            print("Setting PWM frequency to %d Hz" % freq)
            print("Estimated pre-scale: %d" % prescaleval)
        prescale = math.floor(prescaleval + 0.5)
        if self.debug:
            print("Final pre-scale: %d" % prescale)

        oldmode = self.read(self.__MODE1)
        newmode = (oldmode & 0x7F) | 0x10   # sleep
        self.write(self.__MODE1, newmode)   # go to sleep
        self.write(self.__PRESCALE, int(math.floor(prescale)))
        self.write(self.__MODE1, oldmode)
        time.sleep(0.005)
        self.write(self.__MODE1, oldmode | 0x80)

    def setPWM(self, channel, on, off):
        "Sets a single PWM channel"
        self.write(self.__LED0_ON_L + 4 * channel, on & 0xFF)
        self.write(self.__LED0_ON_H + 4 * channel, on >> 8)
        self.write(self.__LED0_OFF_L + 4 * channel, off & 0xFF)
        self.write(self.__LED0_OFF_H + 4 * channel, off >> 8)
        if self.debug:
            print("channel: %d  LED_ON: %d LED_OFF: %d" % (channel, on, off))

    def setServoPulse(self, channel, pulse):
        "Sets the servo pulse; the PWM frequency must be 50 Hz"
        pulse = pulse * 4096 / 20000  # at 50 Hz the period is 20000 us
        self.setPWM(channel, 0, int(pulse))
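# Worked example of the pulse/prescale arithmetic above (illustrative values,
# not from the source): at 50 Hz the period is 20000 us, so
# setServoPulse(14, 1500) writes an off-count of 1500 * 4096 / 20000 = 307
# ticks, and setPWMFreq(50) programs a prescale of
# floor(25e6 / (4096 * 50) - 1 + 0.5) = 121.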
class PIDController:
    def __init__(self, Kp, Ki, Kd):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.last_error = 0
        self.integral = 0

    def control(self, error):
        output = self.Kp * error + self.Ki * self.integral + self.Kd * (error - self.last_error)
        self.integral += error
        self.last_error = error
        return output


class Any_System:
    def __init__(self, goal):
        self.target = goal
        self.current = 0

    def update(self, control_signal):
        self.current += control_signal
        return self.current

    def get_error(self):
        return self.target - self.current


def train(controller, system, num_iterations):
    "Grade the rise time: count loop iterations until the output first passes the target."
    global jsonmsg
    global x
    global S_, R
    errors = []
    raise_time = 0
    cont = 0
    for _ in range(num_iterations):
        # error = system.get_error()
        raise_time += 1
        current = x
        print(raise_time, current)
        error = system.target - current  # real (measured) signal
        output = controller.control(error)
        current += output
        pwm_3 = 0.6156 * current + 1396
        if pwm_3 < 1500:
            pwm_3 = 1500
        elif pwm_3 > 1750:
            pwm_3 = 1750
        pwm.setServoPulse(14, pwm_3)  # drive the base motor
        time.sleep(0.5)
        # control_signal = controller.control(error)
        # current = system.update(control_signal)
        if (current - system.target) > 0:
            cont = raise_time
            break
        else:
            cont = 50
    if cont <= 10:
        S_ = N_STATES[0]
        R = 5
        print('raise_time success')
    # NOTE: the source is garbled between the branch above and the overshoot
    # check in train2(); the remaining rise-time branches and the start of
    # train2() below are reconstructed by analogy with train3(), and the
    # reward values in them are assumptions.
    elif (10 < cont) and (cont <= 20):
        S_ = N_STATES[1]
        R = 1
    elif (20 < cont) and (cont <= 40):
        S_ = N_STATES[2]
        R = -1
    else:
        S_ = N_STATES[3]
        R = -5
    return S_, R


def train2(controller, system, num_iterations):
    "Grade the overshoot: track the peak output relative to the target."
    global x
    global S_, R
    over_time = 0
    ot = 0
    for _ in range(num_iterations):
        value = x
        error = system.target - value  # real (measured) signal
        output = controller.control(error)
        value += output
        pwm_3 = 0.6156 * value + 1396
        if pwm_3 < 1500:
            pwm_3 = 1500
        elif pwm_3 > 1750:
            pwm_3 = 1750
        pwm.setServoPulse(14, pwm_3)  # drive the base motor
        time.sleep(0.5)
        over_time += 1
        if value > ot:
            ot = value
        print(over_time, ot)
    over_shoot = float(abs(ot - system.target)) / 320
    print("overshoot", str(over_shoot))
    if over_shoot >= 0 and over_shoot < 0.0625:
        print('overshoot success')
        S_ = N_STATES[4]
        R = 5
    elif (0.0625 <= over_shoot) and (over_shoot < 1):
        S_ = N_STATES[5]
        R = -1 * over_shoot
    else:
        S_ = N_STATES[0]
        R = 0
    return S_, R


def train3(controller, system, num_iterations):
    "Grade the settling time: count iterations until the error stays within 5 counts."
    global x
    global S_, R
    errors = []
    cont = 0
    settling_time = 0
    con = 0
    print("3")
    for _ in range(num_iterations):
        cont = cont + 1
        current = x
        error = system.target - current  # real (measured) signal
        output = controller.control(error)
        current += output
        pwm_3 = 0.6156 * current + 1396
        if pwm_3 < 1500:
            pwm_3 = 1500
        elif pwm_3 > 1750:
            pwm_3 = 1750
        pwm.setServoPulse(14, pwm_3)  # drive the base motor
        time.sleep(0.5)
        print(cont, error)
        if ((-1 * error) >= 0) and ((-1 * error) <= 5) and (con == 0):
            settling_time = cont
            con = 1
        elif con == 0:
            settling_time = 40
    print(settling_time)
    if settling_time >= 0 and settling_time < 10:
        S_ = N_STATES[6]
        R = 10
        print('settling_time success')
        with open('pid.txt', 'a') as f:
            f.write('kp:')
            f.write(str(controller.Kp))
            f.write('ki:')
            f.write(str(controller.Ki))
            f.write('kd:')
            f.write(str(controller.Kd))
            f.write('\r\n')
    elif (10 <= settling_time) and (settling_time <= 40):
        S_ = N_STATES[7]
        R = 1.5 - 1 * (error / 100)
    else:
        S_ = N_STATES[4]
        R = -1
    return S_, R


def build_q_table(n_states, actions):
    try:
        # resume from a previously saved table if one exists
        table = pd.read_csv("/home/pi/pid.csv", index_col=0)
    except IOError:
        table = pd.DataFrame(
            np.zeros((len(n_states), len(actions))),  # q_table initial values
            columns=actions,    # actions' names
            index=n_states,
        )
    print(table)  # show table
    return table


def choose_action(state, q_table):
    # How to choose an action: kp-only action set for the rise-time phase
    state_actions = q_table.loc[state, :]
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):
        # act non-greedy, or this state's actions have no value yet
        ACT = ['kp+1', 'kp+0.1', 'kp+0.01', 'kp+0', 'kp-0.01', 'kp-0.1', 'kp-1']
        action_name = np.random.choice(ACT)
    else:
        # act greedy; idxmax replaces argmax, which means a different function in newer pandas
        action_name = state_actions.idxmax()
    return action_name
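# Note on the epsilon-greedy rule above: with EPSILON = 0.9 the agent exploits
# the best-valued action about 90% of the time and picks a random action about
# 10% of the time; it also picks randomly whenever the current state's row in
# the Q-table is still all zeros.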
def choose_action1(state, q_table):
    # Same kp-only action set, used for the overshoot phase
    state_actions = q_table.loc[state, :]
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):
        # act non-greedy, or this state's actions have no value yet
        ACT = ['kp+1', 'kp+0.1', 'kp+0.01', 'kp+0', 'kp-0.01', 'kp-0.1', 'kp-1']
        action_name = np.random.choice(ACT)
    else:
        # act greedy
        action_name = state_actions.idxmax()
    return action_name


def choose_action2(state, q_table):
    # ki/kd action set, used for the settling-time phase
    state_actions = q_table.loc[state, :]
    print("3")
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):
        # act non-greedy, or this state's actions have no value yet
        ACT = ['ki+0.1', 'ki+0.01', 'ki+0', 'ki-0.1', 'ki-0.01']
        action_name = np.random.choice(ACT)
    else:
        # act greedy
        action_name = state_actions.idxmax()
    print(action_name)
    return action_name


def pid(kp):
    global goal, count
    global S_
    R = 0
    print("raisetime")
    pid_controller = PIDController(kp, 0.0, 0.0)
    any_system = Any_System(goal)
    S_, R = train(pid_controller, any_system, count)
    print('kp:', kp)
    return S_, R


def pid1(kp):
    global S_
    R = 0
    print("overshoot")
    pid_controller = PIDController(kp, 0.0, 0.0)
    any_system = Any_System(goal)
    S_, R = train2(pid_controller, any_system, count)
    print('kp:', kp)
    return S_, R


def pid2(kp, ki, kd):
    global S_
    R = 0
    print("settlingtime")
    pid_controller = PIDController(kp, ki, kd)
    any_system = Any_System(goal)
    S_, R = train3(pid_controller, any_system, count)
    print('kp:', kp, 'ki', ki, 'kd', kd)
    return S_, R


def get_env_feedback(S, A):
    # How the agent interacts with the environment in the rise-time phase
    global kp, S_
    R = 0
    if A == 'kp+1':
        kp += 1
        S_, R = pid(kp)
    elif A == 'kp+0.1':
        kp += 0.1
        S_, R = pid(kp)
    elif A == 'kp+0.01':
        kp += 0.01
        S_, R = pid(kp)
    elif A == 'kp+0':
        kp = kp + 0
        S_, R = pid(kp)
    elif A == 'kp-0.01':
        kp -= 0.01
        S_, R = pid(kp)
    elif A == 'kp-0.1':
        kp -= 0.1
        S_, R = pid(kp)
    elif A == 'kp-1':
        kp -= 1
        S_, R = pid(kp)
    return S_, R


def get_env_feedback1(S, A):
    # How the agent interacts with the environment in the overshoot phase
    global kp, S_
    R = 0
    if A == 'kp+1':
        kp += 1
        S_, R = pid1(kp)
    elif A == 'kp+0.1':
        kp += 0.1
        S_, R = pid1(kp)
    elif A == 'kp+0.01':
        kp += 0.01
        S_, R = pid1(kp)
    elif A == 'kp+0':
        kp = kp + 0
        S_, R = pid1(kp)
    elif A == 'kp-0.01':
        kp -= 0.01
        S_, R = pid1(kp)
    elif A == 'kp-0.1':
        kp -= 0.1
        S_, R = pid1(kp)
    elif A == 'kp-1':
        kp -= 1
        S_, R = pid1(kp)
    return S_, R


def get_env_feedback2(S, A):
    # How the agent interacts with the environment in the settling-time phase
    global ki
    global kp
    global kd, S_
    R = 0
    if A == 'ki+0.1':
        ki += 0.1
        S_, R = pid2(kp, ki, kd)
    elif A == 'ki+0.01':
        ki += 0.01
        S_, R = pid2(kp, ki, kd)
    elif A == 'ki+0':
        ki = ki + 0
        S_, R = pid2(kp, ki, kd)
    elif A == 'ki-0.1':
        ki -= 0.1
        S_, R = pid2(kp, ki, kd)
    elif A == 'ki-0.01':
        ki -= 0.01
        S_, R = pid2(kp, ki, kd)
    elif A == 'kd+0.1':
        kd += 0.1
        S_, R = pid2(kp, ki, kd)
    elif A == 'kd+0':
        kd = kd + 0
        S_, R = pid2(kp, ki, kd)
    elif A == 'kd-0.1':
        kd -= 0.1
        S_, R = pid2(kp, ki, kd)
    return S_, R


def update_env(S, episode, step_counter):
    # How the environment gets updated (currently only builds the message)
    interaction = 'Episode %s: raise_time= %s' % (episode + 1, S)
    # print('\r{}'.format(interaction), end='')
    # print('Episode %s: raise_time= %s\r\n' % (episode + 1, S))
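# Each phase of rl() below applies the standard tabular Q-learning update:
#     Q(S, A) <- Q(S, A) + ALPHA * (R + GAMMA * max_a Q(S_, a) - Q(S, A))
# where q_predict is the current estimate Q(S, A) and q_target is the
# bootstrapped return R + GAMMA * max over the next state's row.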
def rl():
    # main part of the RL loop
    q_table = build_q_table(N_STATES, ACTIONS)
    for episode in range(MAX_EPISODES):
        S = N_STATES[3]
        is_terminated = False
        pwm.setServoPulse(14, 1600)  # center the base motor
        time.sleep(5)
        while not is_terminated:
            # update_env(S, episode, step_counter)
            if S == N_STATES[3] or S == N_STATES[2] or S == N_STATES[1] or S == N_STATES[0]:
                pwm.setServoPulse(14, 1600)  # center the base motor
                time.sleep(3)
                A = choose_action(S, q_table)
                S_, R = get_env_feedback(S, A)  # take action & get next state and reward
                q_predict = q_table.loc[S, A]
                q_target = R + GAMMA * q_table.loc[S_, :].max()  # next state is not terminal
                q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
                print(q_table)
                S = S_  # move to next state
                # update_env(S, episode, step_counter)
                # step_counter += 1
                if S == N_STATES[0]:
                    S = N_STATES[5]  # rise time is good: move on to the overshoot phase
            elif S == N_STATES[4] or S == N_STATES[5]:
                pwm.setServoPulse(14, 1600)  # center the base motor
                time.sleep(3)
                A = choose_action1(S, q_table)
                S_, R = get_env_feedback1(S, A)  # take action & get next state and reward
                q_predict = q_table.loc[S, A]
                print(q_table)
                q_target = R + GAMMA * q_table.loc[S_, :].max()  # next state is not terminal
                q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
                S = S_  # move to next state
                # update_env(S, episode, step_counter)
                # step_counter += 1
                if S == N_STATES[4]:
                    S = N_STATES[7]  # overshoot is good: move on to the settling-time phase
            elif S == N_STATES[6] or S == N_STATES[7]:
                pwm.setServoPulse(14, 1600)  # center the base motor
                print("ok")
                time.sleep(3)
                A = choose_action2(S, q_table)
                S_, R = get_env_feedback2(S, A)  # take action & get next state and reward
                q_predict = q_table.loc[S, A]
                print(q_table)
                q_target = R + GAMMA * q_table.loc[S_, :].max()  # next state is not terminal
                q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
                S = S_  # move to next state
                if S == N_STATES[6]:
                    is_terminated = True  # settling time is good: episode done
        # update_env(S, episode, step_counter)
    return q_table


def coordinate_pwm_pid():
    global p_prev_error, p_integral, x
    # ----------------------------------------
    # compute each joint angle and hold the target with fixed PID gains
    while True:
        current = x
        p_error = 320 - current
        p_output = 2.0599999999999987 * p_error + 0.1 * p_integral + 0.00 * (p_error - p_prev_error)
        p_integral += p_error
        p_prev_error = p_error
        current += p_output
        pwm_1 = 0.6156 * current + 1396
        print(pwm_1)
        if pwm_1 < 1500:
            pwm_1 = 1500
        elif pwm_1 > 1750:
            pwm_1 = 1750
        pwm.setServoPulse(14, pwm_1)
        time.sleep(0.1)


if __name__ == "__main__":
    pwm = PCA9685(0x60, debug=False)
    pwm.setPWMFreq(50)
    # board = Board()
    # dis_min = 0     # minimum ranging threshold: 0 mm
    # dis_max = 4500  # highest ranging threshold: 4500 mm
    # board.set_dis_range(dis_min, dis_max)
    client = mqtt.Client()
    client.username_pw_set(username='aisky-client', password='aiskyc')
    # register the callbacks before connecting so the CONNACK and incoming
    # messages are handled
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("60.250.156.234", 1883, 60)
    client.loop_start()
    # a = raw_input("input:")
    a = "t"
    if a == "a":
        q_table = rl()
        print('\r\nQ-table:\n')
        print(q_table)
        q_table.to_csv("/home/pi/pid.csv")
    else:
        coordinate_pwm_pid()
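# Usage sketch (assumed workflow): set a = "a" above to run the Q-learning
# tuner, which prints the learned table and saves it to /home/pi/pid.csv so
# build_q_table() can resume from it on the next run; any other value (the
# default "t") runs coordinate_pwm_pid() with the hard-coded gains instead.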