Original code: https://github.com/llSourcell/Q-Learning-for-Trading
Note the runtime environment: gym 0.8.4. It also runs under Python 3; a few bugs in the original code have been fixed.
- Data
Three CSV files with daily prices for IBM, MSFT, and QCOM from 2000-01-03 to 2017-12-27 (4,526 trading days), covering the open, close, high, and low prices plus the trading volume.
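A quick way to peek at one of the files (a minimal sketch; it assumes the files sit under data/ with a close column and the most recent rows first, which is what utils.py below expects):

import pandas as pd

# Load one of the three files and inspect it; utils.py below reads the 'close'
# column and reverses it so that prices run from oldest to newest.
df = pd.read_csv('data/daily_MSFT.csv')
print(df.head())                       # most recent trading days appear first
print(df['close'].values[::-1][:5])    # first five closing prices in chronological order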
- Problem
Given some starting capital, how do we keep trading in the market and adjust the number of shares we hold so as to maximize profit?
- Building the trading environment
envs.py
We build a simple simulated trading environment on top of OpenAI's gym.
For the two key components of reinforcement learning, we define the state and the action.
- state: [number of shares held of each stock, price of each stock, cash in hand]
  - The state has length 7: number of stocks * 2 + 1.
  - We use the closing price as the stock price in the state.
  - The stock prices are updated after every trading step.
- action: [sell (0), hold (1), buy (2)]
  - To simplify the problem, a sell always sells every share we currently hold of that stock.
  - A buy spends all of the cash in hand.
  - When buying several stocks at once, the available cash is split evenly across them (see the sketch after this list).
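With three stocks and three per-stock actions, the environment exposes 3^3 = 27 discrete actions. A single action index is decoded into one sell/hold/buy decision per stock, in the same way _trade does below. A minimal illustration:

import itertools

# Enumerate every sell(0)/hold(1)/buy(2) combination for 3 stocks,
# exactly as TradingEnv._trade does.
action_combo = list(map(list, itertools.product([0, 1, 2], repeat=3)))
print(len(action_combo))    # 27
print(action_combo[0])      # [0, 0, 0] -> sell everything
print(action_combo[13])     # [1, 1, 1] -> hold all three stocks
print(action_combo[26])     # [2, 2, 2] -> buy all three stocks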
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
import itertools


class TradingEnv(gym.Env):
  def __init__(self, train_data, init_invest=20000):
    # data
    self.stock_price_history = np.around(train_data)  # round prices to integers to shrink the state space
    self.n_stock, self.n_step = self.stock_price_history.shape

    # instance attributes
    self.init_invest = init_invest  # starting capital
    self.cur_step = None            # index of the current trading day
    self.stock_owned = None         # number of shares held per stock
    self.stock_price = None         # current price per stock
    self.cash_in_hand = None        # cash currently in hand

    # action space: 3^n_stock discrete actions (27 for three stocks)
    self.action_space = spaces.Discrete(3**self.n_stock)

    # observation space: give estimates in order to sample and build the scaler
    # (a discrete space, so we must provide a [min, max] range for every dimension)
    stock_max_price = self.stock_price_history.max(axis=1)
    stock_range = [[0, init_invest * 2 // mx] for mx in stock_max_price]
    price_range = [[0, mx] for mx in stock_max_price]
    cash_in_hand_range = [[0, init_invest * 2]]
    # 3 + 3 + 1 = 7 dimensions; the range of each component is set by the lists above
    self.observation_space = spaces.MultiDiscrete(stock_range + price_range + cash_in_hand_range)

    # seed and start
    self._seed()
    self._reset()

  def _seed(self, seed=None):  # make results reproducible
    self.np_random, seed = seeding.np_random(seed)
    return [seed]

  def _reset(self):  # reset: start from day one with no shares and the initial capital restored
    self.cur_step = 0
    self.stock_owned = [0] * self.n_stock
    self.stock_price = self.stock_price_history[:, self.cur_step]
    self.cash_in_hand = self.init_invest
    return self._get_obs()

  def _step(self, action):  # one trading step; the underscore method names follow the gym 0.8.x API
    assert self.action_space.contains(action)
    prev_val = self._get_val()
    self.cur_step += 1
    self.stock_price = self.stock_price_history[:, self.cur_step]  # update price
    self._trade(action)
    cur_val = self._get_val()
    reward = cur_val - prev_val
    done = self.cur_step == self.n_step - 1
    info = {'cur_val': cur_val}
    return self._get_obs(), reward, done, info

  def _get_obs(self):  # build the observation: a flat list of length 7
    obs = []
    obs.extend(self.stock_owned)
    obs.extend(list(self.stock_price))
    obs.append(self.cash_in_hand)
    return obs

  def _get_val(self):  # portfolio value: value of the shares held plus the remaining cash
    return np.sum(self.stock_owned * self.stock_price) + self.cash_in_hand

  def _trade(self, action):
    # all combos of sell(0), hold(1), or buy(2) for each stock
    action_combo = list(map(list, itertools.product([0, 1, 2], repeat=self.n_stock)))
    action_vec = action_combo[action]

    # one pass to find which stocks to sell and which to buy
    sell_index = []
    buy_index = []
    for i, a in enumerate(action_vec):
      if a == 0:
        sell_index.append(i)
      elif a == 2:
        buy_index.append(i)

    # two passes: sell first, then buy; might be naive in real-world settings
    if sell_index:
      for i in sell_index:
        self.cash_in_hand += self.stock_price[i] * self.stock_owned[i]
        self.stock_owned[i] = 0
    if buy_index:
      can_buy = True
      # keep buying while the cash lasts, one share of each stock at a time,
      # so the money is spread evenly across the stocks being bought
      while can_buy:
        for i in buy_index:
          if self.cash_in_hand > self.stock_price[i]:
            self.stock_owned[i] += 1  # buy one share
            self.cash_in_hand -= self.stock_price[i]
          else:
            can_buy = False
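A quick smoke test of the environment on synthetic prices (a minimal sketch; the random price matrix is purely illustrative):

import numpy as np
from envs import TradingEnv

# 3 fake stocks, 100 fake trading days, prices between 20 and 120
fake_prices = np.random.randint(20, 120, size=(3, 100))
env = TradingEnv(fake_prices, init_invest=20000)

obs = env.reset()          # [shares owned x3, prices x3, cash] -> length 7
done = False
while not done:
  action = env.action_space.sample()            # random action in {0, ..., 26}
  obs, reward, done, info = env.step(action)
print('final portfolio value:', info['cur_val'])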
- Data preprocessing
utils.py
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler


def get_data(col='close'):
  """ Returns a 3 x n_step array """
  msft = pd.read_csv('data/daily_MSFT.csv', usecols=[col])
  ibm = pd.read_csv('data/daily_IBM.csv', usecols=[col])
  qcom = pd.read_csv('data/daily_QCOM.csv', usecols=[col])
  # recent prices are at the top; reverse into chronological order
  return np.array([msft[col].values[::-1],
                   ibm[col].values[::-1],
                   qcom[col].values[::-1]])


def get_scaler(env):
  """ Takes an env and returns a scaler for its observation space """
  low = [0] * (env.n_stock * 2 + 1)

  high = []
  max_price = env.stock_price_history.max(axis=1)
  min_price = env.stock_price_history.min(axis=1)
  max_cash = env.init_invest * 3  # 3 is a magic number...
  max_stock_owned = max_cash // min_price
  for i in max_stock_owned:
    high.append(i)
  for i in max_price:
    high.append(i)
  high.append(max_cash)

  # fit on the estimated [low, high] bounds of each observation dimension
  scaler = StandardScaler()
  scaler.fit([low, high])
  return scaler


def maybe_make_dir(directory):
  if not os.path.exists(directory):
    os.makedirs(directory)
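get_scaler fits a StandardScaler on just two synthetic observations (a rough lower and upper bound for each dimension), which is enough to bring every state component onto a comparable scale before it is fed to the network. A minimal usage sketch:

from envs import TradingEnv
from utils import get_data, get_scaler

data = get_data()                      # 3 x n_step array of closing prices
env = TradingEnv(data, init_invest=20000)
scaler = get_scaler(env)

obs = env.reset()
scaled_obs = scaler.transform([obs])   # shape (1, 7), ready to feed to the network
print(scaled_obs)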
- Building the network with Keras
model.py
A small two-layer fully connected network is enough.
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam


def mlp(n_obs, n_action, n_hidden_layer=1, n_neuron_per_layer=32,
        activation='relu', loss='mse'):
  """ A multi-layer perceptron """
  model = Sequential()
  model.add(Dense(n_neuron_per_layer, input_dim=n_obs, activation=activation))
  for _ in range(n_hidden_layer):
    model.add(Dense(n_neuron_per_layer, activation=activation))
  model.add(Dense(n_action, activation='linear'))
  model.compile(loss=loss, optimizer=Adam())
  print(model.summary())
  return model
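A quick sanity check of the network shapes for this problem (a 7-dimensional state and 27 discrete actions):

from model import mlp

# input: the 7-dimensional state; output: one Q-value per discrete action (27 of them)
model = mlp(n_obs=7, n_action=27)
print(model.input_shape)   # (None, 7)
print(model.output_shape)  # (None, 27)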
- Designing the agent
agent.py
Now for the main part. The agent is the brain of the reinforcement learning algorithm; here we use a DQN with an experience replay buffer to make decisions.
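The update rule that replay() below implements is the standard Q-learning target: for a transition (s, a, r, s'), the network's prediction for Q(s, a) is pushed toward r + gamma * max_a' Q(s', a'), or toward just r if the episode ended at s'. A tiny illustration with made-up numbers:

import numpy as np

gamma = 0.95
reward = 120.0                        # made-up reward for one transition
q_next = np.array([3.0, 7.5, -1.2])   # made-up Q-values the network predicts for s'
done = False

# terminal states have no lookahead, so their target is the reward itself
target = reward if done else reward + gamma * np.max(q_next)
print(target)  # 120 + 0.95 * 7.5 = 127.125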
from collections import deque
import random
import numpy as np
from model import mlp


class DQNAgent(object):
  """ A simple Deep Q agent """
  def __init__(self, state_size, action_size):
    self.state_size = state_size
    self.action_size = action_size
    self.memory = deque(maxlen=2000)
    self.gamma = 0.95    # discount rate
    self.epsilon = 1.0   # exploration rate
    self.epsilon_min = 0.01
    self.epsilon_decay = 0.995
    self.model = mlp(state_size, action_size)

  def remember(self, state, action, reward, next_state, done):
    self.memory.append((state, action, reward, next_state, done))

  def act(self, state):
    if np.random.rand() <= self.epsilon:
      return random.randrange(self.action_size)
    act_values = self.model.predict(state)
    return np.argmax(act_values[0])  # returns action

  def replay(self, batch_size=32):
    """ vectorized implementation; 30x speed up compared with for loop """
    minibatch = random.sample(self.memory, batch_size)
    states = np.array([tup[0][0] for tup in minibatch])
    actions = np.array([tup[1] for tup in minibatch])
    rewards = np.array([tup[2] for tup in minibatch])
    next_states = np.array([tup[3][0] for tup in minibatch])
    done = np.array([tup[4] for tup in minibatch])

    # Q(s', a)
    target = rewards + self.gamma * np.amax(self.model.predict(next_states), axis=1)
    # end state target is the reward itself (no lookahead)
    target[done] = rewards[done]

    # Q(s, a)
    target_f = self.model.predict(states)
    # make the agent approximately map the current state to the future discounted reward
    target_f[range(batch_size), actions] = target

    self.model.fit(states, target_f, epochs=1, verbose=0)

    if self.epsilon > self.epsilon_min:
      self.epsilon *= self.epsilon_decay

  def load(self, name):
    self.model.load_weights(name)

  def save(self, name):
    self.model.save_weights(name)
- Training
run.py
import pickle
import time
import numpy as np
import argparse
import re

from envs import TradingEnv
from agent import DQNAgent
from utils import get_data, get_scaler, maybe_make_dir


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument('-e', '--episode', type=int, default=2000,
                      help='number of episode to run')
  parser.add_argument('-b', '--batch_size', type=int, default=32,
                      help='batch size for experience replay')
  parser.add_argument('-i', '--initial_invest', type=int, default=20000,
                      help='initial investment amount')
  parser.add_argument('-m', '--mode', type=str, required=True,
                      help='either "train" or "test"')
  parser.add_argument('-w', '--weights', type=str, help='a trained model weights')
  args = parser.parse_args()

  maybe_make_dir('weights')
  maybe_make_dir('portfolio_val')

  timestamp = time.strftime('%Y%m%d%H%M')

  data = np.around(get_data())  # round the prices from decimals to integers
  train_data = data[:, :3526]   # train/test split
  test_data = data[:, 3526:]

  env = TradingEnv(train_data, args.initial_invest)
  state_size = env.observation_space.shape
  action_size = env.action_space.n
  agent = DQNAgent(state_size, action_size)
  scaler = get_scaler(env)

  portfolio_value = []  # stores the portfolio value at the end of each episode

  if args.mode == 'test':
    # remake the env with test data
    env = TradingEnv(test_data, args.initial_invest)
    # load trained weights
    agent.load(args.weights)
    # when testing, the timestamp is the same as when the weights were trained
    timestamp = re.findall(r'\d{12}', args.weights)[0]

  for e in range(args.episode):
    state = env.reset()
    state = scaler.transform([state])  # standardize the observation
    # note: this loop variable shadows the time module, but timestamp was already computed above
    for time in range(env.n_step):
      action = agent.act(state)
      next_state, reward, done, info = env.step(action)
      next_state = scaler.transform([next_state])
      if args.mode == 'train':
        agent.remember(state, action, reward, next_state, done)  # store one experience
      state = next_state
      if done:
        print("episode: {}/{}, episode end value: {}".format(
          e + 1, args.episode, info['cur_val']))
        portfolio_value.append(info['cur_val'])  # append episode-end portfolio value
        break
      if args.mode == 'train' and len(agent.memory) > args.batch_size:
        # once the replay buffer holds more than one batch of experience, start training
        agent.replay(args.batch_size)
    if args.mode == 'train' and (e + 1) % 10 == 0:  # checkpoint weights
      agent.save('weights/{}-dqn.h5'.format(timestamp))

  # save portfolio value history to disk
  with open('portfolio_val/{}-{}.p'.format(timestamp, args.mode), 'wb') as fp:
    pickle.dump(portfolio_value, fp)
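To train, run python run.py -m train (optionally with -e, -b, and -i to override the number of episodes, the replay batch size, and the initial investment). To evaluate, run python run.py -m test -w weights/<timestamp>-dqn.h5, where <timestamp> is a placeholder for the 12-digit timestamp of a checkpoint saved during training; the per-episode portfolio values are pickled under portfolio_val/.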