本文鏈接:個(gè)人站 | 簡(jiǎn)書 | CSDN
版權(quán)聲明:除特別聲明外觉义,本博客文章均采用 BY-NC-SA 許可協(xié)議。轉(zhuǎn)載請(qǐng)注明出處。
最近打算分享一些基于深度學(xué)習(xí)的時(shí)間序列預(yù)測(cè)方法。這是第二篇辽剧。
今次介紹的是 Amazon 在 NIPS 2018 上發(fā)表的文章 Deep State Space Models for Time Series Forecasting臂寝。
狀態(tài)空間模型(State Space Models)起源于控制工程領(lǐng)域章鲤,典型的應(yīng)用包括卡爾曼濾波等。時(shí)間序列分析中的一些經(jīng)典方法咆贬,如 ARIMA败徊、Holt-Winters’ 等,都可以改寫成狀態(tài)空間模型素征。狀態(tài)空間模型對(duì)每個(gè)時(shí)間序列單獨(dú)建模集嵌,無法利用序列之間相似的模式,因而對(duì)歷史數(shù)據(jù)較少的序列往往無能為力御毅。
DeepState 將狀態(tài)空間模型與深度學(xué)習(xí)結(jié)合起來根欧。先用循環(huán)神經(jīng)網(wǎng)絡(luò)將特征映射為狀態(tài)空間模型的參數(shù),再使用狀態(tài)空間模型預(yù)測(cè)序列在每個(gè)時(shí)間步上取值的概率分布端蛆。所有的時(shí)間序列共享網(wǎng)絡(luò)本身的參數(shù)凤粗,而每個(gè)時(shí)間序列都有獨(dú)立的狀態(tài)空間參數(shù)。這樣一來今豆,既能從大量的序列和特征中學(xué)習(xí)到相似的模式嫌拣,又能使模型具有一定的可解釋性。
Model
通常來說呆躲,狀態(tài)空間模型包含一個(gè)狀態(tài)轉(zhuǎn)移方程和一個(gè)觀測(cè)模型异逐,前者描述了隱藏狀態(tài)隨時(shí)間變化的規(guī)律 ,后者概括了給定隱藏狀態(tài)下觀測(cè)值的條件概率分布 插掂,其中隱藏狀態(tài) 灰瞻。
DeepState 使用的是線性高斯?fàn)顟B(tài)空間模型,其狀態(tài)轉(zhuǎn)移方程形如[1]
觀測(cè)模型形如
其中 為狀態(tài)轉(zhuǎn)移矩陣辅甥, 是狀態(tài)轉(zhuǎn)移噪聲的強(qiáng)度酝润, 和 是觀測(cè)模型的權(quán)重和偏置, 是觀測(cè)噪聲的強(qiáng)度璃弄。初始狀態(tài) 要销。
綜上,線性高斯?fàn)顟B(tài)空間模型的參數(shù)為 夏块。
DeepState 模型的結(jié)構(gòu)如下圖所示疏咐。先用循環(huán)神經(jīng)網(wǎng)絡(luò)計(jì)算 ,再用 計(jì)算狀態(tài)空間模型的參數(shù) 脐供,最后計(jì)算似然 凳鬓,通過最大化對(duì)數(shù)似然來學(xué)習(xí)網(wǎng)絡(luò)的參數(shù)。
似然函數(shù)可以分解為
參考我們?cè)?a href="http://www.reibang.com/p/da4dd6d0d8a2" target="_blank">《卡爾曼濾波簡(jiǎn)介》中給出的推導(dǎo)患民,可以很容易得到
式中的參數(shù)可以通過以下遞推關(guān)系
計(jì)算得到。其中 垦梆,匹颤。
預(yù)測(cè)階段與之前介紹的 DeepAR 類似仅孩,都是使用祖先采樣方法獲取一批預(yù)測(cè)區(qū)間內(nèi)每個(gè)時(shí)間步上的樣本點(diǎn),然后利用樣本計(jì)算感興趣的統(tǒng)計(jì)量印蓖。
Code
這里給出一個(gè)基于 TensorFlow 構(gòu)建的簡(jiǎn)單 demo辽慕。
import numpy as np
import tensorflow as tf
import tensorflow_probability as tfp
class DeepState(tf.keras.models.Model):
"""
DeepState 模型
"""
def __init__(self, lstm_units, latent_dim):
super().__init__()
self.latent_dim = latent_dim
self.output_dim = 1
# 注意,文章中使用了多層的 LSTM 網(wǎng)絡(luò)赦肃,為了簡(jiǎn)單起見溅蛉,本 demo 只使用一層
self.lstm = tf.keras.layers.LSTM(lstm_units, return_sequences=True, return_state=True)
self.dense_l_prior = tf.keras.layers.Dense(latent_dim)
self.dense_P_prior = tf.keras.layers.Dense(latent_dim, activation='softplus')
self.dense_F = tf.keras.layers.Dense(latent_dim * latent_dim)
self.dense_H = tf.keras.layers.Dense(output_dim * latent_dim)
self.dense_b = tf.keras.layers.Dense(output_dim)
self.dense_w = tf.keras.layers.Dense(latent_dim, activation='softplus')
self.dense_v = tf.keras.layers.Dense(output_dim, activation='softplus')
def call(self, inputs, initial_state=None, prior=True):
batch_size, time_steps, _ = inputs.shape
outputs, state_h, state_c = self.lstm(inputs, initial_state=initial_state)
state = [state_h, state_c]
F = tf.reshape(self.dense_F(outputs), [batch_size, time_steps, self.latent_dim, self.latent_dim])
H = tf.reshape(self.dense_H(outputs), [batch_size, time_steps, self.output_dim, self.latent_dim])
b = tf.expand_dims(self.dense_b(outputs), -1)
w = tf.expand_dims(self.dense_w(outputs), -1)
v = tf.expand_dims(self.dense_v(outputs), -1)
Q = tf.matmul(w, tf.transpose(w, [0, 1, 3, 2]))
R = tf.matmul(v, tf.transpose(v, [0, 1, 3, 2]))
params = [F, H, b, Q, R]
if prior:
l = tf.expand_dims(self.dense_l_prior(outputs[:, :1, :]), -1)
P = tf.linalg.diag(self.dense_P_prior(outputs[:, :1, :]))
params += [l, P]
return [params, state]
def kalman_step(F, H, b, Q, R, l, P, z=None):
"""
卡爾曼濾波的單步操作
"""
sampling = z is None
l = tf.matmul(F, l)
P = tf.matmul(tf.matmul(F, P), tf.transpose(F, [0, 1, 3, 2])) + Q
z_pred = tf.matmul(H, l) + b
S = tf.matmul(tf.matmul(H, P), tf.transpose(H, [0, 1, 3, 2])) + R
if sampling:
z = tfp.distributions.Normal(z_pred, S).sample()
else:
log_prob = tfp.distributions.Normal(z_pred, S).log_prob(z)
K = tf.matmul(tf.matmul(P, tf.transpose(H, [0, 1, 3, 2])), tf.linalg.inv(S))
y = z - z_pred
l = l + tf.matmul(K, y)
P = P - tf.matmul(tf.matmul(K, H), P)
if sampling:
return [l, P, z]
return [l, P, log_prob]
def kalman_filtering(F, H, b, Q, R, l, P, z=None):
"""
卡爾曼濾波
"""
time_steps = F.shape[1]
if z is None:
samples = []
for t in range(time_steps):
Ft = F[:, t:t+1, :, :]
Ht = H[:, t:t+1, :, :]
bt = b[:, t:t+1, :, :]
Qt = Q[:, t:t+1, :, :]
Rt = R[:, t:t+1, :, :]
l, P, zt = kalman_step(Ft, Ht, bt, Qt, Rt, l, P)
samples.append(zt)
return samples
else:
log_probs = []
for t in range(time_steps):
Ft = F[:, t:t+1, :, :]
Ht = H[:, t:t+1, :, :]
bt = b[:, t:t+1, :, :]
Qt = Q[:, t:t+1, :, :]
Rt = R[:, t:t+1, :, :]
zt = z[:, t:t+1, :, :]
l, P, log_prob = kalman_step(Ft, Ht, bt, Qt, Rt, l, P, zt)
log_probs.append(log_prob)
loss = -tf.reduce_sum(log_probs)
return l, P, loss
實(shí)例化模型,指定優(yōu)化器他宛,就可以訓(xùn)練了:
LSTM_UNITS = 16
LATENT_DIM = 10
EPOCHS = 10
# 實(shí)例化模型
model = DeepState(LSTM_UNITS, LATENT_DIM)
# 指定優(yōu)化器
optimizer = tf.keras.optimizers.Adam()
# 定義訓(xùn)練步
def train_step(x, z):
with tf.GradientTape() as tape:
params, _ = model(x)
_, _, loss_value = kalman_filtering(*params, z)
gradients = tape.gradient(loss_value, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss_value.numpy()
# 數(shù)據(jù)處理(略)
# train_data = do_something()
# 訓(xùn)練
for epoch in range(EPOCHS):
loss = []
for x, z in train_data:
loss.append(train_step(x, z))
print('Epoch %d, Loss %.4f' % (epoch + 1, np.mean(loss))
為了驗(yàn)證代碼是否有效船侧,我們使用一個(gè)人工生成的時(shí)間序列進(jìn)行訓(xùn)練,下圖展示了這個(gè)序列的一部分?jǐn)?shù)據(jù)點(diǎn)厅各。
經(jīng)過訓(xùn)練后用于預(yù)測(cè)镜撩,效果如下圖所示,其中陰影部分表示 0.05 分位數(shù) ~ 0.95 分位數(shù)的區(qū)間队塘。
與 DeepAR 對(duì)比
- DeepAR 學(xué)習(xí)的是概率分布的參數(shù)袁梗,DeepState 學(xué)習(xí)的是狀態(tài)空間模型的參數(shù),因而 DeepState 具有更強(qiáng)的先驗(yàn)憔古,在這個(gè)先驗(yàn)正確的前提下遮怜,理論上需要的訓(xùn)練數(shù)據(jù)應(yīng)該更少一些。
- DeepAR 將當(dāng)前時(shí)間步的目標(biāo)值作為下一個(gè)時(shí)間步的輸入鸿市,因而更容易受異常值的干擾锯梁,魯棒性不如 DeepState。這種網(wǎng)絡(luò)設(shè)計(jì)也導(dǎo)致了在預(yù)測(cè)階段灸芳,每進(jìn)行一輪采樣涝桅,DeepAR 都要重新展開循環(huán)神經(jīng)網(wǎng)絡(luò)計(jì)算后驗(yàn)分布的參數(shù)。相比之下烙样,DeepState 只需要使用網(wǎng)絡(luò)計(jì)算一次狀態(tài)空間模型的參數(shù)即可進(jìn)行多輪采樣[2]冯遂。
- DeepSate 基于線性高斯?fàn)顟B(tài)空間模型,很難推廣到其它分布谒获。DeepAR 則不存在這個(gè)問題蛤肌,你可以根據(jù)數(shù)據(jù)特點(diǎn)選擇合適的分布。