循环神经网络

本内容参考动手学深度学习[1]

在学习循环神经网络$(Recurrent Neural Network, RNN)$之前先了解一下序列模型,序列模型是专门用于处理和预测序列数据的模型。在自然语言处理、音频处理和时间序列处理有广泛的应用。

举例来说,用$x_{t}$表示价格,即在时间步$t \in \mathbb{Z}^+$,观察价格$x_{t}$,假设交易员想在$t$日预测股市的价格,可以表现为:
$$
x_t \sim P(x_t \mid x_{t-1}, \ldots, x_1).
$$

自回归模型

自回归模型:假设在现实情况下$x_{t-1}, \ldots, x_1$是不必要的,因此只需要满足长度为$\tau$的时间跨度, 即使用观测序列$x_{t-1}, \ldots, x_{t-\tau}$。当下获得的最直接的好处就是参数的数量总是不变的,至少在$t > \tau$是如此,这种模型被称为自回归模型$(autoregressive models)$,因为它对自己执行回归。

隐变量自回归模型:是保留一些对过去观测的总结$h_{t}$, 并且同时更新预测$\hat{x}t$和总结$h{t}$。这就产生了基于$\hat{x}t = P(x_t \mid h{t})$估计$x_t$,以及公式$h_t = g(h_{t-1}, x_{t-1})$更新模型,因为$h_{t}$从未被观测到,这种模型被称为隐变量自回归模型$(latent autoregressive models)$

马尔可夫模型

在自回归模型的近似法中,使用$x_{t-1}, \ldots, x_{t-\tau}$来估计$x_{t}$ ,而不是$x_{t-1}, \ldots, x_1$,只要这种是近似精确的,我们就说满足马尔可夫条件。特殊的,如果$\tau$<1得到一阶马尔可夫模型$first-order Markov model)$
$$
P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}) \text{ 当 } P(x_1 \mid x_0) = P(x_1).
$$
基于条件概率公式,我们可以写出:
$$
P(x_1, \ldots, x_T) = \prod_{t=T}^1 P(x_t \mid x_{t+1}, \ldots, x_T).
$$

文本预处理

读取数据集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#@save
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
'090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine(): #@save
"""将时间机器数据集加载到文本行的列表中"""
with open(d2l.download('time_machine'), 'r') as f:
lines = f.readlines()
return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# 文本总行数: {len(lines)}')
print(lines[0])
print(lines[10])

输出:

1
2
3
4
Downloading ../data/timemachine.txt from http://d2l-data.s3-accelerate.amazonaws.com/timemachine.txt...
# 文本总行数: 3221
the time machine by h g wells
twinkled and his usually pale face was flushed and animated the

词元化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def tokenize(lines, token='word'):  #@save
"""将文本行拆分为单词或字符词元"""
# 'word':按单词拆分(默认值)
if token == 'word':
return [line.split() for line in lines]
# 'char':按字符拆分
elif token == 'char':
return [list(line) for line in lines]
else:
print('错误:未知词元类型:' + token)

tokens = tokenize(lines)
for i in range(11):
print(tokens[i])

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 示例文本
lines = [
"Hello world",
"This is a test.",
"Tokenize me!",
"How are you?",
"I hope this works!",
"Let's see the output.",
"This function is simple.",
"Please split by words.",
"Test the character tokenization.",
"What do you think?",
"Enjoy learning!"
]

# 按单词拆分
tokens = tokenize(lines, token='word')

# 输出前 11 行拆分结果
for i in range(11):
print(tokens[i])

输出:

[‘Hello’, ‘world’]
[‘This’, ‘is’, ‘a’, ‘test.’]
[‘Tokenize’, ‘me!’]
[‘How’, ‘are’, ‘you?’]
[‘I’, ‘hope’, ‘this’, ‘works!’]
[‘Let’s’, ‘see’, ‘the’, ‘output.’]
[‘This’, ‘function’, ‘is’, ‘simple.’]
[‘Please’, ‘split’, ‘by’, ‘words.’]
[‘Test’, ‘the’, ‘character’, ‘tokenization.’]
[‘What’, ‘do’, ‘you’, ‘think?’]
[‘Enjoy’, ‘learning!’]

词表

将字符串类型得词元映射到从0开始的数字索引中。根据词元出现的频率,分配数字索引。语料库中不存在或已删除的任何词元都将映射到一个特定的未知词元“”。同时增加一个列表,用于保存那些被保留的词元, 例如:填充词元(“”); 序列开始词元(“”); 序列结束词元(“”)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class Vocab:  #@save
"""文本词表"""
def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
# token词元列表
if tokens is None:
tokens = []
# 保留词元列表
if reserved_tokens is None:
reserved_tokens = []
# 按出现频率排序
counter = count_corpus(tokens)
self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
reverse=True)
# 未知词元的索引为0
self.idx_to_token = ['<unk>'] + reserved_tokens
self.token_to_idx = {token: idx
for idx, token in enumerate(self.idx_to_token)}
for token, freq in self._token_freqs:
if freq < min_freq:
break
if token not in self.token_to_idx:
self.idx_to_token.append(token)
self.token_to_idx[token] = len(self.idx_to_token) - 1

def __len__(self):
return len(self.idx_to_token)

def __getitem__(self, tokens):
if not isinstance(tokens, (list, tuple)):
return self.token_to_idx.get(tokens, self.unk)
return [self.__getitem__(token) for token in tokens]

def to_tokens(self, indices):
if not isinstance(indices, (list, tuple)):
return self.idx_to_token[indices]
return [self.idx_to_token[index] for index in indices]

@property
def unk(self): # 未知词元的索引为0
return 0

@property
def token_freqs(self):
return self._token_freqs

def count_corpus(tokens): #@save
"""统计词元的频率"""
# 这里的tokens是1D列表或2D列表
if len(tokens) == 0 or isinstance(tokens[0], list):
# 将词元列表展平成一个列表
tokens = [token for line in tokens for token in line]
return collections.Counter(tokens)

整合所有功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def load_corpus_time_machine(max_tokens=-1):  #@save
"""返回时光机器数据集的词元索引列表和词表"""
lines = read_time_machine()
tokens = tokenize(lines, 'char')
vocab = Vocab(tokens)
# 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
# 所以将所有文本行展平到一个列表中
corpus = [vocab[token] for line in tokens for token in line]
if max_tokens > 0:
corpus = corpus[:max_tokens]
return corpus, vocab

corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)

语言模型

根据上面提到的自回归模型和马尔可夫模型,将语言模型定义为:
$$
P(x_1, x_2, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_1, \ldots, x_{t-1}).
$$
由此,预测四个单词的文本序列的概率为:
$$
P(\text{deep}, \text{learning}, \text{is}, \text{fun}) = P(\text{deep}) P(\text{learning} \mid \text{deep}) P(\text{is} \mid \text{deep}, \text{learning}) P(\text{fun} \mid \text{deep}, \text{learning}, \text{is}).
$$
为了训练语言模型,需要计算单词的概率, 以及给定前面几个单词后出现某个单词的条件概率。训练中次的出现概率可表示为,其中$n(x)$和$n(x,x′)$分别是单个单词和连续单词对的出现次数。
$$
\hat{P}(\text{learning} \mid \text{deep}) = \frac{n(\text{deep, learning})}{n(\text{deep})},
$$
通常,连续单词对要比“deep learning”出现频率要低很多,所以对于不常见的组合,要想找到足够的出现次数来获得估计会非常不容易。三个以上的单词组合,预测的结果会更差。为了缓解这种问题,Wood, F等人[2]提出如下公式。其中$\epsilon_1,\epsilon_2,\epsilon_3$为超参数, 以$\epsilon_1$为例:当$\epsilon_1 =0$时,不应用平滑; 当ϵ1接近正无穷大时,$\hat{P}(x)$接近均匀概率分布1/m。
$$
\begin{split}\begin{aligned}
\hat{P}(x) & = \frac{n(x) + \epsilon_1/m}{n + \epsilon_1}, \
\hat{P}(x’ \mid x) & = \frac{n(x, x’) + \epsilon_2 \hat{P}(x’)}{n(x) + \epsilon_2}, \
\hat{P}(x’’ \mid x,x’) & = \frac{n(x, x’,x’’) + \epsilon_3 \hat{P}(x’’)}{n(x, x’) + \epsilon_3}.
\end{aligned}\end{split}
$$
然而,这样的模型很容易变得无效,原因如下: 首先,我们需要存储所有的计数; 其次,这完全忽略了单词的意思。 例如,“猫”(cat)和“猫科动物”(feline)可能出现在相关的上下文中, 但是想根据上下文调整这类模型其实是相当困难的。 最后,长单词序列大部分是没出现过的, 因此一个模型如果只是简单地统计先前“看到”的单词序列频率, 那么模型面对这种问题肯定是表现不佳的。

马尔可夫模型与n元语法

如果$P(x_{t+1} \mid x_t, \ldots, x_1) = P(x_{t+1} \mid x_t)$, 则序列上的分布满足一阶马尔可夫性质。 阶数越高,对应的依赖关系就越长。

$$
\begin{split}\begin{aligned}
P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2) P(x_3) P(x_4),\
P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2 \mid x_1) P(x_3 \mid x_2) P(x_4 \mid x_3),\
P(x_1, x_2, x_3, x_4) &= P(x_1) P(x_2 \mid x_1) P(x_3 \mid x_1, x_2) P(x_4 \mid x_2, x_3).
\end{aligned}\end{split}
$$

循环神经网络

循环神经网络(recurrent neural networks,RNNs) 是具有隐状态的神经网络。与感知机不同的是,保存了前一个时间步的隐藏变量$\mathbf{H}{t-1}$,并引入了一个新的权重参数$\mathbf{W}{hh} \in \mathbb{R}^{h \times h}$当前时间步隐藏变量由当前时间步的输入 与前一个时间步的隐藏变量一起计算得出:

image-20241116184514051
$$
\mathbf{H}t = \phi(\mathbf{X}t \mathbf{W}{xh} + \mathbf{H}{t-1} \mathbf{W}_{hh} + \mathbf{b}_h).
$$

$$
\mathbf{O}_t = \mathbf{H}t \mathbf{W}{hq} + \mathbf{b}_q.
$$

循环神经网络的损失函数(困惑度)

$$
\frac{1}{n} \sum_{t=1}^n -\log P(x_t \mid x_{t-1}, \ldots, x_1),
$$

其中P表示语言模型的预测概率,$x_{t}$表示真实值。

循环神经网络的应用

image-20241116185023652

代码实现(手动)

整体模型架构

绘图1

数据加载

1
2
3
4
5
6
7
8
9
%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

初始化模型参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def get_params(vocab_size, num_hiddens, device):
num_inputs = num_outputs = vocab_size

def normal(shape):
return torch.randn(size=shape, device=device) * 0.01

# 隐藏层参数
W_xh = normal((num_inputs, num_hiddens))
W_hh = normal((num_hiddens, num_hiddens))
b_h = torch.zeros(num_hiddens, device=device)
# 输出层参数
W_hq = normal((num_hiddens, num_outputs))
b_q = torch.zeros(num_outputs, device=device)
# 附加梯度
params = [W_xh, W_hh, b_h, W_hq, b_q]
for param in params:
param.requires_grad_(True)
return params

在一个时间步内计算隐状态和输出

1
2
3
4
5
6
7
8
9
10
11
def rnn(inputs, state, params):
# inputs的形状:(时间步数量,批量大小,词表大小)
W_xh, W_hh, b_h, W_hq, b_q = params
H, = state
outputs = []
# X的形状:(批量大小,词表大小)
for X in inputs:
H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
Y = torch.mm(H, W_hq) + b_q
outputs.append(Y)
return torch.cat(outputs, dim=0), (H,)

创建类包装函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class RNNModelScratch: #@save
"""从零开始实现的循环神经网络模型"""
def __init__(self, vocab_size, num_hiddens, device,
get_params, init_state, forward_fn):
self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
self.params = get_params(vocab_size, num_hiddens, device)
self.init_state, self.forward_fn = init_state, forward_fn

def __call__(self, X, state):
X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
return self.forward_fn(X, state, self.params)

def begin_state(self, batch_size, device):
return self.init_state(batch_size, self.num_hiddens, device)

预测函数

1
2
3
4
5
6
7
8
9
10
11
12
def predict_ch8(prefix, num_preds, net, vocab, device):  #@save
"""在prefix后面生成新字符"""
state = net.begin_state(batch_size=1, device=device)
outputs = [vocab[prefix[0]]]
get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
for y in prefix[1:]: # 预热期
_, state = net(get_input(), state)
outputs.append(vocab[y])
for _ in range(num_preds): # 预测num_preds步
y, state = net(get_input(), state)
outputs.append(int(y.argmax(dim=1).reshape(1)))
return ''.join([vocab.idx_to_token[i] for i in outputs])

梯度裁剪

$$
\mathbf{g} \leftarrow \min\left(1, \frac{\theta}{|\mathbf{g}|}\right) \mathbf{g}.
$$

1
2
3
4
5
6
7
8
9
10
def grad_clipping(net, theta):  #@save
"""裁剪梯度"""
if isinstance(net, nn.Module):
params = [p for p in net.parameters() if p.requires_grad]
else:
params = net.params
norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
if norm > theta:
for param in params:
param.grad[:] *= theta / norm

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#@save
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
"""训练网络一个迭代周期(定义见第8章)"""
state, timer = None, d2l.Timer()
metric = d2l.Accumulator(2) # 训练损失之和,词元数量
for X, Y in train_iter:
if state is None or use_random_iter:
# 在第一次迭代或使用随机抽样时初始化state
state = net.begin_state(batch_size=X.shape[0], device=device)
else:
if isinstance(net, nn.Module) and not isinstance(state, tuple):
# state对于nn.GRU是个张量
state.detach_()
else:
# state对于nn.LSTM或对于我们从零开始实现的模型是个张量
for s in state:
s.detach_()
y = Y.T.reshape(-1)
X, y = X.to(device), y.to(device)
y_hat, state = net(X, state)
l = loss(y_hat, y.long()).mean()
if isinstance(updater, torch.optim.Optimizer):
updater.zero_grad()
l.backward()
grad_clipping(net, 1)
updater.step()
else:
l.backward()
grad_clipping(net, 1)
# 因为已经调用了mean函数
updater(batch_size=1)
metric.add(l * y.numel(), y.numel())
return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#@save
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
use_random_iter=False):
"""训练模型(定义见第8章)"""
loss = nn.CrossEntropyLoss()
animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
legend=['train'], xlim=[10, num_epochs])
# 初始化
if isinstance(net, nn.Module):
updater = torch.optim.SGD(net.parameters(), lr)
else:
updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
# 训练和预测
for epoch in range(num_epochs):
ppl, speed = train_epoch_ch8(
net, train_iter, loss, updater, device, use_random_iter)
if (epoch + 1) % 10 == 0:
print(predict('time traveller'))
animator.add(epoch + 1, [ppl])
print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
print(predict('time traveller'))
print(predict('traveller'))
1
2
num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())

输出:

1
2
3
困惑度 1.0, 67212.6 词元/秒 cuda:0
time traveller for so it will be convenient to speak of himwas e
travelleryou can show black is white by argument said filby

代码简洁实现

1
2
3
4
5
6
7
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)
1
2
3
4
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
# 形状是(隐藏层数,批量大小,隐藏单元数)
state = torch.zeros((1, batch_size, num_hiddens))
1
2
3
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
Y.shape, state_new.shape
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#@save
class RNNModel(nn.Module):
"""循环神经网络模型"""
def __init__(self, rnn_layer, vocab_size, **kwargs):
super(RNNModel, self).__init__(**kwargs)
self.rnn = rnn_layer
self.vocab_size = vocab_size
self.num_hiddens = self.rnn.hidden_size
# 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1,需要构造自己的输出层
if not self.rnn.bidirectional:
self.num_directions = 1
self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
else:
self.num_directions = 2
self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

def forward(self, inputs, state):
X = F.one_hot(inputs.T.long(), self.vocab_size)
X = X.to(torch.float32)
Y, state = self.rnn(X, state)
# 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
# 它的输出形状是(时间步数*批量大小,词表大小)。
output = self.linear(Y.reshape((-1, Y.shape[-1])))
return output, state

def begin_state(self, device, batch_size=1):
if not isinstance(self.rnn, nn.LSTM):
# nn.GRU以张量作为隐状态
return torch.zeros((self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens),
device=device)
else:
# nn.LSTM以元组作为隐状态
return (torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device),
torch.zeros((
self.num_directions * self.rnn.num_layers,
batch_size, self.num_hiddens), device=device))
1
2
3
4
device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
d2l.predict_ch8('time traveller', 10, net, vocab, device)

循环神经网络的应用——GRU(门控循环单元)

文章中举了一个书籍中段落的例子,各个章节可能存在逻辑中断,在这种情况下,也为缓解梯度的异常的问题,使用GRU的方法重置内部的状态。

门控循环单元与普通循环神经网路关键区别在于有专门的机制更新隐状态和重置隐状态。

重置门和更新门

image-20241117193249996

输入小批量$\mathbf{X}t \in \mathbb{R}^{n \times d}$(样本个数$n$,输入个数$d$),上一步的隐状态是$\mathbf{H}{t-1} \in \mathbb{R}^{n \times h}$则重置门$\mathbf{R}t \in \mathbb{R}^{n \times h}$和更新门$\mathbf{Z}t \in \mathbb{R}^{n \times h}$表示为
$$
\begin{split}\begin{aligned}
\mathbf{R}t = \sigma(\mathbf{X}t \mathbf{W}{xr} + \mathbf{H}{t-1} \mathbf{W}
{hr} + \mathbf{b}r),\
\mathbf{Z}t = \sigma(\mathbf{X}t \mathbf{W}{xz} + \mathbf{H}{t-1} \mathbf{W}
{hz} + \mathbf{b}z),
\end{aligned}\end{split}
$$
$\mathbf{W}
{xr}, \mathbf{W}
{xz} \in \mathbb{R}^{d \times h}$为权重,$\mathbf{b}_r, \mathbf{b}_z \in \mathbb{R}^{1 \times h}$为偏置

候选隐状态

$$
\tilde{\mathbf{H}}t = \tanh(\mathbf{X}t \mathbf{W}{xh} + \left(\mathbf{R}t \odot \mathbf{H}{t-1}\right) \mathbf{W}{hh} + \mathbf{b}_h),
$$

$\odot$ 表示哈达玛积,$W,b$分别为权重和偏置。

隐状态

上述的计算结果只是候选隐状态,我们仍然需要结合更新门Zt的效果。此时,来自$X_{t}$的信息基本上被忽略。相反,当$Z_{t}$接近0时, 新的隐状态$\mathbf{H}_t$就会接近候选隐状态$\tilde{\mathbf{H}}_t$。这些设计可以处理循环神经网络中的梯度消失问题.
$$
\mathbf{H}_t = \mathbf{Z}t \odot \mathbf{H}{t-1} + (1 - \mathbf{Z}_t) \odot \tilde{\mathbf{H}}_t.
$$

长短期记忆网络(LSTM)

输入门、忘记门、输出门

image-20241117203925653
$$
\begin{split}\begin{aligned}
\mathbf{I}t &= \sigma(\mathbf{X}t \mathbf{W}{xi} + \mathbf{H}{t-1} \mathbf{W}_{hi} + \mathbf{b}i),\
\mathbf{F}t &= \sigma(\mathbf{X}t \mathbf{W}{xf} + \mathbf{H}{t-1} \mathbf{W}
{hf} + \mathbf{b}f),\
\mathbf{O}t &= \sigma(\mathbf{X}t \mathbf{W}{xo} + \mathbf{H}{t-1} \mathbf{W}
{ho} + \mathbf{b}_o),
\end{aligned}\end{split}
$$

候选记忆单元

$$
\tilde{\mathbf{C}}t = \text{tanh}(\mathbf{X}t \mathbf{W}{xc} + \mathbf{H}{t-1} \mathbf{W}_{hc} + \mathbf{b}_c),
$$

image-20241117204012572

记忆元

$$
\mathbf{C}_t = \mathbf{F}t \odot \mathbf{C}{t-1} + \mathbf{I}_t \odot \tilde{\mathbf{C}}_t.
$$

image-20241117204048297

隐状态

$$
\mathbf{H}_t = \mathbf{O}_t \odot \tanh(\mathbf{C}_t).
$$

深度循环神经网络

基础公式为:
$$
\mathbf{H}t^{(l)} = \phi_l(\mathbf{H}t^{(l-1)} \mathbf{W}{xh}^{(l)} + \mathbf{H}{t-1}^{(l)} \mathbf{W}_{hh}^{(l)} + \mathbf{b}_h^{(l)}),
$$
最后,输出层的计算仅基于第l个隐藏层最终的隐状态:
$$
\mathbf{O}_t = \mathbf{H}t^{(L)} \mathbf{W}{hq} + \mathbf{b}_q,
$$

双向循环神经网络

$$
\begin{split}\begin{aligned}
\overrightarrow{\mathbf{H}}t &= \phi(\mathbf{X}t \mathbf{W}{xh}^{(f)} + \overrightarrow{\mathbf{H}}{t-1} \mathbf{W}_{hh}^{(f)} + \mathbf{b}h^{(f)}),\
\overleftarrow{\mathbf{H}}t &= \phi(\mathbf{X}t \mathbf{W}{xh}^{(b)} + \overleftarrow{\mathbf{H}}{t+1} \mathbf{W}
{hh}^{(b)} + \mathbf{b}_h^{(b)}),
\end{aligned}\end{split}
$$
接下来,将前向隐状态$\overrightarrow{\mathbf{H}}_t$和反向隐状态$\overleftarrow{\mathbf{H}}_t$连接,获得需要送入输出层的隐状态$\mathbf{H}_t \in \mathbb{R}^{n \times 2h}$。在具有多个隐藏层的深度双向循环神经网络中, 该信息作为输入传递到下一个双向层。 最后,输出层计算得到的输出为$\mathbf{O}_t \in \mathbb{R}^{n \times q}$($q$是输出单元的数目)
$$
\mathbf{O}_t = \mathbf{H}t \mathbf{W}{hq} + \mathbf{b}_q.
$$
连接:常规操作有拼接(Concatenation)加和(Summation)

拼接:在双向RNN中,时间步t的隐藏层输出将是这两个隐藏状态的拼接:
$$
\mathbf{H}_t = [\overrightarrow{\mathbf{H}}_t,\overleftarrow{\mathbf{H}}_t]
$$
加和:将前向和反向RNN的输出直接相加。
$$
\mathbf{H}_t = \overrightarrow{\mathbf{H}}_t+\overleftarrow{\mathbf{H}}_t
$$

1
lstm_layer = nn.LSTM(num_inputs, num_hiddens, num_layers, bidirectional=True)

bidirectional=True 双向循环网络的开关

编码器与解码器

序列转换是核心问题,为了处理输入和输出,设计一个包含两个组件的架构:编码器和解码器**(encoder-decoder)**

编码器,继承nn.module模型

1
2
3
4
5
6
7
8
9
10
11
from torch import nn


#@save
class Encoder(nn.Module):
"""编码器-解码器架构的基本编码器接口"""
def __init__(self, **kwargs):
super(Encoder, self).__init__(**kwargs)

def forward(self, X, *args):
raise NotImplementedError

解码器,新增init_state函数,用于将编码器的输出(enc_outputs)转换为编码后的状态。

1
2
3
4
5
6
7
8
9
10
11
#@save
class Decoder(nn.Module):
"""编码器-解码器架构的基本解码器接口"""
def __init__(self, **kwargs):
super(Decoder, self).__init__(**kwargs)

def init_state(self, enc_outputs, *args):
raise NotImplementedError

def forward(self, X, state):
raise NotImplementedError

合并编码器和解码器

1
2
3
4
5
6
7
8
9
10
11
12
#@save
class EncoderDecoder(nn.Module):
"""编码器-解码器架构的基类"""
def __init__(self, encoder, decoder, **kwargs):
super(EncoderDecoder, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder

def forward(self, enc_X, dec_X, *args):
enc_outputs = self.encoder(enc_X, *args)
dec_state = self.decoder.init_state(enc_outputs, *args)
return self.decoder(dec_X, dec_state)

序列到序列学习

特定的“”表示序列结束词元。 一旦输出序列生成此词元,模型就会停止预测。 “”表示序列开始词元,它是解码器的输入序列的第一个词元。

编码器

$x_{t}$是文本序列中的第$t$个词元,$t$为时间步,$h_{t-1}$为上一时间步的状态。使用一个函数$f$来描述循环神经网络的循环层所做的变换:
$$
\mathbf{h}_t = f(\mathbf{x}t, \mathbf{h}{t-1}).
$$
编码器通过选定的函数$q$,将所有的时间步的隐状态转换为上下文变量:
$$
\mathbf{c} = q(\mathbf{h}_1, \ldots, \mathbf{h}_T).
$$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#@save
class Seq2SeqEncoder(d2l.Encoder):
"""用于序列到序列学习的循环神经网络编码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
# 嵌入层
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
dropout=dropout)

def forward(self, X, *args):
# 输出'X'的形状:(batch_size,num_steps,embed_size)
X = self.embedding(X)
# 在循环神经网络模型中,第一个轴对应于时间步
X = X.permute(1, 0, 2)
# 如果未提及状态,则默认为0
output, state = self.rnn(X)
# output的形状:(num_steps,batch_size,num_hiddens)
# state的形状:(num_layers,batch_size,num_hiddens)
return output, state

解码器

编码器输出的上下文变量$c$对整个序列$x_1, \ldots, x_T$进行编码。来自训练数据集的输出序列$y_1, y_2, \ldots, y_{T’}$,对于每个时间步$t’$,解码器的$y_{t’}$概率取决于$y_1, \ldots, y_{t’-1}$和上下文变量$c$,即$P(y_{t’} \mid y_1, \ldots, y_{t’-1}, \mathbf{c})$
$$
\mathbf{s}{t^\prime} = g(y{t^\prime-1}, \mathbf{c}, \mathbf{s}_{t^\prime-1}).
$$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Seq2SeqDecoder(d2l.Decoder):
"""用于序列到序列学习的循环神经网络解码器"""
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqDecoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,
dropout=dropout)
self.dense = nn.Linear(num_hiddens, vocab_size)

def init_state(self, enc_outputs, *args):
# output, state = encoder(x),取出的是state
return enc_outputs[1]

def forward(self, X, state):
# 输出'X'的形状:(batch_size,num_steps,embed_size),时间步放在前面
X = self.embedding(X).permute(1, 0, 2)
# 广播context,使其具有与X相同的num_steps , state[-1]最后一层隐藏状态的输出
context = state[-1].repeat(X.shape[0], 1, 1)
# torch.cat的结果是最后一层解码器的输出再加上最后一层隐藏层的内容
X_and_context = torch.cat的结果是最后一层解码器的输出再加上最后一层隐藏层的内容((X, context), 2)
output, state = self.rnn(X_and_context, state)
output = self.dense(output).permute(1, 0, 2)
# output的形状:(batch_size,num_steps,vocab_size)
# state的形状:(num_layers,batch_size,num_hiddens)
return output, state

损失函数

构建sequence_mask函数通过零值化屏蔽不相关项:

1
2
3
4
5
6
7
8
9
10
def sequence_mask(X, valid_len, value=0):
"""在序列中屏蔽不相关的项"""
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32,
device=X.device)[None, :] < valid_len[:, None]
X[~mask] = value
return X

X = torch.tensor([[1, 2, 3], [4, 5, 6]])
sequence_mask(X, torch.tensor([1, 2]))

代码理解:

  • **maxlen = X.size(1)**:获取 X 张量的最大长度(即序列的最大步数)。在例子中,X 的形状是 (2, 3),所以 maxlen 将是 3

  • mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]:

​ None的作用:在 NumPy 和 PyTorch 中是用来增加新维度的。它通常与切片操作结合使用,来添加一个新的轴。

​ [None, :] 的含义:None 放在前面([None, :])表示在第一个轴(即行轴)上增加一个新的维度。

例:

1
2
3
4
a = torch.tensor([1, 2, 3])
b = a[None, :]
print(b.shape) # 输出 (1, 3)

​ valid_len[:, None]的含义:在函数的末尾增加一个维度。

1
2
3
valid_len = torch.tensor([1, 2, 3])
valid_len_expanded = valid_len[:, None]
print(valid_len_expanded)

输出:

tensor([[1],
[2],
[3]])

通过广播机制,PyTorch 会自动将这两个张量的形状对齐,并按元素逐一比较,生成一个布尔值的张量。

(1, maxlen)与valid_len[:, None]按元素比较,比较的结果是:

如果当前时间步小于 valid_len,则返回 True(表示这个时间步有效)。

否则,返回 False(表示这个时间步无效)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import torch

# 输入数据
valid_len = torch.tensor([1, 2])
maxlen = 3

# 创建张量,包含从0到maxlen-1的值
arange_tensor = torch.arange(maxlen)

# 将其转换为形状 (1, maxlen) 形式
arange_tensor = arange_tensor[None, :] # 结果是形状 (1, 3)

# 将 valid_len 转换为形状 (batch_size, 1) 形式
valid_len_expanded = valid_len[:, None] # 结果是形状 (2, 1)

# 比较操作
mask = arange_tensor < valid_len_expanded # 广播后,形状为 (2, 3)

print(mask)
  • X[~mask] = value:

mask表示取反,该段代码整体表现为将 X 中对应 `maskTrue的位置的元素赋值为value`。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#@save
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
"""带遮蔽的softmax交叉熵损失函数"""
# pred的形状:(batch_size,num_steps,vocab_size)
# label的形状:(batch_size,num_steps)
# valid_len的形状:(batch_size,)
def forward(self, pred, label, valid_len):
weights = torch.ones_like(label)
weights = sequence_mask(weights, valid_len)
self.reduction='none'
unweighted_loss = super().forward(
pred.permute(0, 2, 1), label)
# 有效的地方留下来
weighted_loss = (unweighted_loss * weights).mean(dim=1)
return weighted_loss

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#@save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
"""训练序列到序列模型"""
def xavier_init_weights(m):
if type(m) == nn.Linear:
nn.init.xavier_uniform_(m.weight)
if type(m) == nn.GRU:
for param in m._flat_weights_names:
if "weight" in param:
nn.init.xavier_uniform_(m._parameters[param])

net.apply(xavier_init_weights)
net.to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
net.train()
animator = d2l.Animator(xlabel='epoch', ylabel='loss',
xlim=[10, num_epochs])
for epoch in range(num_epochs):
timer = d2l.Timer()
metric = d2l.Accumulator(2) # 训练损失总和,词元数量
for batch in data_iter:
optimizer.zero_grad()
X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
device=device).reshape(-1, 1)
# 在句子的开头增加一个开始词元
dec_input = torch.cat([bos, Y[:, :-1]], 1) # 强制教学
Y_hat, _ = net(X, dec_input, X_valid_len)
l = loss(Y_hat, Y, Y_valid_len)
l.sum().backward() # 损失函数的标量进行“反向传播”
d2l.grad_clipping(net, 1)
num_tokens = Y_valid_len.sum()
optimizer.step()
with torch.no_grad():
metric.add(l.sum(), num_tokens)
if (epoch + 1) % 10 == 0:
animator.add(epoch + 1, (metric[0] / metric[1],))
print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
f'tokens/sec on {str(device)}')

输出:

1
2
3
4
5
6
7
8
9
10
11
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()

train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

输出结果:

预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#@save
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
device, save_attention_weights=False):
"""序列到序列模型的预测"""
# 在预测时将net设置为评估模式
net.eval()
src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
src_vocab['<eos>']]
enc_valid_len = torch.tensor([len(src_tokens)], device=device)
src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
# 添加批量轴
enc_X = torch.unsqueeze(
torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
enc_outputs = net.encoder(enc_X, enc_valid_len)
dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
# 添加批量轴
dec_X = torch.unsqueeze(torch.tensor(
[tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
output_seq, attention_weight_seq = [], []
for _ in range(num_steps):
Y, dec_state = net.decoder(dec_X, dec_state)
# 我们使用具有预测最高可能性的词元,作为解码器在下一时间步的输入
dec_X = Y.argmax(dim=2)
pred = dec_X.squeeze(dim=0).type(torch.int32).item()
# 保存注意力权重(稍后讨论)
if save_attention_weights:
attention_weight_seq.append(net.decoder.attention_weights)
# 一旦序列结束词元被预测,输出序列的生成就完成了
if pred == tgt_vocab['<eos>']:
break
output_seq.append(pred)
return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq

预测序列的评估

$$
\exp\left(\min\left(0, 1 - \frac{\mathrm{len}{\text{label}}}{\mathrm{len}{\text{pred}}}\right)\right) \prod_{n=1}^k p_n^{1/2^n},
$$

其中$\mathrm{len}{\text{label}}$表示标签序列中的词元数$\mathrm{len}{\text{pred}}$表示预测序列中的词元数,表示用于匹配的最长的n元语法。$p_n$表示$n$元语法的精确度,举例来说,给定标签序列$A,B,C,D,E,F$和预测序列$A,B,B,C,D$,$p_{1} = 序列长度为1(A,B,B,C,D在给定标签中是否存在,存在则加1)/预测序列步长为1的个数 = 4/5$,$p_{2} = 3(AB,BB,BC,CD在给定标签序列中存在的个数)/4(预测序列步长为2的个数)$,$p_{3} = 1/3$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def bleu(pred_seq, label_seq, k):  #@save
"""计算BLEU"""
pred_tokens, label_tokens = pred_seq.split(' '), label_seq.split(' ')
len_pred, len_label = len(pred_tokens), len(label_tokens)
score = math.exp(min(0, 1 - len_label / len_pred))
for n in range(1, k + 1):
num_matches, label_subs = 0, collections.defaultdict(int)
for i in range(len_label - n + 1):
label_subs[' '.join(label_tokens[i: i + n])] += 1
for i in range(len_pred - n + 1):
if label_subs[' '.join(pred_tokens[i: i + n])] > 0:
num_matches += 1
label_subs[' '.join(pred_tokens[i: i + n])] -= 1
score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))
return score

预测:

1
2
3
4
5
6
engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
for eng, fra in zip(engs, fras):
translation, attention_weight_seq = predict_seq2seq(
net, eng, src_vocab, tgt_vocab, num_steps, device)
print(f'{eng} => {translation}, bleu {bleu(translation, fra, k=2):.3f}')

输出:

参考文献:

  1. 动手学深度学习
  2. Wood, F., Gasthaus, J., Archambeau, C., James, L., & Teh, Y. W. (2011). The sequence memoizer. Communications of the ACM, 54(2), 91–98.

循环神经网络
http://example.com/2024/11/14/循环神经网络/
作者
yzcabe
发布于
2024年11月14日
许可协议