Source: A Clockwork RNN
Jan Koutník, Klaus Greff, Faustino Gomez, Jürgen Schmidhuber
CW-RNN is an RNN variant driven by clock rates, and it appears to work well for regression, although this is debated. For a time series, whether the task is regression or classification, the data are fed through the recurrence step by step in temporal order; the RNN keeps its memory in a hidden state matrix and produces the output from it. Because a vanilla RNN remembers long sequences very poorly, the authors split the hidden state matrix (the memory mechanism) into g small modules and apply a clock-rate-style mask, partitioning the RNN's memory into several parts. Each part of the CW-RNN memory matrix is then made to process data at a different timescale, which strengthens the memory.
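To make the mechanism concrete, here is a minimal NumPy sketch of a single CW-RNN update step, assuming g equal-sized modules and exponential clock periods so that the active modules always form a prefix of the hidden state; the function and argument names are illustrative and are not taken from the paper or from the class below.
import numpy as np

def cw_rnn_step(t, state, x, W_I, W_H, periods, block):
    # Modules whose period divides t are active at this step; with exponential
    # periods they occupy the first n_active units of the hidden state.
    n_active = sum(1 for T in periods if t % T == 0) * block
    # Only the active units are recomputed from the input and the full previous state.
    new_part = np.tanh(x @ W_I[:, :n_active] + state @ W_H[:, :n_active])
    # The slower, inactive units simply carry their previous values forward.
    return np.concatenate([new_part, state[n_active:]])
The TensorFlow class below implements the same idea, with a block-triangular mask on the hidden weights that additionally forbids connections from faster modules to slower ones.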
The results reported in the paper show that this design performs better in certain applications. For example, when predicting a time series of length L with a CW-RNN, the memory schedule can be set by hand through the choice of clock periods. If L is 36, we could set the clock periods to [1, 2, 4, 8, 16, 12, 18, 36], which amounts to placing several memory points along the length-36 sequence, each of which forms an abstract memory of the inputs that precede it. This design differs greatly from the classic RNN variant, the LSTM: the LSTM selects what to remember automatically through its gate structure, whereas the clockwork design needs the clock periods to be supplied as a kind of prior, which is not a particularly elegant design. On the other hand, the extra room it leaves for hand-picking which points of the sequence to remember should, to some extent, allow the schedule to be tailored to the return series of the underlying instrument and thus give decent regression results.
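As a quick illustration of such a hand-set schedule, the following lines take the example period list from the paragraph above and print which modules would fire at a few time steps:
periods = [1, 2, 4, 8, 16, 12, 18, 36]
for t in [1, 2, 12, 18, 36]:
    print(t, [T for T in periods if t % T == 0])
# At t = 18 only the modules with periods 1, 2 and 18 fire; at t = 36 the
# modules with periods 1, 2, 4, 12, 18 and 36 fire.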
When this design is used to fit a time-series regression, a zoomed-in view of the fitted curve shows that the LSTM fit is noticeably smoothed out, while the CW-RNN does not show this defect.
import numpy as np
import pandas as pd
# import statsmodels.api as sm
import tensorflow as tf
# import matplotlib.pylab as plt
import seaborn as sns
# %matplotlib inline
sns.set_style('whitegrid')
class ClockworkRNN(object):
def __init__(self,
in_length,
in_width,
out_width,
training_epochs=1e2,
batch_size=1024,
learning_rate=1e-4,
hidden_neurons=360,
Rb=60,
Ti=2,
Ti_sum=6,
display=1e2):
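        # Rb is the number of hidden units per clockwork module, Ti the base of the
        # exponential clock periods and Ti_sum the number of modules, so
        # hidden_neurons is expected to equal Rb * Ti_sum (360 = 60 * 6 by default).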
#
self.in_length = in_length
self.in_width = in_width
self.out_width = out_width
self.batch_size = batch_size
self.learning_rate = learning_rate
self.display = display
#
self.hidden_neurons = hidden_neurons
self.Rb = Rb
self.Ti = Ti
self.Ti_sum = Ti_sum
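        # Exponential clock periods Ti**0 .. Ti**(Ti_sum - 1); with the defaults this
        # is [1, 2, 4, 8, 16, 32], and module j updates only at steps t where
        # t % (Ti**j) == 0.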
self.clockwork_periods = [self.Ti ** x for x in range(self.Ti_sum)]
self.training_epochs = training_epochs
self.inputs = tf.placeholder(dtype=tf.float32, shape=[None, self.in_length, self.in_width], name='inputs')
self.targets = tf.placeholder(dtype=tf.float32, shape=[None, self.out_width], name='targets')
#
self.__inference()
    # Block-triangular mask for the hidden weight matrix created by the g-module
    # split: it zeroes the connections that would let a faster module feed a slower one.
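    # Example (small hypothetical case): __Mask_Matrix(4, 2) returns
    #     [[1, 1, 0, 0],
    #      [1, 1, 0, 0],
    #      [1, 1, 1, 1],
    #      [1, 1, 1, 1]]
    # so with state @ WH a slower module may feed a faster one, but never the reverse.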
def __Mask_Matrix(self, W, k):
length = np.int(W / k)
tmp = np.ones([W, W])
for i in range(length)[1:]:
tmp[i * k:(i + 1) * k, :i * k] = 0
tmp[(i + 1) * k:, :i * k] = 0
return np.transpose(tmp)
def __inference(self):
self.sess = sess = tf.InteractiveSession()
        # Standard RNN weights (input, hidden, output) with truncated-normal initialisation
with tf.variable_scope('input_layers'):
self.WI = tf.get_variable('W', shape=[self.in_width, self.hidden_neurons],
initializer=tf.truncated_normal_initializer(stddev=0.1))
self.bI = tf.get_variable('b', shape=[self.hidden_neurons],
initializer=tf.truncated_normal_initializer(stddev=0.1))
traingular_mask = self.__Mask_Matrix(self.hidden_neurons, self.Rb)
self.traingular_mask = tf.constant(traingular_mask, dtype=tf.float32, name='mask_upper_traingular')
with tf.variable_scope('hidden_layers'):
self.WH = tf.get_variable('W', shape=[self.hidden_neurons, self.hidden_neurons],
initializer=tf.truncated_normal_initializer(stddev=0.1))
self.WH = tf.multiply(self.WH, self.traingular_mask)
self.bH = tf.get_variable('b', shape=[self.hidden_neurons],
initializer=tf.truncated_normal_initializer(stddev=0.1))
with tf.variable_scope('output_layers'):
self.WO = tf.get_variable('W', shape=[self.hidden_neurons, self.out_width],
initializer=tf.truncated_normal_initializer(stddev=0.1))
self.bO = tf.get_variable('b', shape=[self.out_width],
initializer=tf.truncated_normal_initializer(stddev=0.1))
        # Split the input tensor into a list with one [batch, in_width] tensor per time step
X_list = [tf.squeeze(x, axis=[1]) for x
in tf.split(value=self.inputs, axis=1, num_or_size_splits=self.in_length, name='inputs_list')]
with tf.variable_scope('clockwork_rnn') as scope:
            # Hidden state at the initial time step, fixed to all zeros and not trained
            self.state = tf.get_variable('hidden_state', shape=[self.batch_size, self.hidden_neurons],
initializer=tf.zeros_initializer(), trainable=False)
for i in range(self.in_length):
                # Determine how many g-modules (and hence hidden units) are active at step i
if i > 0:
scope.reuse_variables()
g_counter = 0
for j in range(self.Ti_sum):
if i % self.clockwork_periods[j] == 0:
g_counter += 1
if g_counter == self.Ti_sum:
g_counter = self.hidden_neurons
else:
g_counter *= self.Rb
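                # g_counter is now the number of hidden units belonging to the active
                # (fast) modules; only these units are recomputed at this time step.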
                # Hidden-state update (Eq. 1 of the paper) at step t, restricted to the active units
tmp_right = tf.matmul(X_list[i], tf.slice(self.WI, [0, 0], [-1, g_counter]))
tmp_right = tf.nn.bias_add(tmp_right, tf.slice(self.bI, [0], [g_counter]))
self.WH = tf.multiply(self.WH, self.traingular_mask)
tmp_left = tf.matmul(self.state, tf.slice(self.WH, [0, 0], [-1, g_counter]))
tmp_left = tf.nn.bias_add(tmp_left, tf.slice(self.bH, [0], [g_counter]))
tmp_hidden = tf.tanh(tf.add(tmp_left, tmp_right))
                # Update the hidden state: overwrite the first g_counter units, carry the rest over unchanged
self.state = tf.concat(axis=1, values=[tmp_hidden, tf.slice(self.state, [0, g_counter], [-1, -1])])
self.final_state = self.state
self.pred = tf.nn.bias_add(tf.matmul(self.final_state, self.WO), self.bO)
# self.cost_sum = tf.reduce_sum(tf.square(self.targets - self.pred))
self.cost = tf.reduce_sum(tf.square(self.targets - self.pred))
self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.cost)
self.sess.run(tf.global_variables_initializer())
def fit(self, inputs, targets):
sess = self.sess
for step in range(np.int(self.training_epochs)):
for i in range(np.int(len(targets) / self.batch_size)):
batch_x = inputs[i * self.batch_size:(i + 1) * self.batch_size].reshape(
[self.batch_size, self.in_length, self.in_width])
batch_y = targets[i * self.batch_size:(i + 1) * self.batch_size].reshape(
[self.batch_size, self.out_width])
sess.run(self.optimizer, feed_dict={self.inputs: batch_x, self.targets: batch_y})
if len(targets) % self.batch_size != 0:
batch_x = inputs[-self.batch_size:].reshape([self.batch_size, self.in_length, self.in_width])
batch_y = targets[-self.batch_size:].reshape([self.batch_size, self.out_width])
sess.run(self.optimizer, feed_dict={self.inputs: batch_x, self.targets: batch_y})
if step % self.display == 0:
print(sess.run(self.cost, feed_dict={self.inputs: batch_x, self.targets: batch_y}))
def prediction(self, inputs):
sess = self.sess
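        # Placeholder row so np.vstack works inside the loop; it is removed before returning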
tmp = np.zeros(self.out_width)
for i in range(np.int(len(inputs) / self.batch_size)):
batch_x = inputs[i * self.batch_size:(i + 1) * self.batch_size].reshape(
[self.batch_size, self.in_length, self.in_width])
tmp = np.vstack((tmp, sess.run(self.pred, feed_dict={self.inputs: batch_x})))
if len(inputs) % self.batch_size != 0:
batch_x = inputs[-self.batch_size:].reshape([self.batch_size, self.in_length, self.in_width])
            tail_pred = sess.run(self.pred, feed_dict={self.inputs: batch_x})
            # Keep only the predictions for the samples not already covered by the full batches
            l = len(inputs) % self.batch_size
            tmp = np.vstack((tmp, tail_pred[-l:]))
tmp = np.delete(tmp, 0, 0)
return tmp
tmp = pd.read_csv('SHCOMP.csv')
tmp['trading_moment'] = pd.to_datetime(tmp['DATE'].values)
tmp.set_index('trading_moment', drop=True, inplace=True)
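# Regression target: the 10-day-ahead log return of the closing price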
tmp['Returns'] = np.log(tmp.Close.shift(-10) / tmp.Close)
tmp.dropna(inplace=True)
tp = np.array(tmp['Returns'])
# del tmp['Unnamed: 0']
in_length = 36
out_length = 1
inputs = np.zeros(in_length)
targets = np.zeros(1)
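# Build sliding windows: each sample holds the previous 36 returns and the target is
# the return at the following index; the zero rows above are placeholders removed below.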
for i in range(len(tp))[in_length:-out_length]:
m = tp[i - in_length:i]
R = tp[i:i + 1]
inputs = np.vstack((inputs, m))
targets = np.vstack((targets, R))
targets = np.delete(targets, 0, 0)
inputs = np.delete(inputs, 0, 0)
T_inputs = inputs[:512]
T_targets = targets[:512]
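# Fit the CW-RNN on the first 512 windows (in_length=36, in_width=1, out_width=1) and
# plot its in-sample predictions against the targets.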
a = ClockworkRNN(36, 1, 1, training_epochs=2e4, batch_size=512)
a.fit(T_inputs, T_targets)
outputs = a.prediction(T_inputs)
CW_RNN = outputs
show = pd.DataFrame([T_targets.ravel('C'), outputs.ravel('C')]).T
show.plot()