import torch
import math
import torch.nn as nn
def sequence_mask(X, valid_len, value=-1e9):
"""根據(jù)valid_len將X中的非關(guān)聯(lián)元素設(shè)為value败潦。
args:
X: torch.Tensor, 輸入的張量,形狀為(batch_size * Q_timesteps, K_timesteps)
valid_len: torch.Tensor, 有效長(zhǎng)度准脂,形狀為(batch_size*time_steps,)
value: float, 要替換的值, 默認(rèn)為-1e9, 用于softmax操作, 使得非關(guān)聯(lián)元素接近0
return:
torch.Tensor, 返回更新后的X
"""
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :]
mask = mask < valid_len[:, None]
X[~mask] = value
return X
def masked_softmax(X, valid_lens):
"""這個(gè)函數(shù)在張量`X`的最后一個(gè)軸上執(zhí)行softmax操作劫扒,但在此之前,它會(huì)根據(jù)`valid_lens`屏蔽某些元素狸膏。
args:
X: torch.Tensor, 輸入張量沟饥,形狀為(batch_size, Q_timesteps, K_timesteps)
valid_lens: torch.Tensor, 有效長(zhǎng)度,形狀為(batch_size,) 即直接表示每個(gè)輸入序列上的有效長(zhǎng)度,或者 (batch_size, num_steps)即每個(gè)輸入序列的每個(gè)時(shí)間點(diǎn)的有效長(zhǎng)度
return:
torch.Tensor, 返回softmax操作后的張量
"""
# `X`: 3D張量, `valid_lens`: 1D或2D張量
# 如果`valid_lens`為None贤旷,表示無(wú)需屏蔽任何元素广料,我們只需在`X`的最后一個(gè)軸上執(zhí)行常規(guī)的softmax操作。
if valid_lens is None:
return nn.functional.softmax(X, dim=-1)
else:
# 存儲(chǔ)`X`的形狀以備后用幼驶。
shape = X.shape
# 如果`valid_lens`是1D張量艾杏,我們將`valid_lens`中的每個(gè)元素重復(fù)`shape[1]`次。
# 這是因?yàn)槲覀兿雱?chuàng)建一個(gè)與`X`的第二個(gè)維度匹配的掩碼盅藻。
if valid_lens.dim() == 1:
# 如果`valid_lens`是1D張量糜颠,我們將`valid_lens`中的每個(gè)元素重復(fù)`time_steps`次,形成1D張量萧求。
valid_lens = torch.repeat_interleave(valid_lens, shape[1])
else:
# 如果`valid_lens`不是1D張量,我們將其展平并轉(zhuǎn)換為1D張量顶瞒。
valid_lens = valid_lens.reshape(-1)
# 我們將`X`重塑為2D (-1, shape[-1])夸政,并應(yīng)用`sequence_mask`函數(shù)。
# 這個(gè)函數(shù)將替換被掩碼的元素(不在有效長(zhǎng)度內(nèi)的元素)為一個(gè)非常大的負(fù)值(-1e9)榴徐。
# 當(dāng)我們稍后應(yīng)用softmax函數(shù)時(shí)守问,這些大的負(fù)值將變?yōu)?,有效地“屏蔽”這些元素坑资。
X = sequence_mask(X.reshape(-1, shape[-1]), valid_lens,
value=-1e9)
# 最后耗帕,我們將`X`重塑回原來(lái)的形狀,并在最后一個(gè)軸上執(zhí)行softmax操作袱贮。
# 結(jié)果是一個(gè)與`X`形狀相同的張量仿便,但最后一個(gè)軸上的某些元素被屏蔽(設(shè)為0)。
return nn.functional.softmax(X.reshape(shape), dim=-1)
# X = torch.arange(24).reshape(2, 12).type(torch.float32)
# valid_lens = torch.tensor([3, 2])
# ms = masked_softmax(X, valid_lens)
# print(ms)
class DotProductAttention(nn.Module):
"""
這是一個(gè)實(shí)現(xiàn)了縮放點(diǎn)積注意力機(jī)制的類(lèi)攒巍。
"""
def __init__(self, dropout, **kwargs):
super(DotProductAttention, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
def forward(self, queries, keys, values, valid_lens=None):
"""前向傳播函數(shù)嗽仪,接收查詢、鍵柒莉、值和有效長(zhǎng)度作為輸入闻坚。
Args:
queries: torch.Tensor, 查詢張量,形狀為(batch_size, num_Q_timesteps, d_model)
keys: torch.Tensor, 鍵張量兢孝,形狀為(batch_size, num_K_timesteps, d_model)
values: torch.Tensor, 值張量窿凤,形狀為(batch_size, num_V_timesteps, d_model)
valid_lens: torch.Tensor, 有效長(zhǎng)度,形狀為(batch_size,) 或 (batch_size, num_Q_timesteps)
Returns:
torch.Tensor, 返回輸出張量跨蟹,形狀為(batch_size, num_Q_timesteps, d_model)
"""
# 獲取查詢的最后一個(gè)維度的大小雳殊,即d_model。
d = queries.shape[-1]
# 計(jì)算查詢和鍵的點(diǎn)積喷市,然后除以sqrt(d)進(jìn)行縮放相种,得到得分。
# 使用`transpose`函數(shù)交換鍵的最后兩個(gè)維度,以便進(jìn)行矩陣乘法寝并。
scores = torch.bmm(queries, keys.transpose(1,2)) / math.sqrt(d)
# 使用`masked_softmax`函數(shù)對(duì)得分進(jìn)行softmax操作并生成掩碼箫措,得到注意力權(quán)重。
self.attention_weights = masked_softmax(scores, valid_lens)
# 將注意力權(quán)重應(yīng)用到值上衬潦,得到輸出斤蔓。在應(yīng)用注意力權(quán)重之前,先對(duì)其進(jìn)行dropout操作镀岛。
return torch.bmm(self.dropout(self.attention_weights), values)
def transpose_qkv(X, num_heads):
"""Transposition for parallel computation of multiple attention heads.
Defined in :numref:`sec_multihead-attention`"""
# Shape of input `X`:
# (`batch_size`, no. of queries or key-value pairs, `d_model`).
# Shape of output `X`:
# (`batch_size`, no. of queries or key-value pairs, `num_heads`,
# `d_model` / `num_heads`)
X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)
# Shape of output `X`:
# (`batch_size`, `num_heads`, no. of queries or key-value pairs,
# `d_model` / `num_heads`)
X = X.permute(0, 2, 1, 3)
# Shape of `output`:
# (`batch_size` * `num_heads`, no. of queries or key-value pairs,
# `d_model` / `num_heads`)
return X.reshape(-1, X.shape[2], X.shape[3])
def transpose_output(X, num_heads):
"""Reverse the operation of `transpose_qkv`.
Defined in :numref:`sec_multihead-attention`"""
X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
X = X.permute(0, 2, 1, 3)
return X.reshape(X.shape[0], X.shape[1], -1)
class MultiHeadAttention(nn.Module):
"""Multi-head attention.
Defined in :numref:`sec_multihead-attention`"""
def __init__(self, key_size, query_size, value_size, d_model,
num_heads, dropout, bias=False, **kwargs):
super(MultiHeadAttention, self).__init__(**kwargs)
self.num_heads = num_heads
self.attention = DotProductAttention(dropout)
self.W_q = nn.Linear(query_size, d_model, bias=bias)
self.W_k = nn.Linear(key_size, d_model, bias=bias)
self.W_v = nn.Linear(value_size, d_model, bias=bias)
self.W_o = nn.Linear(d_model, d_model, bias=bias)
def forward(self, queries, keys, values, valid_lens):
# Shape of `queries`, `keys`, or `values`:
# (`batch_size`, no. of queries or key-value pairs, `d_model`)
# Shape of `valid_lens`:
# (`batch_size`,) or (`batch_size`, no. of queries)
# After transposing, shape of output `queries`, `keys`, or `values`:
# (`batch_size` * `num_heads`, no. of queries or key-value pairs,
# `d_model` / `num_heads`)
queries = transpose_qkv(self.W_q(queries), self.num_heads)
keys = transpose_qkv(self.W_k(keys), self.num_heads)
values = transpose_qkv(self.W_v(values), self.num_heads)
if valid_lens is not None:
# On axis 0, copy the first item (scalar or vector) for
# `num_heads` times, then copy the next item, and so on
valid_lens = torch.repeat_interleave(
valid_lens, repeats=self.num_heads, dim=0)
# Shape of `output`: (`batch_size` * `num_heads`, no. of queries,
# `d_model` / `num_heads`)
output = self.attention(queries, keys, values, valid_lens)
# Shape of `output_concat`:
# (`batch_size`, no. of queries, `d_model`)
output_concat = transpose_output(output, self.num_heads)
return self.W_o(output_concat)
class PositionWiseFFN(nn.Module):
"""Positionwise feed-forward network.
Defined in :numref:`sec_transformer`"""
def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_outputs,
**kwargs):
super(PositionWiseFFN, self).__init__(**kwargs)
self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
self.relu = nn.ReLU()
self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_outputs)
def forward(self, X):
return self.dense2(self.relu(self.dense1(X)))
# ffn = PositionWiseFFN(4, 4, 8)
# ffn.eval()
# ffn(torch.ones((2, 3, 4)))
class AddNorm(nn.Module):
"""Residual connection followed by layer normalization.
Defined in :numref:`sec_transformer`"""
def __init__(self, normalized_shape, dropout, **kwargs):
super(AddNorm, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
self.ln = nn.LayerNorm(normalized_shape)
def forward(self, X, Y):
return self.ln(self.dropout(Y) + X)
class EncoderBlock(nn.Module):
"""Transformer encoder block.
Defined in :numref:`sec_transformer`"""
def __init__(self, key_size, query_size, value_size, d_model,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
dropout, use_bias=False, **kwargs):
super(EncoderBlock, self).__init__(**kwargs)
self.attention = MultiHeadAttention(
key_size, query_size, value_size, d_model, num_heads, dropout,
use_bias)
self.addnorm1 = AddNorm(norm_shape, dropout)
self.ffn = PositionWiseFFN(
ffn_num_input, ffn_num_hiddens, d_model)
self.addnorm2 = AddNorm(norm_shape, dropout)
def forward(self, X, valid_lens):
Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
return self.addnorm2(Y, self.ffn(Y))
X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5)
encoder_blk.eval()
encoder_blk(X, valid_lens).shape
class PositionalEncoding(nn.Module):
"""Positional encoding.
Defined in :numref:`sec_self-attention-and-positional-encoding`"""
def __init__(self, d_model, dropout, max_len=1000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(dropout)
# Create a long enough `P`
self.P = torch.zeros((1, max_len, d_model))
X = torch.arange(max_len, dtype=torch.float32).reshape(
-1, 1) / torch.pow(10000, torch.arange(
0, d_model, 2, dtype=torch.float32) / d_model)
self.P[:, :, 0::2] = torch.sin(X)
self.P[:, :, 1::2] = torch.cos(X)
def forward(self, X):
X = X + self.P[:, :X.shape[1], :].to(X.device)
return self.dropout(X)
class TransformerEncoder(nn.Module):
"""Transformer encoder.
Defined in :numref:`sec_transformer`"""
def __init__(self, vocab_size, key_size, query_size, value_size,
d_model, norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, num_layers, dropout, use_bias=False, **kwargs):
super(TransformerEncoder, self).__init__(**kwargs)
self.d_model = d_model
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_encoding = PositionalEncoding(d_model, dropout)
self.blks = nn.Sequential()
for i in range(num_layers):
self.blks.add_module("block"+str(i),
EncoderBlock(key_size, query_size, value_size, d_model,
norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, dropout, use_bias))
def forward(self, X, valid_lens, *args):
# Since positional encoding values are between -1 and 1, the embedding
# values are multiplied by the square root of the embedding dimension
# to rescale before they are summed up
X = self.pos_encoding(self.embedding(X) * math.sqrt(self.d_model))
self.attention_weights = [None] * len(self.blks)
for i, blk in enumerate(self.blks):
X = blk(X, valid_lens)
self.attention_weights[
i] = blk.attention.attention.attention_weights
return X
# encoder = TransformerEncoder(200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5)
# encoder.eval()
# encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape
class DecoderBlock(nn.Module):
def __init__(self, key_size, query_size, value_size, d_model,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
dropout, i, **kwargs):
super(DecoderBlock, self).__init__(**kwargs)
self.i = i
self.attention1 = MultiHeadAttention(
key_size, query_size, value_size, d_model, num_heads, dropout)
self.addnorm1 = AddNorm(norm_shape, dropout)
self.attention2 = MultiHeadAttention(
key_size, query_size, value_size, d_model, num_heads, dropout)
self.addnorm2 = AddNorm(norm_shape, dropout)
self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens,
d_model)
self.addnorm3 = AddNorm(norm_shape, dropout)
def forward(self, X, state):
enc_outputs, enc_valid_lens = state[0], state[1]
# During training, all the tokens of any output sequence are processed
# at the same time, so `state[2][self.i]` is `None` as initialized.
# During prediction, `state[2][self.i]` contains previous tokens of
# the output sequence
if state[2][self.i] is None: # 當(dāng)前是訓(xùn)練階段弦牡,或者第一次預(yù)測(cè)
key_values = X
else: # 當(dāng)前是預(yù)測(cè)階段
key_values = torch.cat((state[2][self.i], X), axis=1)
state[2][self.i] = key_values
if self.training:
batch_size, num_steps, _ = X.shape
# Shape of `dec_valid_lens`: (`batch_size`, `num_steps`), where
# every row is [1, 2, ..., `num_steps`]
dec_valid_lens = torch.arange(1, num_steps + 1,
device=X.device).repeat(batch_size, 1)
else:
dec_valid_lens = None
# Self-attention
X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
Y = self.addnorm1(X, X2)
# Encoder-decoder attention. Shape of `enc_outputs`:
# (`batch_size`, `num_steps`, `d_model`)
Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
Z = self.addnorm2(Y, Y2)
return self.addnorm3(Z, self.ffn(Z)), state
class TransformerDecoder(nn.Module):
def __init__(self, vocab_size, key_size, query_size, value_size,
d_model, norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, num_layers, dropout, **kwargs):
super(TransformerDecoder, self).__init__(**kwargs)
self.d_model = d_model
self.num_layers = num_layers
self.embedding = nn.Embedding(vocab_size, d_model)
self.pos_encoding = PositionalEncoding(d_model, dropout)
self.blks = nn.Sequential()
for i in range(num_layers):
self.blks.add_module("block"+str(i),
DecoderBlock(key_size, query_size, value_size, d_model,
norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, dropout, i))
self.dense = nn.Linear(d_model, vocab_size)
def init_state(self, enc_outputs, env_valid_lens, *args):
return [enc_outputs, env_valid_lens, [None]*self.num_layers]
def forward(self, X, state):
X = self.pos_encoding(self.embedding(X) * math.sqrt(self.d_model))
self._attention_weights = [[None, None] for _ in range(self.num_layers)]
for i, blk in enumerate(self.blks):
X, state = blk(X, state)
# Decoder self-attention weights
self._attention_weights[i][0] = blk.attention1.attention.attention_weights
# Encoder-decoder attention weights
self._attention_weights[i][1] = blk.attention2.attention.attention_weights
return self.dense(X), state
@property
def attention_weights(self):
return self._attention_weights
class Transformer(nn.Module):
def __init__(self, src_vocab, tgt_vocab, key_size, query_size, value_size,
d_model, norm_shape, ffn_num_input, ffn_num_hiddens,
num_heads, num_layers, dropout, **kwargs):
super(Transformer, self).__init__(**kwargs)
self.encoder = TransformerEncoder(
src_vocab, key_size, query_size, value_size, d_model,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
num_layers, dropout)
self.decoder = TransformerDecoder(
tgt_vocab, key_size, query_size, value_size, d_model,
norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
num_layers, dropout)
def forward(self, src, tgt, src_valid_lens):
enc_outputs = self.encoder(src, src_valid_lens)
return self.decoder(tgt, self.decoder.init_state(enc_outputs, src_valid_lens))
import torch
from tf_learn import Transformer
import d2l.torch as d2l
d_model, num_layers, dropout, batch_size, num_steps = 32, 2, 0.1, 64, 10
lr, num_epochs, device = 0.005, 200, d2l.try_gpu()
ffn_num_input, ffn_num_hiddens, num_heads = d_model, 64, 4
key_size, query_size, value_size = d_model, d_model, d_model
norm_shape = [d_model] # layer normalization shape
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
net = Transformer(len(src_vocab), len(tgt_vocab), key_size, query_size,
value_size, d_model, norm_shape, ffn_num_input,
ffn_num_hiddens, num_heads, num_layers, dropout)
# net = d2l.EncoderDecoder(encoder, decoder)
d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
# save model
torch.save(net.state_dict(), 'transformer.pth')
import torch
from tf_learn import Transformer
import d2l.torch as d2l
d_model, num_layers, dropout, batch_size, num_steps = 32, 2, 0.1, 64, 10
lr, num_epochs, device = 0.005, 200, d2l.try_gpu()
ffn_num_input, ffn_num_hiddens, num_heads = d_model, 64, 4
key_size, query_size, value_size = d_model, d_model, d_model
norm_shape = [d_model] # layer normalization shape
train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
net = Transformer(len(src_vocab), len(tgt_vocab), key_size, query_size,
value_size, d_model, norm_shape, ffn_num_input,
ffn_num_hiddens, num_heads, num_layers, dropout)
net.load_state_dict(torch.load('transformer.pth'))
net.to(device)
net.eval()
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, dec_attention_weight_seq = d2l.predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device, True)
print(f'{eng} => {translation}, ',
f'bleu {d2l.bleu(translation, fra, k=2):.3f}')