In this experiment a family of deformed sine curves is constructed:
X = np.sin(T, dtype=np.float32) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10
They look roughly like this:
[figure: sample generated curves]
The prediction task: given the first N values of a curve (N is a random integer between 400 and 1600), predict the following 200 values. In other words, we have to handle variable-length input, which is exactly what RaggedTensor is for.
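For instance (a minimal sketch with toy numbers, not from the original experiment), tf.RaggedTensor.from_tensor turns a padded dense batch into a batch of rows with individual lengths:

import tensorflow as tf

dense = tf.constant([[1., 2., 3., 0.],
                     [4., 5., 0., 0.]])   # two rows padded to width 4
ragged = tf.RaggedTensor.from_tensor(dense, lengths=[3, 2])
print(ragged)  # <tf.RaggedTensor [[1.0, 2.0, 3.0], [4.0, 5.0]]>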
The main points covered here can be summarized as:
- Using RaggedTensor. tf.RaggedTensor.from_tensor is used to build a RaggedTensor that stores a batch of sequences of unequal lengths.
- Handling variable-length sequences with Keras. Because Keras's LSTM is a higher-level API, it can process a RaggedTensor directly (a minimal sketch follows this list).
- The seq2seq framework. We use the seq2seq framework from tensorflow_addons; since it is geared mostly toward text processing, our (comparatively simple) real-valued sequence case actually needs some customization, mainly of the sampler.
- Early stopping. Keras's EarlyStopping callback works out of the box.
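As promised above, a minimal sketch (toy sizes, not part of the original code) of a Keras LSTM consuming a RaggedTensor directly, with no masking or padding boilerplate:

import tensorflow as tf

inp = tf.keras.Input(shape=[None, 1], ragged=True)   # ragged time axis
h = tf.keras.layers.LSTM(8)(inp)                     # consumes the RaggedTensor as-is
toy = tf.keras.Model(inp, h)

x = tf.expand_dims(tf.ragged.constant([[1., 2., 3.], [4., 5.]]), -1)
print(toy(x).shape)  # (2, 8): one encoding per variable-length sequence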
Prediction results:
[figure: predicted continuation vs. ground truth]
Complete code:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
T = np.arange(2000) * 0.05   # time axis: 2000 points
X = np.sin(T, dtype=np.float32) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10
plt.plot(X)
batch_size = 32
max_time = 200       # decoder steps: length of the predicted continuation
hidden_size = 128
YLEN = max_time
def generateTrain(records=32):
    trainXs = []
    trainYs = []
    XLens = []
    YLens = []
    for i in range(records):
        # A fresh random deformation of the sine curve for every record.
        X = (np.sin(T) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10).astype(np.float32)
        xi = X[:1800].copy()
        sepre = np.random.randint(400, 1600)   # random observed prefix length N
        xi[sepre:] = 0                         # zero out everything past the prefix
        yi = X[sepre:sepre + YLEN].copy()      # target: the next 200 values
        trainXs.append(np.expand_dims(xi, axis=0))
        trainYs.append(np.expand_dims(yi, axis=0))
        XLens.append(sepre)
        YLens.append(YLEN)
    trainX = np.concatenate(trainXs, axis=0)
    trainY = np.concatenate(trainYs, axis=0)
    seqLen = np.array(XLens)
    outLen = np.array(YLens)
    return (trainX, trainY, seqLen, outLen)
def generateTest():
    # One held-out curve; the full 1800-point prefix is observed.
    X = (np.sin(T) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10).astype(np.float32)
    xi = X[:1800].copy()
    yi = X[1800:1800 + YLEN].copy()
    testX = np.expand_dims(xi, axis=0)
    testY = np.expand_dims(yi, axis=0)
    seqLen_test = np.array([1800])
    outLen_test = np.array([YLEN])
    return (testX, testY, seqLen_test, outLen_test)
def gen_ragged_train():
    # Endless generator: each yield is one batch of variable-length sequences
    # packed into a RaggedTensor, plus the dense prediction targets.
    while True:
        trainX, trainY, seqLen, _ = generateTrain()
        ragged_train_x = tf.RaggedTensor.from_tensor(trainX, lengths=seqLen)  # trim each row to its true length
        ragged_train_x = tf.expand_dims(ragged_train_x, -1)                   # add the feature dimension
        tensor_train_y = tf.convert_to_tensor(trainY)
        yield ragged_train_x, tensor_train_y
dataset = tf.data.Dataset.from_generator(
    gen_ragged_train,
    output_signature=(
        tf.RaggedTensorSpec(shape=(batch_size, None, 1), dtype=tf.float32, ragged_rank=1),
        tf.TensorSpec(shape=(batch_size, YLEN), dtype=tf.float32))
)
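As a quick sanity check (a sketch added here, not part of the original listing), one batch drawn from this dataset should show the ragged time axis:

for rx, ty in dataset.take(1):
    print(rx.shape)  # (32, None, 1) -- None marks the ragged time dimension
    print(ty.shape)  # (32, 200)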
import tensorflow_addons as tfa
# Encoder: a Keras LSTM consumes the ragged input directly.
inputs = tf.keras.layers.Input(shape=[None, 1], ragged=True)
encoding, state_h, state_c = tf.keras.layers.LSTM(hidden_size, return_state=True)(inputs)
encoder_state = [state_h, state_c]  # final encoder state (not used below; the decoder is driven by `encoding` as its first input)

# Decoder: a plain LSTM cell wrapped in tensorflow_addons' BasicDecoder.
decoder_cell = tf.keras.layers.LSTMCell(hidden_size)

# Sampler customization for real-valued sequences: "sample" the identity of the
# cell output (which is fed back as the next input) and never signal "end",
# so the decoder always runs for exactly maximum_iterations steps.
sample_fn = lambda x: x
end_fn = lambda x: False
sampler = tfa.seq2seq.InferenceSampler(
    sample_fn=sample_fn,
    sample_shape=[hidden_size],
    sample_dtype=tf.float32,
    end_fn=end_fn)
decoder = tfa.seq2seq.BasicDecoder(decoder_cell, sampler, maximum_iterations=max_time)
initial_state = decoder_cell.get_initial_state(encoding)  # zero state, batch size inferred from `encoding`
output, state, lengths = decoder(
    tf.convert_to_tensor(encoding), initial_state=initial_state)
logits = output.rnn_output  # (batch, max_time, hidden_size)

# Project each decoder step down to a single real value.
output_layer = tf.keras.layers.Dense(1)
out_seq = tf.squeeze(output_layer(logits), axis=-1)  # (batch, max_time)
print(out_seq.shape)
model = tf.keras.Model(inputs=inputs, outputs=out_seq)
model.compile(optimizer="Adam", loss="mse", metrics=["mse", "mae", "mape"])

# Validation data: one full-length curve from generateTest, packed the same
# way as the training batches.
testX, testY, seqLen_test, outLen_test = generateTest()
val_x = tf.expand_dims(tf.RaggedTensor.from_tensor(testX, lengths=seqLen_test), -1)
val_data = (val_x, tf.convert_to_tensor(testY))

early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=100)
model.fit(dataset.take(128), validation_data=val_data, epochs=1024, callbacks=[early_stop])
model.save('saved_model/my_model')
# Plot the original series followed by the model's 200-step continuation.
plt.figure()
plt.plot(X)
x_plot = tf.expand_dims(tf.RaggedTensor.from_tensor(X.astype(np.float32).reshape(1, -1)), -1)
pred = model.predict(x_plot)  # shape (1, max_time)
plt.plot([np.nan] * 2000 + pred.ravel().tolist())
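To reuse the trained model later, the SavedModel written above can be reloaded (a sketch; depending on the TF version, tensorflow_addons may need to be imported first so its ops are registered):

reloaded = tf.keras.models.load_model('saved_model/my_model')
print(reloaded.predict(x_plot).shape)  # same (1, 200) continuation as above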