I recently had a project that needed some RL-related work, so I started by building a custom gym environment and training on it with an introductory DQN network. This is my first project of this kind, so there may be places where my understanding is lacking or simply wrong; please bear with me and point them out.
The custom-environment part is based on the CSDN article by extremebingo, the model construction and training process follow the pytorch official tutorials, and the presentation of the training results follows the tensorflow org tutorials.
The gem-hunting game
The small green circle is the robot, red circles are fire pits, the blue circle is the gem, and brown circles are stone pillars. Each time the environment is reset the robot spawns in a random empty cell, and it has to reach the cell containing the gem to collect the reward and end the game. If it steps into a fire pit along the way, the game ends with a negative reward; the robot cannot move onto cells occupied by stone pillars.
Custom gym environment
The custom gym environment module is mainly based on extremebingo's CSDN article, which you can click through for the detailed setup process; the gym Env set up section of the GitHub Readme describes the same steps. I won't repeat them here and will instead list the pitfalls you may run into:
Copying the custom environment files into the installed gym package may not take effect; if so, try performing the same steps under this path as well:
C:\Users\xxx\AppData\Roaming\Python\Python37\site-packages\gym\envs
Some of the code in the environment built by extremebingo contains typos and a few bugs; I made some fixes, see the modified environment code.
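For reference, registering a custom environment with gym typically looks like the snippet below; the id and entry_point used here are only illustrative placeholders, not the exact names from this project:

from gym.envs.registration import register

# Both the id and the entry_point below are placeholders -- use the names
# defined by the custom environment package you copied into gym/envs.
register(
    id='GemRobot-v0',
    entry_point='gym.envs.my_gem_env:GemRobotEnv',
    max_episode_steps=200,
)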
Model construction and training
Data collection
The training data consists of tuples (state, action, next_state, reward):
- state: the current state of the environment
- action: the action the robot takes in that state
- next_state: the state after the action has been executed
- reward: the reward received for executing that action

(Here the rendered image of the environment is used as the state, see get_screen. actions = ['n', 'e', 's', 'w'], meaning n = up, s = down, w = left, e = right. Reward: +1 for finding the gem, -1 for stepping into a fire pit, plus a mild penalty on the step count during training.)
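These tuples are stored as Transition objects; following the PyTorch tutorial this is a simple namedtuple (the definition below is assumed, matching the Transition(*args) and Transition(*zip(*transitions)) usage further down):

from collections import namedtuple

# One unit of experience; the field order matches the
# (state, action, next_state, reward) tuples described above.
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))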
During data collection, the action for the next step is chosen either from the model's output or at random, with a probability that depends on how far training has progressed.
This probability is controlled jointly by EPS_END, EPS_START, EPS_DECAY and steps_done, and decays exponentially.
The values I used are:
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 20000
The probability of choosing the random policy changes with the number of training steps steps_done as shown in the figure below:
Here EPS_DECAY is set to 20000 rather than the 200 used in the PyTorch official tutorial, mainly because this environment is somewhat more complex than the cart-pole one, so more samples from the random policy are needed early in training; the probability curve from the official tutorial looks like this:
When testing the model we mainly want to take the model's output as the next action, so in the code I set eps_threshold = 0.001 for eval mode:
def select_action(state, eval=False):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    if eval:
        eps_threshold = 0.001
    print("eps_threshold:{} ,steps_done:{}".format(eps_threshold, steps_done))
    steps_done += 1
    if sample > eps_threshold:
        print("select Model")
        with torch.no_grad():
            # t.max(1) will return largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            if eval:
                return target_net(state).max(1)[1].view(1, 1)
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        print("select random")
        return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)
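To get a feel for the decay, here is a small standalone sketch that evaluates the same formula at a few values of steps_done (the chosen step counts are arbitrary):

import math

EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 20000

for steps_done in (0, 2000, 20000, 60000, 100000):
    eps = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    # With EPS_DECAY = 20000 the threshold is still about 0.36 after 20000 steps;
    # with the tutorial's 200 it would have dropped to roughly 0.05 long before that.
    print(steps_done, round(eps, 3))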
The data produced while running is stored in a replay memory class, and each optimization step randomly samples BATCH_SIZE transitions from it for training:
class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
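A minimal usage sketch (the capacity and the dummy tensor shapes below are illustrative, chosen to mimic the BCHW screens, 1x1 action indices and scalar rewards used elsewhere):

import torch

memory = ReplayMemory(10000)

# Store a few dummy transitions the same way the collection loop would.
for _ in range(4):
    state = torch.zeros(1, 3, 40, 40)
    action = torch.tensor([[0]], dtype=torch.long)
    next_state = torch.zeros(1, 3, 40, 40)
    reward = torch.tensor([0.0])
    memory.push(state, action, next_state, reward)

batch = memory.sample(2)   # random mini-batch used by one optimization step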
The get_screen function captures the rendered image of the environment whenever its state changes:
def get_screen():
    # Returned screen requested by gym is 400x600x3, but is sometimes larger
    # such as 800x1200x3. Transpose it into torch order (CHW).
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    _, screen_height, screen_width = screen.shape
    # print("screen_height {}, screen_width {}".format(screen_height,screen_width))
    # Crop away the bottom 10% of the frame.
    screen = screen[:, int(screen_height * 0):int(screen_height * 0.9)]
    view_width = int(screen_width * 0.6)  # left over from the cart-pole tutorial, unused here
    # The edge-stripping used in the cart-pole tutorial is not needed here:
    # screen = screen[:, :, slice_range]
    # Convert to float, rescale, convert to torch tensor
    # (this doesn't require a copy)
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    # Resize, and add a batch dimension (BCHW)
    return resize(screen).unsqueeze(0).to(device)
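get_screen relies on a resize transform that is not shown in the snippet; in the official tutorial it is built with torchvision roughly as below (the 40-pixel target size is the tutorial's choice and is an assumption here):

import torchvision.transforms as T
from PIL import Image

# Downscale the rendered frame before feeding it to the DQN.
resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])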
Model construction
The DQN network uses three convolutional layers and, given a state, predicts the expected return of each of the possible actions:
class DQN(nn.Module):
    def __init__(self, h, w, outputs):
        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
        self.bn3 = nn.BatchNorm2d(32)

        # Number of Linear input connections depends on output of conv2d layers
        # and therefore the input image size, so compute it.
        def conv2d_size_out(size, kernel_size=5, stride=2):
            return (size - (kernel_size - 1) - 1) // stride + 1
        convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
        convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
        linear_input_size = convw * convh * 32
        self.head = nn.Linear(linear_input_size, outputs)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.head(x.view(x.size(0), -1))
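The two networks and the optimizer are then created much as in the official tutorial; a sketch (using RMSprop and sizing the input from an initial get_screen call follow the tutorial and are assumptions here):

import torch.optim as optim

# Size the DQN input from one rendered frame.
init_screen = get_screen()
_, _, screen_height, screen_width = init_screen.shape
n_actions = env.action_space.n

policy_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net = DQN(screen_height, screen_width, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())  # start as an exact copy
target_net.eval()                                     # the target net is inference-only

optimizer = optim.RMSprop(policy_net.parameters())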
Updating the model parameters during training
policy_net (the network whose parameters are updated continuously) predicts, from the state information in the batch, the return of each possible action, producing a b x 4 matrix (4 being the number of selectable actions); the action indices in the batch are then used to pick out the value the model predicted for the action actually taken (Q(s_t, a) - the model computes Q(s_t)):
state_action_values = policy_net(state_batch).gather(1, action_batch)
target_net (whose parameters are a delayed copy of policy_net) uses the next-state information (with final states, stored as None, filtered out) to predict the maximum achievable return of the next action: next_state_values.
The expected return of the current state = the predicted maximum return of the next state (next_state_values) * GAMMA + the actual reward of the current action, reward_batch, as shown below:
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
The error between the action values predicted by the current network, state_action_values, and this expected return is used as the model's loss to update the whole policy network:
loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
print("loss:{}".format(loss.item()))
# Optimize the model
optimizer.zero_grad()
loss.backward()
for param in policy_net.parameters():
    param.grad.data.clamp_(-1, 1)
optimizer.step()
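In equation form, this is the standard DQN objective: the temporal-difference error between the predicted Q-value and the one-step target is fed through the Huber (smooth L1) loss:

$$\delta = Q_{\text{policy}}(s_t, a_t) - \big(r_t + \gamma \max_{a} Q_{\text{target}}(s_{t+1}, a)\big), \qquad \mathcal{L} = \operatorname{smooth\_l1}(\delta)$$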
The complete optimize_model function is as follows:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))
    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                       if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)
    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)
    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
    print("loss:{}".format(loss.item()))
    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()
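For completeness, the outer training loop that ties all of the pieces together follows the official tutorial fairly closely; a condensed sketch (num_episodes and TARGET_UPDATE are illustrative values, and the rendered screen is used directly as the state, as described above):

from itertools import count

num_episodes = 500     # illustrative
TARGET_UPDATE = 10     # how often to copy policy_net weights into target_net

for i_episode in range(num_episodes):
    env.reset()
    state = get_screen()                          # rendered frame as the state
    for t in count():
        action = select_action(state)
        _, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)

        # Final states are stored as None so optimize_model can mask them out.
        next_state = None if done else get_screen()

        memory.push(state, action, next_state, reward)
        state = next_state

        optimize_model()                          # one optimization step per env step
        if done:
            break

    # Periodically sync the target network with the policy network.
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())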