The DQN family of algorithms struggles with actions distributed over a continuous space, whereas the Policy Gradient family can effectively predict continuous actions. Building on this, the DPG and DDPG algorithms were proposed, and they handle continuous-action problems effectively.
Papers:
DPG: Deterministic Policy Gradient Algorithms
DDPG: Continuous Control with Deep Reinforcement Learning
GitHub: https://github.com/xiaochus/Deep-Reinforcement-Learning-Practice
Environment
- Python 3.6
- Tensorflow-gpu 1.8.0
- Keras 2.2.2
- Gym 0.10.8
DPG
DPG (Deterministic Policy Gradient) was proposed by D. Silver et al. in 2014. In DPG, the action at each step is obtained as a deterministic value directly from a function μ. Before DPG, it was widely believed that a model-free deterministic policy did not exist, but D. Silver et al. proved that it does through a rigorous mathematical derivation. According to the proof in the DPG paper, the deterministic policy is exactly the limit of a stochastic policy as its variance approaches zero.
In the earlier posts, the Policy Network guided its policy gradient updates with the log loss and the discounted reward, and the AC method guided them with the log loss and the TD error; in both cases the resulting policy is a probability distribution over actions. When choosing an action we are actually sampling from that distribution, so Policy Gradient is essentially a stochastic policy. Under a stochastic policy, even in the same state the action taken may differ each time, whereas a deterministic policy yields a single definite action.
Stochastic policy: π(a|s) = P[a|s]
Deterministic policy: a = μ(s)
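As a minimal illustration of the difference (a sketch only, not part of the implementation below; the network outputs here are made-up numbers):

import numpy as np

# Stochastic policy: the network outputs distribution parameters
# (here, the mean and standard deviation of a Gaussian), and the
# action is sampled from that distribution.
mu_out, sigma_out = 0.3, 0.5          # assumed policy-network outputs
stochastic_action = np.random.normal(mu_out, sigma_out)

# Deterministic policy: the network output itself is the action.
deterministic_action = mu_out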
DPG is trained within an AC-style framework, but without importance-sampling weights. Importance sampling uses a simple probability distribution to estimate a complicated one; since DPG's action is a deterministic value rather than a distribution, the weights are not needed. In addition, DPG evaluates the value function in a Q-learning manner, i.e. the action-value function is estimated with the TD error and the importance weights are ignored. The gradient formulas for the deterministic-policy AC method and for the stochastic policy are given below. Compared with the stochastic policy gradient, the deterministic one drops the integral over actions and gains the gradient of the Q value with respect to the action.
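The original figure is not reproduced here; for reference, the two gradients as stated in the DPG paper are:

Stochastic policy gradient: ∇θ J(πθ) = E_{s~ρ^π, a~πθ} [ ∇θ log πθ(a|s) * Q^π(s, a) ]

Deterministic policy gradient: ∇θ J(μθ) = E_{s~ρ^μ} [ ∇θ μθ(s) * ∇a Q^μ(s, a) |_{a=μθ(s)} ]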
DDPG
DDPG (Deep Deterministic Policy Gradient) takes the ideas DQN used to extend Q-learning and applies them to DPG within an Actor-Critic (AC) framework; the resulting algorithm can solve DRL problems over continuous action spaces. The core improvement over DPG is to use deep neural networks as function approximators for the policy function μ and for the Q function, i.e. a policy network and a Q network, and then to train these networks with deep-learning methods, as reflected in the actor and critic definitions in the implementation below.
The key points of DDPG are the following:
1. DDPG can be seen as a combination of three methods: Nature DQN, Actor-Critic, and DPG.
2. The critic takes both the state and the action as input.
3. The actor is no longer updated with its own loss function and the reward; instead, following DPG, it is updated with the gradient of the critic's Q value with respect to the action.
4. Ideas from Nature DQN are reused: an experience replay buffer, random sampling, and target networks; the target Q value is computed jointly by the two target networks (see the formulas after this list).
5. The target networks are updated softly: at every batch their parameters move slowly towards the online networks.
6. The ε-greedy exploration idea is carried over to continuous actions by adding noise from an Ornstein-Uhlenbeck process to the action.
As for why the actor does not compute its own loss directly but relies on the critic, we can think of it this way: the actor's goal is to produce actions with as high a Q value as possible, so the actor's loss can simply be understood as: the larger the Q value fed back by the critic, the smaller the loss; the smaller the Q value, the larger the loss.
In the formula below, the gradient of the action with respect to the actor parameters θ is da/dθ, and the gradient of the critic's Q value with respect to the action is dq/da, so the gradient of the Q value with respect to the actor parameters is -(dq/da * da/dθ) (the sign is negative because the optimizer minimizes the loss while we want to maximize the Q value).
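Written out, this is the sampled policy gradient from the DDPG paper (the original post showed it as a figure):

∇_{θ^μ} J ≈ (1/N) * Σ_i [ ∇a Q(s, a | θ^Q) |_{s=s_i, a=μ(s_i)} * ∇_{θ^μ} μ(s | θ^μ) |_{s=s_i} ]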
The DDPG training procedure is as follows:
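The pseudocode figure is not reproduced here; a condensed outline, paraphrasing the DDPG paper, is:
1. Randomly initialize the critic Q(s, a | θ^Q) and the actor μ(s | θ^μ), and copy their weights into the target networks Q_target and μ_target.
2. Initialize the replay buffer R.
3. For each episode, initialize an OU noise process and observe the initial state s_1.
4. At each step t, select a_t = μ(s_t | θ^μ) + noise, execute it, and store the transition (s_t, a_t, r_t, s_{t+1}) in R.
5. Sample a random minibatch from R, compute y_i = r_i + γ * Q_target(s_{i+1}, μ_target(s_{i+1})), and update the critic by minimizing the MSE between Q(s_i, a_i) and y_i.
6. Update the actor with the sampled policy gradient above, then soft-update both target networks with the τ rule.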
Algorithm Implementation
We use the Pendulum environment to experiment with continuous action prediction; a Keras implementation of DDPG is shown below:
# -*- coding: utf-8 -*-
import os
import random
import gym
from collections import deque
import numpy as np
import tensorflow as tf
from keras.layers import Input, Dense, Lambda, concatenate
from keras.models import Model
from keras.optimizers import Adam
import keras.backend as K
from DRL import DRL
class DDPG(DRL):
"""Deep Deterministic Policy Gradient Algorithms.
"""
def __init__(self):
super(DDPG, self).__init__()
self.sess = K.get_session()
self.env = gym.make('Pendulum-v0')
self.bound = self.env.action_space.high[0]
# update rate for target model.
self.TAU = 0.01
# experience replay.
self.memory_buffer = deque(maxlen=4000)
# discount rate for q value.
self.gamma = 0.95
# epsilon of action selection
self.epsilon = 1.0
# discount rate for epsilon.
self.epsilon_decay = 0.995
# min epsilon of ε-greedy.
self.epsilon_min = 0.01
# actor learning rate
self.a_lr = 0.0001
# critic learning rate
self.c_lr = 0.001
# ddpg model
self.actor = self._build_actor()
self.critic = self._build_critic()
# target model
self.target_actor = self._build_actor()
self.target_actor.set_weights(self.actor.get_weights())
self.target_critic = self._build_critic()
self.target_critic.set_weights(self.critic.get_weights())
# gradient function
self.get_critic_grad = self.critic_gradient()
self.actor_optimizer()
if os.path.exists('model/ddpg_actor.h5') and os.path.exists('model/ddpg_critic.h5'):
self.actor.load_weights('model/ddpg_actor.h5')
self.critic.load_weights('model/ddpg_critic.h5')
def _build_actor(self):
"""Actor model.
"""
inputs = Input(shape=(3,), name='state_input')
x = Dense(40, activation='relu')(inputs)
x = Dense(40, activation='relu')(x)
x = Dense(1, activation='tanh')(x)
output = Lambda(lambda x: x * self.bound)(x)
model = Model(inputs=inputs, outputs=output)
model.compile(loss='mse', optimizer=Adam(lr=self.a_lr))
return model
def _build_critic(self):
"""Critic model.
"""
sinput = Input(shape=(3,), name='state_input')
ainput = Input(shape=(1,), name='action_input')
s = Dense(40, activation='relu')(sinput)
a = Dense(40, activation='relu')(ainput)
x = concatenate([s, a])
x = Dense(40, activation='relu')(x)
output = Dense(1, activation='linear')(x)
model = Model(inputs=[sinput, ainput], outputs=output)
model.compile(loss='mse', optimizer=Adam(lr=self.c_lr))
return model
def actor_optimizer(self):
"""actor_optimizer.
Returns:
function, opt function for actor.
"""
self.ainput = self.actor.input
aoutput = self.actor.output
trainable_weights = self.actor.trainable_weights
self.action_gradient = tf.placeholder(tf.float32, shape=(None, 1))
# tf.gradients will calculate dy/dx weighted by an initial gradient for y
# action_gradient is dq / da, so this is dq/da * da/dparams
params_grad = tf.gradients(aoutput, trainable_weights, -self.action_gradient)
grads = zip(params_grad, trainable_weights)
self.opt = tf.train.AdamOptimizer(self.a_lr).apply_gradients(grads)
self.sess.run(tf.global_variables_initializer())
def critic_gradient(self):
"""get critic gradient function.
Returns:
function, gradient function for critic.
"""
cinput = self.critic.input
coutput = self.critic.output
# compute the gradient of the Q value with respect to the action, dq/da.
action_grads = K.gradients(coutput, cinput[1])
return K.function([cinput[0], cinput[1]], action_grads)
def OU(self, x, mu=0, theta=0.15, sigma=0.2):
"""Ornstein-Uhlenbeck process.
formula:ou = θ * (μ - x) + σ * w
Arguments:
x: action value.
mu: μ, mean of the values.
theta: θ, rate at which the variable reverts towards the mean.
sigma: σ, degree of volatility of the process.
Returns:
OU value
"""
return theta * (mu - x) + sigma * np.random.randn(1)
def get_action(self, X):
"""get actor action with ou noise.
Arguments:
X: state value.
"""
action = self.actor.predict(X)[0][0]
# add randomness to action selection for exploration
noise = max(self.epsilon, 0) * self.OU(action)
action = np.clip(action + noise, -self.bound, self.bound)
return action
def remember(self, state, action, reward, next_state, done):
"""add data to experience replay.
Arguments:
state: observation.
action: action.
reward: reward.
next_state: next_observation.
done: if game done.
"""
item = (state, action, reward, next_state, done)
self.memory_buffer.append(item)
def update_epsilon(self):
"""update epsilon.
"""
if self.epsilon >= self.epsilon_min:
self.epsilon *= self.epsilon_decay
def process_batch(self, batch):
"""process batch data.
Arguments:
batch: batch size.
Returns:
states: states.
actions: actions.
y: Q_value.
"""
y = []
# randomly sample a batch of data from experience replay.
data = random.sample(self.memory_buffer, batch)
states = np.array([d[0] for d in data])
actions = np.array([d[1] for d in data])
next_states = np.array([d[3] for d in data])
# compute the target Q value with the two target networks.
next_actions = self.target_actor.predict(next_states)
q = self.target_critic.predict([next_states, next_actions])
# update Q value
for i, (_, _, reward, _, done) in enumerate(data):
target = reward
if not done:
target += self.gamma * q[i][0]
y.append(target)
return states, actions, y
def update_model(self, X1, X2, y):
"""update ddpg model.
Arguments:
X1: states.
X2: actions.
y: target Q value.
Returns:
loss: critic loss.
"""
# loss = self.critic.train_on_batch([X1, X2], y)
loss = self.critic.fit([X1, X2], y, verbose=0)
loss = np.mean(loss.history['loss'])
X3 = self.actor.predict(X1)
a_grads = np.array(self.get_critic_grad([X1, X3]))[0]
self.sess.run(self.opt, feed_dict={
self.ainput: X1,
self.action_gradient: a_grads
})
return loss
def update_target_model(self):
"""soft update target model.
formula: θ_target ← τ * θ + (1 - τ) * θ_target, τ << 1.
"""
critic_weights = self.critic.get_weights()
actor_weights = self.actor.get_weights()
critic_target_weights = self.target_critic.get_weights()
actor_target_weights = self.target_actor.get_weights()
for i in range(len(critic_weights)):
critic_target_weights[i] = self.TAU * critic_weights[i] + (1 - self.TAU) * critic_target_weights[i]
for i in range(len(actor_weights)):
actor_target_weights[i] = self.TAU * actor_weights[i] + (1 - self.TAU) * actor_target_weights[i]
self.target_critic.set_weights(critic_target_weights)
self.target_actor.set_weights(actor_target_weights)
def train(self, episode, batch):
"""training model.
Arguments:
episode: number of game episodes.
batch: batch size.
Returns:
history: training history.
"""
history = {'episode': [], 'Episode_reward': [], 'Loss': []}
for i in range(episode):
observation = self.env.reset()
reward_sum = 0
losses = []
for j in range(200):
# choose an action with ε-scaled exploration noise.
x = observation.reshape(-1, 3)
# actor action
action = self.get_action(x)
observation, reward, done, _ = self.env.step(action)
# add data to experience replay.
reward_sum += reward
self.remember(x[0], action, reward, observation, done)
if len(self.memory_buffer) > batch:
X1, X2, y = self.process_batch(batch)
# update DDPG model
loss = self.update_model(X1, X2, y)
# update target model
self.update_target_model()
# reduce epsilon per batch.
self.update_epsilon()
losses.append(loss)
loss = np.mean(losses)
history['episode'].append(i)
history['Episode_reward'].append(reward_sum)
history['Loss'].append(loss)
print('Episode: {}/{} | reward: {} | loss: {:.3f}'.format(i, episode, reward_sum, loss))
self.actor.save_weights('model/ddpg_actor.h5')
self.critic.save_weights('model/ddpg_critic.h5')
return history
def play(self):
"""play game with model.
"""
print('play...')
observation = self.env.reset()
reward_sum = 0
random_episodes = 0
while random_episodes < 10:
self.env.render()
x = observation.reshape(-1, 3)
action = self.actor.predict(x)[0]
observation, reward, done, _ = self.env.step(action)
reward_sum += reward
if done:
print("Reward for this episode was: {}".format(reward_sum))
random_episodes += 1
reward_sum = 0
observation = self.env.reset()
self.env.close()
if __name__ == '__main__':
model = DDPG()
history = model.train(200, 128)
model.save_history(history, 'ddpg.csv')
model.play()
During training the episode reward rises steadily while the critic loss keeps falling. Since the reward for each action in Pendulum lies between -16 and 0, the closer the total reward is to 0, the better.
The test results are shown below: the total reward per episode is mostly around -120 to -130, with the occasional worse run, and the pendulum stays upright, which shows that DDPG solves this task.
play...
Reward for this episode was: -123.71978446919498
Reward for this episode was: -115.70330575701709
Reward for this episode was: -123.30843994892032
Reward for this episode was: -377.2392365834364
Reward for this episode was: -131.49351601402685
Reward for this episode was: -245.04125509091233
Reward for this episode was: -250.5214695454614
Reward for this episode was: -129.1264146531351
Reward for this episode was: -126.59492808745193
Reward for this episode was: -130.41697205331536
PS: While implementing the code, using fit() versus train_on_batch() for the critic gives completely different results: the former lets the model converge while the latter does not. This puzzled me for a long time and took a lot of debugging to track down; it never showed up in the previous algorithm implementations. My guess is that it is related to computing the critic's gradient with K.function(), which somehow leaves the critic's parameters without updates; computing that gradient with tf directly should avoid the problem.
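A minimal sketch of the "compute the gradient with tf" alternative mentioned above (an assumption on my part, not the tested code from the repository): the two hypothetical methods below would live in the DDPG class, reuse self.critic and self.sess, and replace critic_gradient().

def critic_gradient_tf(self):
    """Build a dQ/da op with tf.gradients instead of K.function."""
    cinput = self.critic.input            # [state placeholder, action placeholder]
    coutput = self.critic.output          # Q(s, a)
    # dQ/da as a graph op, evaluated later with sess.run.
    self.dq_da_op = tf.gradients(coutput, cinput[1])[0]
    self.c_state_ph, self.c_action_ph = cinput[0], cinput[1]

def get_critic_grad_tf(self, states, actions):
    """Evaluate dQ/da for a batch by running the graph directly."""
    return self.sess.run(self.dq_da_op, feed_dict={
        self.c_state_ph: states,
        self.c_action_ph: actions})

With this variant, update_model would call self.get_critic_grad_tf(X1, X3) instead of self.get_critic_grad([X1, X3]), and the result can be fed to self.action_gradient unchanged.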