#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
A simple example for Reinforcement Learning using table lookup Q-learning method.

A robot arm (servos driven through a PCA9685) tracks a water/target level measured
by a DFRobot A02YYUW ultrasonic sensor.  Tabular Q-learning tunes the PID gains
(kp, ki, kd) in three stages: rise time -> overshoot -> settling time.

View more on my tutorial page: https://morvanzhou.github.io/tutorials/
"""
import numpy as np
import pandas as pd
import time
import smbus
import math
import sympy
from sympy import asin, cos, sin, acos, tan, atan
from DFRobot_RaspberryPi_A02YYUW import DFRobot_A02_Distance as Board

np.random.seed(2)  # reproducible

# Q-table states: rise-time buckets [0..3], overshoot buckets [4..5],
# settling-time buckets [6..7].
N_STATES = ['raise_time<1', '1<=raise_time<2', '2<=raise_time<4', 'raise_time>4',
            '0<=overshoot<0.33', '0.33<overshoot<1',
            '10<=setingtime<20', '20<=setingtime<30']
goal = 16      # target level (cm) the controller tries to reach
# Available actions: increments/decrements applied to kp, ki, kd.
ACTIONS = ['kp+1', 'kp+0.1', 'kp+0.01', 'kp+0', 'kp-0.01', 'kp-0.1', 'kp-1',
           'ki+0.1', 'ki+0.01', 'ki+0', 'ki-0.01', 'ki-0.1',
           'kd+0.01', 'kd+0', 'kd-0.01']
EPSILON = 0.9       # greedy policy threshold
ALPHA = 0.1         # learning rate
GAMMA = 0.9         # discount factor
MAX_EPISODES = 1    # maximum episodes
FRESH_TIME = 0.1    # fresh time for one move

kp = 0.0
ki = 0.0
kd = 0.0
count = 50   # samples per training run
y = 23.5     # sensor mounting height (cm) — distance reference


class PCA9685:
    """Minimal I2C driver for the PCA9685 16-channel PWM/servo controller."""

    # Registers/etc.
    __SUBADR1 = 0x02
    __SUBADR2 = 0x03
    __SUBADR3 = 0x04
    __MODE1 = 0x00
    __PRESCALE = 0xFE
    __LED0_ON_L = 0x06
    __LED0_ON_H = 0x07
    __LED0_OFF_L = 0x08
    __LED0_OFF_H = 0x09
    __ALLLED_ON_L = 0xFA
    __ALLLED_ON_H = 0xFB
    __ALLLED_OFF_L = 0xFC
    __ALLLED_OFF_H = 0xFD

    def __init__(self, address=0x60, debug=False):
        self.bus = smbus.SMBus(1)
        self.address = address
        self.debug = debug
        if (self.debug):
            print("Reseting PCA9685")
        self.write(self.__MODE1, 0x00)

    def write(self, reg, value):
        "Writes an 8-bit value to the specified register/address"
        self.bus.write_byte_data(self.address, reg, value)
        if (self.debug):
            print("I2C: Write 0x%02X to register 0x%02X" % (value, reg))

    def read(self, reg):
        "Read an unsigned byte from the I2C device"
        result = self.bus.read_byte_data(self.address, reg)
        if (self.debug):
            print("I2C: Device 0x%02X returned 0x%02X from reg 0x%02X" % (self.address, result & 0xFF, reg))
        return result

    def setPWMFreq(self, freq):
        "Sets the PWM frequency"
        prescaleval = 25000000.0  # 25MHz
        prescaleval /= 4096.0     # 12-bit
        prescaleval /= float(freq)
        prescaleval -= 1.0
        if (self.debug):
            print("Setting PWM frequency to %d Hz" % freq)
            print("Estimated pre-scale: %d" % prescaleval)
        prescale = math.floor(prescaleval + 0.5)
        if (self.debug):
            print("Final pre-scale: %d" % prescale)
        oldmode = self.read(self.__MODE1)
        newmode = (oldmode & 0x7F) | 0x10  # sleep
        self.write(self.__MODE1, newmode)  # go to sleep
        self.write(self.__PRESCALE, int(math.floor(prescale)))
        self.write(self.__MODE1, oldmode)
        time.sleep(0.005)
        self.write(self.__MODE1, oldmode | 0x80)

    def setPWM(self, channel, on, off):
        "Sets a single PWM channel"
        self.write(self.__LED0_ON_L + 4 * channel, on & 0xFF)
        self.write(self.__LED0_ON_H + 4 * channel, on >> 8)
        self.write(self.__LED0_OFF_L + 4 * channel, off & 0xFF)
        self.write(self.__LED0_OFF_H + 4 * channel, off >> 8)
        if (self.debug):
            print("channel: %d  LED_ON: %d LED_OFF: %d" % (channel, on, off))

    def setServoPulse(self, channel, pulse):
        "Sets the Servo Pulse,The PWM frequency must be 50HZ"
        pulse = pulse * 4096 / 20000  # PWM frequency is 50HZ,the period is 20000us
        self.setPWM(channel, 0, int(pulse))


class PIDController:
    """Incremental PID controller; also drives the arm servos via ctr_pwm."""

    def __init__(self, Kp, Ki, Kd):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.last_error = 0
        self.integral = 0

    def control(self, error):
        """Return the raw PID output for `error` and update internal state."""
        output = self.Kp * error + self.Ki * self.integral + self.Kd * (error - self.last_error)
        self.integral += error
        self.last_error = error
        return output

    def ctr_pwm(self, x, y, z):
        """Inverse kinematics for the 2-link arm plus a PID correction on the
        right-side servo (channel 15).

        x, z are pixel-like coordinates (centered on 320 / 240); y is the
        measured level.  Uses the module-level `pwm` PCA9685 instance.
        """
        pwm_1 = 1750
        i = 0
        j = 0
        # ----------------------------------------
        r1 = 8  # link lengths (cm)
        r2 = 8
        # Compute each joint angle.
        x = (x - 320) / 100
        y = float(y)
        z = ((240 - z) / 100) + 5.3
        enable = 1
        print("x=", x)
        if (x < -19 or x > 19):  # out of range — disable output
            print("x over range")
            enable = 0
        print("y=", y)
        if (y < 0 or y > 19):  # out of range — disable output
            print("y over range")
            enable = 0
        print("z=", z)
        z2 = z - 3
        if (z2 < 0 or z2 > 8):  # out of range — disable output
            print("z over range")
            enable = 0
        # Gripper front motor 6.5 + right motor 1.5 cm offset -> subtract 8.
        L = math.sqrt(x * x + y * y) - 8
        print("L=", L)
        if (L < 2):  # out of range — disable output
            print("L over range")
            enable = 0
        h1 = L * L + z2 * z2 + 2 * r1 * L + r1 * r1 - r2 * r2
        h2 = -4 * r1 * z2
        h3 = L * L + z2 * z2 - 2 * r1 * L + r1 * r1 - r2 * r2
        try:
            # No real solution -> sqrt raises; disable output.
            Theta = 2 * atan((-h2 + math.sqrt(h2 * h2 - 4 * h1 * h3)) / (2 * h1))
        except Exception:
            print("error")
            enable = 0
        try:
            Gamma = asin((z2 - r1 * sin(Theta)) / r2)  # no valid angle -> disable
        except Exception:
            print("error")
            enable = 0
        try:
            tt = x / (r1 * cos(Theta) + r2 * cos(Gamma) + 8)
            print(tt)
            # Clamp into acos domain.
            if (tt >= 1):
                tt = 1
            if (tt <= -1):
                tt = -1
            Beta = acos(tt)  # no valid angle -> disable
        except Exception:
            print("error")
            enable = 0
        if (enable == 1):
            # Convert Gamma angle to a PWM pulse for the right-side motor.
            pwm_3 = -12.222 * (-math.degrees(Gamma)) + 2300
            print("pwm_3", pwm_3)
            error = pwm_3 - pwm_1
            print("error", error)
            output = self.Kp * error + self.Ki * self.integral + self.Kd * (error - self.last_error)
            self.integral += error
            self.last_error = error
            pwm_1 += output * 10
            # NOTE(review): only the upper bound is clamped; values <= 1700 pass
            # through unchanged (matches original behavior) — confirm intent.
            pwm_1 = min(pwm_1, 2300)
            print("output kp ki kd", pwm_1, self.Kp, self.Ki, self.Kd)
            pwm.setServoPulse(0, 2300)   # front motor: open gripper
            pwm.setServoPulse(1, 1750)   # right motor at 45 degrees
            pwm.setServoPulse(14, 1600)  # base motor centered
            pwm.setServoPulse(15, pwm_1)


class Any_System:
    """Trivial plant model: an accumulator tracking a target value."""

    def __init__(self, goal):
        self.target = goal
        self.current = 0

    def update(self, control_singal):
        self.current += control_singal
        return self.current

    def get_error(self):
        return self.target - self.current


def train(S, controller, system, num_iterations):
    """Stage 1: measure rise time.

    Drives the arm for up to `num_iterations` samples, logging the measured
    level to dis1.txt.  Returns (next_state, reward) as soon as the level
    first exceeds the target, otherwise after the last sample.
    """
    errors = []
    raise_time = 0
    for _ in range(num_iterations):
        # Real measurement: sensor reports mm; convert to cm below the mount.
        current = 23.5 - (board.getDistance() / 10)
        with open('dis1.txt', 'a') as f:
            f.write(str(current))
            f.write('\r\n')
        error = system.target - current  # real signal error
        controller.ctr_pwm(320, current, 240)
        errors.append(error)
        raise_time += 1
        S_ = N_STATES[3]
        if error >= 0 and error <= system.target:
            R = 0
        elif error > system.target:
            R = -1
        elif error < 0:
            R = -1
        print(raise_time, current)
        if current > system.target:
            # Target crossed: bucket the rise time into a state + reward.
            print('time', raise_time)
            if raise_time < 10:
                S_ = N_STATES[0]
                R = 5
            elif (10 <= raise_time) and (raise_time < 20):
                S_ = N_STATES[1]
                R = 3
            elif (20 <= raise_time) and (raise_time < 40):
                S_ = N_STATES[2]
                R = 2
            else:
                S_ = N_STATES[3]
                if error > 0 and error < system.target:
                    R = 0
                elif error > system.target:
                    R = -1
                elif error < 0:
                    R = -1
            return S_, R
    return S_, R


def train2(S, controller, system, num_iterations):
    """Stage 2: measure overshoot over `num_iterations` samples (dis2.txt log).

    Overshoot is the max relative excess above the target; defaults to 1
    (worst) when the level never reached the target.
    """
    errors = []
    current_arr = []
    overshoot_value = []
    for _ in range(num_iterations):
        current = 23.5 - (board.getDistance() / 10)
        with open('dis2.txt', 'a') as f:
            f.write(str(current))
            f.write('\r\n')
        error = system.target - current  # real signal error
        controller.ctr_pwm(320, current, 240)
        errors.append(error)
        current_arr.append(current)
    for i in range(num_iterations):
        if (current_arr[i] - system.target >= 0):
            overshoot_value.append((current_arr[i] - system.target) / system.target)
        print(i, current_arr[i])
    try:
        overshoot = max(overshoot_value)
    except ValueError:
        # Never exceeded the target -> treat as worst-case overshoot.
        overshoot = 1
    print(overshoot)
    if overshoot >= 0 and overshoot < 0.0625:
        print('overshoot success')
        S_ = N_STATES[4]
        R = 2
    elif (0.0625 <= overshoot) and (overshoot < 1):
        S_ = N_STATES[5]
        R = 1
    else:
        S_ = N_STATES[0]
        R = 0
    return S_, R


def train3(S, controller, system, num_iterations):
    """Stage 3: measure settling time (dis3.txt log).

    Settling time is the first checkpoint (sample 10/20/30) where the level is
    within +/-5 of the target AND it stays within that band for samples 9..48.
    On success the current gains are appended to pid.txt.
    """
    errors = []
    current_arr = []
    for _ in range(num_iterations):
        current = 23.5 - (board.getDistance() / 10)
        with open('dis3.txt', 'a') as f:
            f.write(str(current))
            f.write('\r\n')
        error = system.target - current  # real signal error
        controller.ctr_pwm(320, current, 240)
        errors.append(error)
        current_arr.append(current)
    if (abs(current_arr[10] - system.target)) < 5:
        setingtime = 10
    elif (abs(current_arr[20] - system.target)) < 5:
        setingtime = 20
    elif (abs(current_arr[30] - system.target)) < 5:
        setingtime = 30
    else:
        setingtime = 31
    # Any later excursion out of the band invalidates the settling time.
    for i in range(9, 49):
        if (abs(current_arr[i] - system.target)) > 5:
            setingtime = 31
    print(setingtime)
    if setingtime >= 10 and setingtime < 20:
        S_ = N_STATES[6]
        R = 2
        print('setingtime success')
        with open('pid.txt', 'a') as f:
            f.write('kp:')
            f.write(str(controller.Kp))
            f.write('ki:')
            f.write(str(controller.Ki))
            f.write('kd:')
            f.write(str(controller.Kd))
            f.write('\r\n')
    elif (20 <= setingtime) and (setingtime < 30):
        S_ = N_STATES[7]
        R = 1
    else:
        S_ = N_STATES[4]
        R = 0
    return S_, R


def build_q_table(n_states, actions):
    """Load the persisted Q-table from disk, or create a zeroed one."""
    try:
        table = pd.read_csv("/home/pi/pid.csv", index_col=0)
    except Exception:
        table = pd.DataFrame(
            np.zeros((len(n_states), len(actions))),  # q_table initial values
            columns=actions, index=n_states,          # actions's name
        )
    print(table)  # show table
    return table


def choose_action(state, q_table):
    """Epsilon-greedy action choice restricted to the kp actions (stage 1)."""
    state_actions = q_table.loc[state, :]
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):  # act non-greedy or state-action have no value
        ACT = ['kp+1', 'kp+0.1', 'kp+0.01', 'kp+0', 'kp-0.01', 'kp-0.1', 'kp-1']
        action_name = np.random.choice(ACT)
    else:   # act greedy
        action_name = state_actions.idxmax()    # replace argmax to idxmax as argmax means a different function in newer version of pandas
    return action_name


def choose_action1(state, q_table):
    """Epsilon-greedy action choice restricted to the kp actions (stage 2)."""
    state_actions = q_table.loc[state, :]
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):  # act non-greedy or state-action have no value
        ACT = ['kp+1', 'kp+0.1', 'kp+0.01', 'kp+0', 'kp-0.01', 'kp-0.1', 'kp-1']
        action_name = np.random.choice(ACT)
    else:   # act greedy
        action_name = state_actions.idxmax()    # replace argmax to idxmax as argmax means a different function in newer version of pandas
    return action_name


def choose_action2(state, q_table):
    """Epsilon-greedy action choice restricted to ki/kd actions (stage 3)."""
    state_actions = q_table.loc[state, :]
    if (np.random.uniform() > EPSILON) or ((state_actions == 0).all()):  # act non-greedy or state-action have no value
        ACT = ['ki+0.1', 'ki+0.01', 'ki+0', 'ki-0.01', 'ki-0.1', 'kd+0.01', 'kd+0', 'kd-0.01']
        action_name = np.random.choice(ACT)
    else:   # act greedy
        action_name = state_actions.idxmax()    # replace argmax to idxmax as argmax means a different function in newer version of pandas
    return action_name


def pid(S, kp):
    """Run stage-1 (rise time) evaluation with a P-only controller."""
    global goal, count
    print('kp:', kp)
    pid_controller = PIDController(kp, 0.0, 0.0)
    any_system = Any_System(goal)
    S_, R = train(S, pid_controller, any_system, count)
    return S_, R


def pid1(S, kp):
    """Run stage-2 (overshoot) evaluation with a P-only controller."""
    print("overshoot")
    pid_controller = PIDController(kp, 0.0, 0.0)
    any_system = Any_System(goal)
    S_, R = train2(S, pid_controller, any_system, count)
    print('kp:', kp)
    return S_, R


def pid2(S, kp, ki, kd):
    """Run stage-3 (settling time) evaluation with the full PID controller."""
    print("setingtime")
    pid_controller = PIDController(kp, ki, kd)
    any_system = Any_System(goal)
    S_, R = train3(S, pid_controller, any_system, count)
    print('kp:', kp, 'ki', ki, 'kd', kd)
    return S_, R


# Action-string -> gain increment tables.  These replace the original
# copy-pasted if/elif ladders; the mappings are exactly the original deltas.
_KP_DELTAS = {
    'kp+1': 1, 'kp+0.1': 0.1, 'kp+0.01': 0.01, 'kp+0': 0,
    'kp-0.01': -0.01, 'kp-0.1': -0.1, 'kp-1': -1,
}
_KIKD_DELTAS = {
    'ki+0.1': ('ki', 0.1), 'ki+0.01': ('ki', 0.01), 'ki+0': ('ki', 0),
    'ki-0.01': ('ki', -0.01), 'ki-0.1': ('ki', -0.1),
    'kd+0.01': ('kd', 0.01), 'kd+0': ('kd', 0), 'kd-0.01': ('kd', -0.01),
}


def get_env_feedback(S, A):
    """Apply kp action `A`, then evaluate rise time (stage 1)."""
    global kp
    kp += _KP_DELTAS[A]
    S_, R = pid(S, kp)
    return S_, R


def get_env_feedback1(S, A):
    """Apply kp action `A`, then evaluate overshoot (stage 2)."""
    global kp
    kp += _KP_DELTAS[A]
    S_, R = pid1(S, kp)
    return S_, R


def get_env_feedback2(S, A):
    """Apply ki/kd action `A`, then evaluate settling time (stage 3)."""
    global ki
    global kd
    gain, delta = _KIKD_DELTAS[A]
    if gain == 'ki':
        ki += delta
    else:
        kd += delta
    S_, R = pid2(S, kp, ki, kd)
    return S_, R


def update_env(S, episode, step_counter):
    """Environment display hook (currently disabled; kept for compatibility)."""
    interaction = 'Episode %s: raise_time= %s' % (episode + 1, S)
    # print('\r{}'.format(interaction), end='')


def rl():
    """Main Q-learning loop.

    Each episode walks through the three tuning stages: rise-time states
    select kp actions, overshoot states select kp actions, settling-time
    states select ki/kd actions.  Terminates when settling succeeds.
    """
    global x, y, z, w
    q_table = build_q_table(N_STATES, ACTIONS)
    for episode in range(MAX_EPISODES):
        S = N_STATES[3]
        is_terminated = False
        while not is_terminated:
            x = 320
            z = 240
            y = 15.5
            # Reset the arm to its home pose before each trial.
            pwm.setServoPulse(0, 1100)   # front motor: open gripper
            pwm.setServoPulse(1, 1750)   # right motor at 45 degrees
            pwm.setServoPulse(14, 1600)  # base motor centered
            pwm.setServoPulse(15, 1700)  # left motor vertical
            time.sleep(1)
            pwm.setServoPulse(0, 0)
            pwm.setServoPulse(1, 0)
            pwm.setServoPulse(14, 0)
            pwm.setServoPulse(15, 0)
            if S == N_STATES[3] or S == N_STATES[2] or S == N_STATES[1] or S == N_STATES[0]:
                # Stage 1: rise time.
                A = choose_action(S, q_table)
                S_, R = get_env_feedback(S, A)  # take action & get next state and reward
                q_predict = q_table.loc[S, A]
                q_target = R + GAMMA * q_table.loc[S_, :].max()   # next state is not terminal
                q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
                print(q_table)
                S = S_  # move to next state
                if S == N_STATES[0]:
                    # Best rise time: skip straight to the overshoot stage.
                    S = N_STATES[5]
            elif S == N_STATES[4] or S == N_STATES[5]:
                # Stage 2: overshoot.
                print("raise_time success")
                A = choose_action1(S, q_table)
                S_, R = get_env_feedback1(S, A)  # take action & get next state and reward
                q_predict = q_table.loc[S, A]
                q_target = R + GAMMA * q_table.loc[S_, :].max()  # next state is not terminal
                q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
                S = S_  # move to next state
                if S == N_STATES[4]:
                    # Overshoot success: advance to the settling-time stage.
                    S = N_STATES[7]
            elif S == N_STATES[6] or S == N_STATES[7]:
                # Stage 3: settling time.
                A = choose_action2(S, q_table)
                S_, R = get_env_feedback2(S, A)  # take action & get next state and reward
                q_predict = q_table.loc[S, A]
                q_target = R + GAMMA * q_table.loc[S_, :].max()  # next state is not terminal
                q_table.loc[S, A] += ALPHA * (q_target - q_predict)  # update
                S = S_  # move to next state
                if S == N_STATES[6]:
                    is_terminated = True
    return q_table


if __name__ == "__main__":
    pwm = PCA9685(0x60, debug=False)
    pwm.setPWMFreq(50)
    board = Board()
    dis_min = 0     # Minimum ranging threshold: 0mm
    dis_max = 4500  # Highest ranging threshold: 4500mm
    board.set_dis_range(dis_min, dis_max)
    q_table = rl()
    print('\r\nQ-table:\n')
    print(q_table)
    q_table.to_csv("/home/pi/pid.csv")