This program implements the Double DQN (DDQN) algorithm with a Dueling DQN network model in the MountainCar environment.
The DQN family of algorithms targets environments with a finite, discrete action space but a continuous state space. Because there are infinitely many states, tabular Q-learning cannot estimate Q(s,a) for every state and converge within a finite number of episodes. The DQN family combines Q-learning with a neural network: the network serves as a function approximator for Q(s,a), so Q(s,a) is no longer read from a lookup table but computed by feeding the state into the network. This is what makes training with an infinite state space feasible.
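To make the table-versus-function difference concrete, here is a minimal, hypothetical sketch (the names q_table and q_net and the two-dimensional state are illustrative only, not part of the program below):

import torch
from torch import nn

# Tabular Q-learning stores one value per (state, action) pair, which requires
# the states to be enumerable.
q_table = {((0, 0), 1): 0.5}            # Q(s=(0, 0), a=1) is looked up directly

# DQN instead learns a function: a network maps any state vector to Q(s, a) for
# every action, so even states never seen before get a value estimate.
q_net = nn.Sequential(nn.Linear(2, 64), nn.ReLU(), nn.Linear(64, 3))
state = torch.tensor([-0.5, 0.01])      # e.g. MountainCar position and velocity
q_values = q_net(state)                 # one Q-value per action, computed rather than looked up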
Known limitation: visualization is not implemented yet (the env.render() calls in the code below are commented out).
Initialization
import torch
from torch import nn
import numpy as np
import pandas as pd
import gym
import collections
# use CUDA to accelerate training when available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
Experience replay
Standard Q-learning trains on transitions in the order they are generated, so the correlation between consecutive samples is never broken; this can bias the value estimates and hurt convergence. The DQN family uses experience replay to address this: transitions are stored in a buffer and later sampled in random mini-batches.
class DQNReplayer:
    def __init__(self, capacity):
        # circular buffer backed by a DataFrame, one row per transition
        self.memory = pd.DataFrame(index=range(capacity),
                                   columns=['observation', 'action', 'reward',
                                            'next_observation', 'done'])
        self.i = 0          # next row to write
        self.count = 0      # number of rows currently filled
        self.capacity = capacity

    def store(self, *args):
        # overwrite the oldest transition once the buffer is full
        self.memory.loc[self.i] = args
        self.i = (self.i + 1) % self.capacity
        self.count = min(self.count + 1, self.capacity)

    def sample(self, size):
        # uniform random mini-batch (with replacement) over the filled rows
        indices = np.random.choice(self.count, size=size)
        return (np.stack(self.memory.loc[indices, field]) for field in
                self.memory.columns)
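For reference, a quick usage sketch of DQNReplayer with made-up transition values (only the shapes matter here):

replayer = DQNReplayer(capacity=3)
# store(observation, action, reward, next_observation, done-mask)
replayer.store([-0.50, 0.00], 2, -1.0, [-0.49, 0.01], 1)
replayer.store([-0.49, 0.01], 2, -1.0, [-0.47, 0.02], 1)
observations, actions, rewards, next_observations, dones = replayer.sample(2)
print(observations.shape)   # (2, 2): a mini-batch of 2 two-dimensional observations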
The Dueling Net model
In the training loop it is used exactly like an ordinary network model, so there is no need to dig into it; the only difference is the forward pass, which combines a state-value stream and an advantage stream.
class DuelingNet(nn.Module):
    def __init__(self, layers, num_actions):
        super(DuelingNet, self).__init__()
        self.layers = layers                # size of the state vector
        self.num_actions = num_actions
        self.features = nn.Sequential(
            nn.Linear(self.layers, 64, bias=True),
            nn.ReLU(),
        )
        self.adv = nn.Linear(64, self.num_actions, bias=True)   # advantage stream A(s, a)
        self.val = nn.Linear(64, 1, bias=True)                  # state-value stream V(s)

    def forward(self, x):
        x = self.features(x)
        adv = self.adv(x)
        val = self.val(x).expand(adv.size())   # expand the size-1 value dimension so it matches adv
        # Q(s, a) = V(s) + A(s, a) - mean_a A(s, a); the mean is taken over the
        # action dimension so it is computed per state rather than over the whole batch
        x = val + adv - adv.mean(dim=-1, keepdim=True).expand(adv.size())
        return x
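The forward pass implements the dueling aggregation Q(s, a) = V(s) + A(s, a) - mean_a A(s, a). A quick shape sanity check with a made-up batch of random states:

net = DuelingNet(layers=2, num_actions=3)
states = torch.rand(10, 2)     # 10 made-up two-dimensional states
q = net(states)
print(q.shape)                 # torch.Size([10, 3]): one Q-value per action for each state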
The training agent
class agent():
    # set the hyperparameters
    def __init__(self, env=gym.make('MountainCar-v0'), layers=2, capacity=500, LR=0.001, gamma=0.9, epsilon=0.05):
        self.env = env
        self.layers = layers                     # size of the observation vector
        self.min = env.unwrapped.min_position
        self.max = env.unwrapped.max_position
        self.action_num = env.action_space.n
        self.net1 = DuelingNet(self.layers, self.action_num).to(device)   # online network
        self.net2 = DuelingNet(self.layers, self.action_num).to(device)   # target network
        self.optimizer1 = torch.optim.Adam(self.net1.parameters(), lr=LR)
        self.loss_func = nn.MSELoss()
        self.replayer = DQNReplayer(capacity)
        self.lr = LR
        self.gamma = gamma
        self.epsilon = epsilon

    # choose an action (epsilon-greedy when israndom is True, greedy otherwise)
    def action(self, state, israndom):
        state_ = torch.Tensor(state).to(device)
        if israndom and np.random.random() < self.epsilon:
            return np.random.randint(0, self.action_num)
        return self.net1(state_).detach().argmax().item()

    # train the networks
    def learn(self, state, action, reward, next_state, done):
        # the last field is a "not done" mask: 0 stops bootstrapping, 1 keeps it
        if done:
            self.replayer.store(state, action, reward, next_state, 0)
        else:
            self.replayer.store(state, action, reward, next_state, 1)
        if self.replayer.count < self.replayer.capacity:
            return None
        batch = list(self.replayer.sample(10))
        state = torch.FloatTensor(batch[0]).to(device)
        action = torch.LongTensor(batch[1]).unsqueeze(1).to(device)
        reward = torch.FloatTensor(batch[2]).unsqueeze(1).to(device)
        next_state = torch.FloatTensor(batch[3]).to(device)
        done = torch.FloatTensor(batch[4]).unsqueeze(1).to(device)
        # Double DQN target: net1 selects the next action, net2 evaluates it
        with torch.no_grad():
            a = self.net1(next_state).max(dim=1)[1].view(-1, 1)
            u = reward + self.gamma * self.net2(next_state).gather(1, a) * done
        loss = self.loss_func(self.net1(state).gather(1, action), u)
        self.optimizer1.zero_grad()
        loss.backward()
        self.optimizer1.step()

    # save the model parameters
    def save_models(self, episode):
        torch.save(self.net1.state_dict(), './net/double_dqn.pkl')
        torch.save(self.net2.state_dict(), './net/double_dqn_target.pkl')
        print('=====================')
        print('%d episode model has been saved...' % (episode))
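The learn method computes the Double DQN target: the online network net1 picks the greedy next action and the target network net2 evaluates it, which reduces the overestimation bias of plain DQN (where one network both picks and evaluates the max). A standalone sketch of the two targets with made-up Q-values for a single next state:

q1_next = torch.tensor([[1.0, 3.0, 2.0]])   # net1's Q-values for the next state (made up)
q2_next = torch.tensor([[0.5, 2.5, 4.0]])   # net2's Q-values for the same state (made up)
reward, gamma, not_done = 1.0, 0.9, 1.0

a = q1_next.max(dim=1)[1].view(-1, 1)                                    # net1 selects action 1
u_ddqn = reward + gamma * q2_next.gather(1, a) * not_done                # net2 evaluates it: 1 + 0.9 * 2.5 = 3.25
u_dqn = reward + gamma * q2_next.max(dim=1)[0].view(-1, 1) * not_done    # plain DQN would take net2's own max: 1 + 0.9 * 4.0 = 4.6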
Start training
agent = agent()
best_reward = -float('inf')   # MountainCar test returns are negative, so start below any achievable return
mean_test = collections.deque(maxlen=10)
for i_episode in range(2000):
    state = agent.env.reset()
    total_reward = 0
    treward = 0
    step_num = 0
    # every ten episodes, copy the online network's parameters into the target network
    if i_episode % 10 == 0:
        agent.net2.load_state_dict(agent.net1.state_dict())
    while True:
        # agent.env.render()
        action = agent.action(state, True)
        next_state, reward, done, info = agent.env.step(action)
        reward_real = reward   # the environment's true reward
        # shaped reward (very important: it affects both whether and how fast training converges)
        if next_state[0] > -0.4 and next_state[0] < 0.5:
            reward = 10 * (next_state[0] + 0.4) ** 3
        elif next_state[0] >= 0.5:
            reward = 100
        elif next_state[0] <= -0.4:
            reward = -0.1
        treward += reward      # accumulated shaped reward
        agent.learn(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward_real
        step_num += 1
        if done or step_num >= 200:
            break
    print('episode: {} , total_reward: {} , treward: {}'.format(i_episode, round(total_reward, 3), round(treward, 3)))
    # TEST: every ten episodes, run one greedy episode without exploration
    if i_episode % 10 == 0:
        state = agent.env.reset()
        test_reward = 0
        while True:
            # agent.env.render()
            action = agent.action(state, israndom=False)
            next_state, reward, done, info = agent.env.step(action)
            test_reward += reward
            state = next_state
            if done:
                agent.env.close()
                break
        print('episode: {} , test_reward: {}'.format(i_episode, round(test_reward, 3)))
        mean_test.append(test_reward)
        if np.mean(mean_test) > best_reward:
            best_reward = np.mean(mean_test)
            agent.save_models(i_episode)
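After training, the saved parameters can be loaded back for a purely greedy run. A minimal sketch, assuming the './net/double_dqn.pkl' checkpoint written by save_models above and reusing the existing agent object:

# reload the saved checkpoint into the online network and run one greedy episode
agent.net1.load_state_dict(torch.load('./net/double_dqn.pkl', map_location=device))
state = agent.env.reset()
greedy_reward = 0
while True:
    action = agent.action(state, israndom=False)   # no exploration
    state, reward, done, info = agent.env.step(action)
    greedy_reward += reward
    if done:
        agent.env.close()
        break
print('greedy episode reward:', round(greedy_reward, 3))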