API
d2l
read_ptb
Reads the PTB dataset.
import os
from d2l import torch as d2l

def read_ptb():
    data_dir = d2l.download_extract('ptb')
    with open(os.path.join(data_dir, 'ptb.train.txt')) as f:
        raw_text = f.read()
    return [line.split() for line in raw_text.split('\n')]
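A minimal usage sketch. The remaining snippets in this section additionally rely on the standard imports listed below (assumed here, since the original listings omit them):

import math
import random
import torch
from torch import nn

sentences = read_ptb()
print(f'# sentences: {len(sentences)}')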
count_corpus
count_corpus(tokens)
args:
- tokens (list of lists): each element of the outer list is a sentence; each inner list holds the tokens of that sentence.
return -> collections.Counter:
- a collections.Counter mapping each token to its frequency.
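The helper's source is not listed here; for reference, a short sketch matching the d2l implementation (flatten a 2-D token list, then count):

import collections

def count_corpus(tokens):
    """Count token frequencies; `tokens` may be a 1-D or 2-D list."""
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # Flatten a list of token lists into a single list of tokens
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)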
Vocab
Vocab(tokens=None, min_freq=0, reserved_tokens=None)
args:
- tokens (list of lists): each element of the outer list is a sentence; each inner list holds the tokens of that sentence.
- min_freq: tokens with a frequency below this threshold are mapped to '<unk>'.
- reserved_tokens: tokens that should always be kept in the vocabulary.
Source:
class Vocab:
    """Vocabulary for text."""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # Sort according to frequencies
        counter = count_corpus(tokens)
        #--- x is a (key, value) tuple, so x[1] is the frequency ---
        self.token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                  reverse=True)
        # The index for the unknown token is 0
        self.unk, uniq_tokens = 0, ['<unk>'] + reserved_tokens
        uniq_tokens += [token for token, freq in self.token_freqs
                        if freq >= min_freq and token not in uniq_tokens]
        self.idx_to_token, self.token_to_idx = [], dict()
        for token in uniq_tokens:
            self.idx_to_token.append(token)
            self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]
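A short usage sketch, assuming `sentences` from `read_ptb()` above ('nonexistent' is just a hypothetical out-of-vocabulary word):

vocab = Vocab(sentences, min_freq=10)
print(vocab.token_freqs[:3])          # most frequent tokens with their counts
print(vocab[['the', 'nonexistent']])  # token -> index; unknown words map to index 0
print(vocab.to_tokens([0, 1, 2]))     # index -> token
print(len(vocab))                     # vocabulary size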
subsampling
subsampling(sentences, vocab)
args:
- sentences (list of lists): the input sentences.
- vocab (Vocab): the vocabulary.
return -> list:
- the sentences after subsampling.
Source:
def subsampling(sentences, vocab):
    """
    args:
        sentences (list of lists): each element of the outer list is a sentence;
            each inner list holds the tokens of that sentence.
        vocab (Vocab): the vocabulary.
    return:
        sentences (list of lists): the sentences after subsampling.
    """
    #--- vocab[tk] maps the token to its index in the vocabulary ---
    #--- vocab.idx_to_token maps that index back to a token ('<unk>' for rare words) ---
    sentences = [[vocab.idx_to_token[vocab[tk]] for tk in line] for line in sentences]
    #--- Count the frequency of each token ---
    counter = d2l.count_corpus(sentences)
    num_tokens = sum(counter.values())

    #--- Return True if the token should be kept during subsampling ---
    def keep(token):
        #--- Keep with probability sqrt(1e-4 / f(w_i)), where f(w_i) is the ---
        #--- token's count divided by the total number of tokens ---
        return (random.uniform(0, 1) <
                math.sqrt(1e-4 / counter[token] * num_tokens))

    #--- Apply subsampling ---
    return [[tk for tk in line if keep(tk)] for line in sentences]
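A quick check that subsampling mostly drops high-frequency words, continuing from the read_ptb/Vocab examples above (exact counts vary between runs because keep() is random):

subsampled = subsampling(sentences, vocab)

def compare_counts(token):
    return (f'# of "{token}": before={sum(l.count(token) for l in sentences)}, '
            f'after={sum(l.count(token) for l in subsampled)}')

print(compare_counts('the'))   # frequent word: count drops sharply
print(compare_counts('join'))  # rare word: count is mostly preserved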
get_centers_and_contexts
get_centers_and_contexts(corpus, max_window_size)
args:
- corpus (list of lists): each element of the outer list is a sentence; each inner list holds the token indices of that sentence.
return:
- centers (list): every token index in the corpus; each token serves as a center word.
- contexts (list of lists): the context word indices for each center word.
Source:
def get_centers_and_contexts(corpus, max_window_size):
    """
    args:
        corpus (list of lists): each element of the outer list is a sentence;
            each inner list holds the token indices of that sentence.
    return:
        centers (list): every token index in the corpus; each token serves as a center word.
        contexts (list of lists): the context word indices for each center word.
    """
    centers, contexts = [], []
    for line in corpus:
        #--- Each sentence needs at least two tokens to form a "center word - context word" pair ---
        if len(line) < 2:
            continue
        #--- Add all tokens of the sentence as center words at once ---
        centers += line
        for i in range(len(line)):
            #--- The context window is centered at position i ---
            window_size = random.randint(1, max_window_size)
            indices = list(range(max(0, i - window_size),
                                 min(len(line), i + 1 + window_size)))
            #--- Exclude the center word from the context words ---
            indices.remove(i)
            contexts.append([line[idx] for idx in indices])
    return centers, contexts
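A toy example with two artificial sentences of token indices (window sizes are drawn at random, so the contexts differ between runs):

tiny_dataset = [list(range(7)), list(range(7, 10))]
all_centers, all_contexts = get_centers_and_contexts(tiny_dataset, 2)
for center, context in zip(all_centers, all_contexts):
    print('center', center, 'has contexts', context)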
RandomGenerator
The length of sampling_weights determines the sampling population, and sampling_weights determines how often each element of the population is drawn.
10,000 samples are cached at once instead of calling random.choices on every draw.
candidates caches the randomly drawn values; draws are served from candidates.
Source:
class RandomGenerator:
    #--- Draw a random integer in [0, n - 1] according to n sampling weights ---
    def __init__(self, sampling_weights):
        self.population = list(range(len(sampling_weights)))
        self.sampling_weights = sampling_weights
        self.candidates = []
        self.i = 0

    def draw(self):
        if self.i == len(self.candidates):
            #--- Cache k random sampling results ---
            self.candidates = random.choices(self.population,
                                             self.sampling_weights, k=10000)
            self.i = 0
        self.i += 1
        return self.candidates[self.i - 1]
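For example, drawing the indices 0, 1, 2 with sampling weights 2, 3, 4:

generator = RandomGenerator([2, 3, 4])
print([generator.draw() for _ in range(10)])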
get_negatives
get_negatives(all_contexts, corpus, K)
args:
- all_contexts (list of lists): the context words for every center word.
- corpus: the corpus.
- K: sample K noise words per context word.
return:
- all_negatives (list of lists): the noise words for each center word / context word pair.
def get_negatives(all_contexts, corpus, K):
    """
    args:
        all_contexts (list of lists): the context words for every center word.
        corpus: the corpus.
        K: sample K noise words per context word.
    return:
        all_negatives (list of lists): the noise words for each center word / context word pair.
    """
    counter = d2l.count_corpus(corpus)
    #--- The sampling probability P(w) of a noise word is its frequency relative ---
    #--- to all words, raised to the power of 0.75 ---
    sampling_weights = [counter[i]**0.75 for i in range(len(counter))]
    all_negatives, generator = [], RandomGenerator(sampling_weights)
    for contexts in all_contexts:
        negatives = []
        while len(negatives) < len(contexts) * K:
            neg = generator.draw()
            #--- A noise word may not be a context word ---
            if neg not in contexts:
                negatives.append(neg)
        all_negatives.append(negatives)
    return all_negatives
all_negatives = get_negatives(all_contexts, corpus, 5)
batchify
batchify(data)
args:
- data: center words (centers), context words (contexts), noise words (negatives).
return:
- centers (tensor): shape (batch, 1)
- contexts_negatives (tensor): the context words and noise words concatenated together.
- masks (tensor): distinguishes padding from real entries in contexts_negatives.
- labels (tensor): distinguishes context words from noise words and padding.
def batchify(data):
    """
    args:
        data: center words (centers), context words (contexts), noise words (negatives).
    return:
        centers (tensor): shape (batch, 1)
        contexts_negatives (tensor): the context words and noise words concatenated together.
        masks (tensor): distinguishes padding from real entries in contexts_negatives.
        labels (tensor): distinguishes context words from noise words and padding.
    """
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        centers += [center]
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)]
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
    return (torch.tensor(centers).reshape(-1, 1),
            torch.tensor(contexts_negatives), torch.tensor(masks),
            torch.tensor(labels))
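A small example with two toy (center, contexts, negatives) triples of different lengths; the shorter one is padded with zeros:

x_1 = (1, [2, 2], [3, 3, 3, 3])
x_2 = (1, [2, 2, 2], [3, 3])
batch = batchify((x_1, x_2))
names = ['centers', 'contexts_negatives', 'masks', 'labels']
for name, data in zip(names, batch):
    print(name, '=', data)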
load_data_ptb
load_data_ptb(batch_size, max_window_size, num_noise_words)
args:
- batch_size: batch size.
- max_window_size: maximum context window size.
- num_noise_words: number of noise words per context word.
return:
- data_iter: the data iterator.
- vocab: the vocabulary.
Pipeline: sentences -> vocab -> subsampled -> corpus
def load_data_ptb(batch_size, max_window_size, num_noise_words):
    """
    args:
        batch_size: batch size.
        max_window_size: maximum context window size.
        num_noise_words: number of noise words per context word.
    return:
        data_iter: the data iterator.
        vocab: the vocabulary.
    Pipeline: sentences -> vocab -> subsampled -> corpus
    """
    num_workers = d2l.get_dataloader_workers()
    sentences = read_ptb()
    #--- Build the vocabulary ---
    vocab = d2l.Vocab(sentences, min_freq=10)
    #--- Subsampling ---
    subsampled = subsampling(sentences, vocab)
    #--- Map tokens to indices ---
    corpus = [vocab[line] for line in subsampled]
    all_centers, all_contexts = get_centers_and_contexts(corpus, max_window_size)
    all_negatives = get_negatives(all_contexts, corpus, num_noise_words)

    #--- Build the dataset ---
    class PTBDataset(torch.utils.data.Dataset):
        def __init__(self, centers, contexts, negatives):
            assert len(centers) == len(contexts) == len(negatives)
            self.centers = centers
            self.contexts = contexts
            self.negatives = negatives

        def __getitem__(self, index):
            return (self.centers[index], self.contexts[index],
                    self.negatives[index])

        def __len__(self):
            return len(self.centers)

    dataset = PTBDataset(all_centers, all_contexts, all_negatives)
    data_iter = torch.utils.data.DataLoader(dataset, batch_size, shuffle=True,
                                            collate_fn=batchify,
                                            num_workers=num_workers)
    return data_iter, vocab
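Putting the pipeline together (a sketch; all four tensors share the batch dimension, while the second dimension depends on the randomly sampled windows):

data_iter, vocab = load_data_ptb(512, 5, 5)
names = ['centers', 'contexts_negatives', 'masks', 'labels']
for batch in data_iter:
    for name, data in zip(names, batch):
        print(name, 'shape:', data.shape)
    break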
sequence_mask
sequence_mask(X, valid_len, value=0)
args:
- X: the input sequences, one sequence per row; a batch is flattened into rows before calling this function.
- valid_len: turned into a column vector via [:, None]; each entry gives the valid length of the corresponding row.
- value: the value used to fill the masked positions.
return:
- X: the masked tensor.
def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences.

    args:
        X: the input sequences, one sequence per row; a batch is flattened into rows before calling this function.
        valid_len: turned into a column vector via [:, None]; each entry gives the valid length of the corresponding row.
        value: the value used to fill the masked positions.
    return:
        X: the masked tensor.
    """
    #--- This function masks the sequences row by row ---
    maxlen = X.size(1)
    #--- [None, :] adds a leading dimension ---
    #--- mask is a boolean tensor ---
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X
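For example, keeping one element of the first row and two of the second:

X = torch.tensor([[1, 2, 3], [4, 5, 6]])
print(sequence_mask(X, torch.tensor([1, 2])))
# tensor([[1, 0, 0],
#         [4, 5, 0]])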
masked_softmax
masked_softmax(X, valid_lens)
args:
- X: shape (batch, rows, columns)
- valid_lens: the valid length of each sample in the batch; entries beyond it are masked.
return:
- the softmax of X after masking.
def masked_softmax(X, valid_lens):
    """Perform softmax operation by masking elements on the last axis.

    args:
        X: shape (batch, rows, columns)
        valid_lens: the valid length of each sample in the batch.
    return:
        the softmax of X after masking.
    """
    # `X`: 3D tensor, `valid_lens`: 1D or 2D tensor
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            #--- valid_lens is given per batch element; repeat it so that ---
            #--- there is one entry per row ---
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # On the last axis, replace masked elements with a very large negative
        # value, whose exponentiation outputs 0
        #--- X.reshape(-1, shape[-1]) flattens the batch into rows ---
        X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        #--- Reshape the masked result back into batch form ---
        return nn.functional.softmax(X.reshape(shape), dim=-1)
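For example, masking a (2, 2, 4) tensor with per-sample valid lengths 2 and 3; masked positions receive probability 0:

print(masked_softmax(torch.rand(2, 2, 4), torch.tensor([2, 3])))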
AdditiveAttention
args:
- queries: shape (batch_size, no. of queries, queries_features)
- keys: shape (batch_size, no. of key-value pairs, keys_features)
- values: shape (batch_size, no. of key-value pairs, values_features)
return:
- outputs: shape (batch_size, no. of queries, values_features)
class AdditiveAttention(nn.Module):
    def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs):
        super(AdditiveAttention, self).__init__(**kwargs)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=False)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=False)
        self.w_v = nn.Linear(num_hiddens, 1, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens):
        """
        args:
            queries: shape (batch_size, no. of queries, queries_features)
            keys: shape (batch_size, no. of key-value pairs, keys_features)
            values: shape (batch_size, no. of key-value pairs, values_features)
        return:
            outputs: shape (batch_size, no. of queries, values_features)
        """
        # queries.shape: (2, 1, 10) -> (2, 1, 8)
        # keys.shape: (2, 10, 2) -> (2, 10, 8)
        queries, keys = self.W_q(queries), self.W_k(keys)
        # After dimension expansion, shape of `queries`: (`batch_size`, no. of
        # queries, 1, `num_hiddens`) and shape of `keys`: (`batch_size`, 1,
        # no. of key-value pairs, `num_hiddens`). Sum them up with broadcasting
        # to add the linearly transformed queries and keys together.
        # queries.shape: (2, 1, 8) -> (2, 1, 1, 8)
        # keys.shape: (2, 10, 8) -> (2, 1, 10, 8)
        features = queries.unsqueeze(2) + keys.unsqueeze(1)
        features = torch.tanh(features)
        # There is only one output of `self.w_v`, so we remove the last
        # one-dimensional entry from the shape. Shape of `scores`:
        # (`batch_size`, no. of queries, no. of key-value pairs)
        # scores.shape: (2, 1, 10)
        scores = self.w_v(features).squeeze(-1)
        # self.attention_weights.shape: (2, 1, 10)
        self.attention_weights = masked_softmax(scores, valid_lens)
        # Shape of `values`: (`batch_size`, no. of key-value pairs, value
        # dimension)
        # output.shape: (2, 1, 4)
        return torch.bmm(self.dropout(self.attention_weights), values)
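A usage sketch matching the shapes in the comments above (batch 2, one query with 10 features, ten keys with 2 features, values with 4 features):

queries = torch.normal(0, 1, (2, 1, 10))
keys = torch.ones((2, 10, 2))
values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(2, 1, 1)
valid_lens = torch.tensor([2, 6])
attention = AdditiveAttention(key_size=2, query_size=10, num_hiddens=8,
                              dropout=0.1)
attention.eval()
print(attention(queries, keys, values, valid_lens).shape)  # torch.Size([2, 1, 4])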
DotProductAttention
class DotProductAttention(nn.Module):
    """Scaled dot product attention."""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    # Shape of `queries`: (`batch_size`, no. of queries, `d`)
    # Shape of `keys`: (`batch_size`, no. of key-value pairs, `d`)
    # Shape of `values`: (`batch_size`, no. of key-value pairs, value
    # dimension)
    # Shape of `valid_lens`: (`batch_size`,) or (`batch_size`, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of `keys` with `transpose(1, 2)`
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
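Queries and keys must share the feature dimension d; reusing the keys and values from the additive example with 2-feature queries:

queries = torch.normal(0, 1, (2, 1, 2))
keys = torch.ones((2, 10, 2))
values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(2, 1, 1)
valid_lens = torch.tensor([2, 6])
attention = DotProductAttention(dropout=0.5)
attention.eval()
print(attention(queries, keys, values, valid_lens).shape)  # torch.Size([2, 1, 4])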
Seq2SeqEncoder
args:
- X: shape (batch_size, num_steps)
return:
- output: shape (num_steps, batch_size, num_hiddens), the top layer's output at every time step.
- state: shape (num_layers, batch_size, num_hiddens), each layer's hidden state at the last time step.
class Seq2SeqEncoder(d2l.Encoder):
    """The RNN encoder for sequence to sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0, **kwargs):
        super(Seq2SeqEncoder, self).__init__(**kwargs)
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,
                          dropout=dropout)

    def forward(self, X, *args):
        """
        args:
            X: shape (batch_size, num_steps)
        return:
            output: shape (num_steps, batch_size, num_hiddens), the top layer's output at every time step.
            state: shape (num_layers, batch_size, num_hiddens), each layer's hidden state at the last time step.
        """
        # The output `X` shape: (`batch_size`, `num_steps`, `embed_size`)
        X = self.embedding(X)
        # In RNN models, the first axis corresponds to time steps
        X = X.permute(1, 0, 2)
        # When state is not mentioned, it defaults to zeros
        output, state = self.rnn(X)
        # `output` shape: (`num_steps`, `batch_size`, `num_hiddens`)
        # `state` shape: (`num_layers`, `batch_size`, `num_hiddens`)
        return output, state
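A shape check with hypothetical sizes (vocabulary 10, embedding 8, 16 hidden units, 2 layers, batch 4, 7 time steps):

encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16,
                         num_layers=2)
encoder.eval()
X = torch.zeros((4, 7), dtype=torch.long)
output, state = encoder(X)
print(output.shape)  # torch.Size([7, 4, 16])
print(state.shape)   # torch.Size([2, 4, 16])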
transpose_qkv
args:
- X : shape (2, 4, 100)
- num_heads: 5
return:
- X : shape (2 * 5, 4, 20)
def transpose_qkv(X, num_heads):
    """
    args:
        X: shape (2, 4, 100)
        num_heads: 5
    return:
        X: shape (2 * 5, 4, 20)
    """
    # Shape of input `X`: (`batch_size`, no. of queries or key-value pairs, `num_hiddens`).
    # Shape of output `X`: (`batch_size`, no. of queries or key-value pairs, `num_heads`, `num_hiddens` / `num_heads`)
    # X.shape: (2, 4, 100) -> (2, 4, 5, 20)
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)
    # Shape of output `X`:
    # (`batch_size`, `num_heads`, no. of queries or key-value pairs, `num_hiddens` / `num_heads`)
    X = X.permute(0, 2, 1, 3)
    # Shape of `output`:
    # (`batch_size` * `num_heads`, no. of queries or key-value pairs, `num_hiddens` / `num_heads`)
    return X.reshape(-1, X.shape[2], X.shape[3])
MultiHeadAttention
args:
- queries : shape(2, 4, 100)
- keys : shape(2, 6, 100)
- values : shape(2, 6, 100)
- valid_lens : torch.tensor([3, 2])
return:
- output_concat : shape(2, 4, 100)
class MultiHeadAttention(nn.Module):
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = d2l.DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        """
        args:
            queries: shape (2, 4, 100)
            keys: shape (2, 6, 100)
            values: shape (2, 6, 100)
            valid_lens: torch.tensor([3, 2])
        return:
            output_concat: shape (2, 4, 100)
        """
        # Shape of `queries`, `keys`, or `values`:
        # (`batch_size`, no. of queries or key-value pairs, `num_hiddens`)
        # Shape of `valid_lens`:
        # (`batch_size`,) or (`batch_size`, no. of queries)
        # After transposing, shape of output `queries`, `keys`, or `values`:
        # (`batch_size` * `num_heads`, no. of queries or key-value pairs, `num_hiddens` / `num_heads`)
        # queries.shape: (2, 4, 100) -> (2*5, 4, 20)
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        # keys.shape: (2, 6, 100) -> (2*5, 6, 20)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        # values.shape: (2, 6, 100) -> (2*5, 6, 20)
        values = transpose_qkv(self.W_v(values), self.num_heads)

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) for
            # `num_heads` times, then copy the next item, and so on
            # [3, 2] -> [3, 3, 3, 3, 3, 2, 2, 2, 2, 2]
            valid_lens = torch.repeat_interleave(valid_lens,
                                                 repeats=self.num_heads,
                                                 dim=0)

        # Shape of `output`: (`batch_size` * `num_heads`, no. of queries, `num_hiddens` / `num_heads`)
        # queries.shape: (10, 4, 20)
        # keys.shape: (10, 6, 20)
        # values.shape: (10, 6, 20)
        # output: (10, 4, 20)
        output = self.attention(queries, keys, values, valid_lens)
        # Shape of `output_concat`:
        # (`batch_size`, no. of queries, `num_hiddens`)
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)
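A shape check matching the example shapes above (num_hiddens=100, 5 heads; assumes transpose_qkv above and transpose_output below are in scope):

num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
                               num_hiddens, num_heads, 0.5)
attention.eval()
X = torch.ones((2, 4, num_hiddens))  # queries
Y = torch.ones((2, 6, num_hiddens))  # keys and values
valid_lens = torch.tensor([3, 2])
print(attention(X, Y, Y, valid_lens).shape)  # torch.Size([2, 4, 100])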
transpose_output
args:
- X : shape(10, 4, 20)
- num_heads : 5
return:
- X : shape(2, 4, 100)
def transpose_output(X, num_heads):
    """Reverse the operation of `transpose_qkv`.

    args:
        X: shape (10, 4, 20)
        num_heads: 5
    return:
        X: shape (2, 4, 100)
    """
    # X.shape: (10, 4, 20) -> (2, 5, 4, 20)
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    # X.shape: (2, 5, 4, 20) -> (2, 4, 5, 20)
    X = X.permute(0, 2, 1, 3)
    # X.shape: (2, 4, 5, 20) -> (2, 4, 100)
    return X.reshape(X.shape[0], X.shape[1], -1)
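A round-trip check for the two helpers, using the same example shapes:

X = torch.ones((2, 4, 100))
Y = transpose_qkv(X, num_heads=5)
print(Y.shape)                                 # torch.Size([10, 4, 20])
print(transpose_output(Y, num_heads=5).shape)  # torch.Size([2, 4, 100])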
AddNorm
AddNorm(init: normalized_shape, dropout, **kwargs)
class AddNorm(nn.Module):
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
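Layer normalization keeps the input shape, so X and Y must have the same shape:

add_norm = AddNorm([3, 4], dropout=0.5)
add_norm.eval()
print(add_norm(torch.ones((2, 3, 4)), torch.ones((2, 3, 4))).shape)
# torch.Size([2, 3, 4])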
EncoderBlock
EncoderBlock(init: key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias=False, **kwargs)
class EncoderBlock(nn.Module):
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = d2l.MultiHeadAttention(key_size, query_size,
                                                value_size, num_hiddens,
                                                num_heads, dropout, use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens,
                                   num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))
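A shape check with hypothetical hyperparameters; it assumes PositionWiseFFN (from the d2l transformer section, used by the block but not listed here) is in scope:

X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5)
encoder_blk.eval()
print(encoder_blk(X, valid_lens).shape)  # torch.Size([2, 100, 24])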
TransformerEncoder
TransformerEncoder(init: vocab_size, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, use_bias=False, **kwargs)
class TransformerEncoder(d2l.Encoder):
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        # Positional encoding
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(
                "block" + str(i),
                EncoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, use_bias))

    def forward(self, X, valid_lens, *args):
        # Since positional encoding values are between -1 and 1, the embedding
        # values are multiplied by the square root of the embedding dimension
        # to rescale before they are summed up
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[
                i] = blk.attention.attention.attention_weights
        return X
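A shape check with hypothetical hyperparameters (vocabulary 200, num_hiddens 24, 2 blocks, 8 heads):

encoder = TransformerEncoder(200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5)
encoder.eval()
valid_lens = torch.tensor([3, 2])
print(encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape)
# torch.Size([2, 100, 24])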
predict_seq2seq
truncate_pad
truncate_pad(line, num_steps, padding_token)
If line is longer than num_steps, truncate it; otherwise pad it to num_steps.
def truncate_pad(line, num_steps, padding_token):
    """Truncate or pad sequences."""
    if len(line) > num_steps:
        return line[:num_steps]  # Truncate
    return line + [padding_token] * (num_steps - len(line))  # Pad
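For example, with num_steps=5 and padding token 0:

print(truncate_pad([1, 2, 3], 5, 0))           # [1, 2, 3, 0, 0]
print(truncate_pad([1, 2, 3, 4, 5, 6], 5, 0))  # [1, 2, 3, 4, 5]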
d2l.torch
set_figsize
d2l.torch.set_figsize(figsize=(3.5, 2.5))
Sets the matplotlib figure size.
Source:
def set_figsize(figsize=(3.5, 2.5)):
    """Set the figure size for matplotlib."""
    use_svg_display()
    d2l.plt.rcParams['figure.figsize'] = figsize
from IPython import display

def use_svg_display():
    """Use the svg format to display a plot in Jupyter."""
    display.set_matplotlib_formats('svg')
plot
d2l.torch.plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None, ylim=None, xscale='linear', yscale='linear', fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None)
args:
- X, Y: the data to plot
- xlabel: label for the x-axis
- ylabel: label for the y-axis
- legend: legend label for each curve
- xlim: range of the x-axis
- ylim: range of the y-axis
show_heatmaps
show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5), cmap='Reds')
Draws heatmaps; useful for visualizing attention weights.
args:
- matrices (tensor): the attention weights to plot, with shape (number of rows to display, number of columns to display, number of queries, number of keys)
- xlabel (str): label for the x-axis
- ylabel (str): label for the y-axis
- titles: subplot titles
- figsize: figure size
- cmap: colormap
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),
                  cmap='Reds'):
    """
    args:
        matrices (tensor): the attention weights to plot, with shape (number of
            rows to display, number of columns to display, number of queries,
            number of keys)
        xlabel (str): label for the x-axis
        ylabel (str): label for the y-axis
        titles: subplot titles
        figsize: figure size
        cmap: colormap
    """
    d2l.use_svg_display()
    num_rows, num_cols = matrices.shape[0], matrices.shape[1]
    fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                                 sharex=True, sharey=True, squeeze=False)
    # axes is an n-row by m-column "matrix" of subplots; iterate over rows first, then columns
    for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
            # Add the x-axis label on the bottom row only
            if i == num_rows - 1:
                ax.set_xlabel(xlabel)
            # Add the y-axis label on the first column only
            if j == 0:
                ax.set_ylabel(ylabel)
            if titles:
                ax.set_title(titles[j])
    fig.colorbar(pcm, ax=axes, shrink=0.6)
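For example, visualizing an identity attention matrix as a single heatmap (one row and one column of subplots):

attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
show_heatmaps(attention_weights, xlabel='Keys', ylabel='Queries')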