【d2l】d2l API Manual

Posted by ShawnD on March 19, 2021

API

d2l

read_ptb

Reads the PTB dataset.

def read_ptb():
    data_dir = d2l.download_extract('ptb')
    with open(os.path.join(data_dir, 'ptb.train.txt')) as f:
        raw_text = f.read()
    return [line.split() for line in raw_text.split('\n')]

count_corpus

count_corpus(tokens)

args:

  • tokens (nested list): the outer list holds sentences, the inner lists hold the words of each sentence.

return -> collections.Counter:

  • Returns a collections.Counter, a dict subclass mapping each token to its count.

Vocab

Vocab(tokens=None, min_freq=0, reserved_tokens=None)

args:

  • tokens (nested list): the outer list holds sentences, the inner lists hold the words of each sentence.
  • min_freq: tokens with a frequency below this threshold are mapped to '<unk>'.
  • reserved_tokens: tokens that should always be kept in the vocabulary.

Source:

class Vocab:
    """Vocabulary for text."""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = [] 
        # Sort according to frequencies
        counter = count_corpus(tokens)
        #--- Each item x is a (token, count) tuple; x[1] is the count ---
        self.token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                  reverse=True)
        # The index for the unknown token is 0
        self.unk, uniq_tokens = 0, ['<unk>'] + reserved_tokens
        uniq_tokens += [token for token, freq in self.token_freqs
                        if freq >= min_freq and token not in uniq_tokens]
        self.idx_to_token, self.token_to_idx = [], dict()
        for token in uniq_tokens:
            self.idx_to_token.append(token)
            self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]
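
A minimal usage sketch (assuming `sentences` comes from `read_ptb()` and `count_corpus` is in scope; exact values depend on the dataset):

vocab = Vocab(sentences, min_freq=10)
print(len(vocab))            # vocabulary size, including '<unk>'
print(vocab['the'])          # index of a frequent token
print(vocab.to_tokens([0]))  # ['<unk>']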

subsampling

subsampling(sentences, vocab)

args:

  • sentences (nested list): the input sentences
  • vocab (Vocab): the vocabulary

return -> list:

  • the sentences after subsampling

Source:

def subsampling(sentences, vocab):
    """
    args: 
        sentence(二重list) : 第一重list的元素是每个句子, 第二重list是每个句子里的单词。 
        vocab(Vocab类) : 词典
     
    return: 
        sentence(二重list) : 二次采样后的句子
    """
    #--- vocab[tk]返回的是索引 token 在语料库中的索引 ---
    #--- vocab.idx_to_token 再按照索引变成 token ---
    sentences = [[vocab.idx_to_token[vocab[tk]]for tk in line]  for line in sentences]

    #--- 计算每个词的频率 ---
    counter = d2l.count_corpus(sentences)
    num_tokens = sum(counter.values())

    #--- 如果在二次采样时要保留下该token返回True ---
    def keep(token):
        #--- f(w_i) 等于 语料库中所有词出现的频数之和 / w_i这个词的出现频数 ---
        return (random.uniform(0, 1) < math.sqrt(1e-4 / counter[token] * num_tokens))

    
    #--- 进行二次采样 ---
    return [[tk for tk in line if keep(tk)] for line in sentences]
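
A usage sketch (assuming `sentences` and `vocab` as above, plus `d2l`, `random` and `math` imported); frequent words such as 'the' are dropped with high probability:

subsampled = subsampling(sentences, vocab)
# Frequent tokens shrink a lot after subsampling, rare ones barely change
print(sum(l.count('the') for l in sentences), sum(l.count('the') for l in subsampled))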

get_centers_and_contexts

get_centers_and_contexts(corpus, max_window_size)

args:

  • corpus (nested list): the outer list holds sentences, the inner lists hold the word indices of each sentence.

return:

  • centers (list): every word index in the corpus; each word serves as a center word
  • contexts (nested list): the indices of the context words for each center word

Source:

def get_centers_and_contexts(corpus, max_window_size):
    """
    args: 
        corpus(二重list) :  第一重list的元素是每个句子, 第二重list是每个句子里的单词对应的索引。
     
    return: 
        centers(list) : 每个元素是语料库中每个词对应的索引, 每个词都会做中心词
        contexts(二重list): 每个中心词对应的上下文词的索引
    """
    centers, contexts = [], []
    for line in corpus:
        #--- 每个句子需要有至少两个词形成 “中心目标词 - 上下文词” 对 ---
        if len(line) < 2:
            continue
        #--- 一次性将所有中心词都加入了 ---
        centers += line
        for i in range(len(line)):
            #--- 上下文窗口的中心为i ---
            window_size = random.randint(1, max_window_size)
            indices = list(range(max(0, i - window_size), min(len(line), i + 1 + window_size)))

            #--- 将中心词从上下文词中排除掉 ---
            indices.remove(i)
            contexts.append([line[idx] for idx in indices])

    return centers, contexts
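
A toy example (the word indices are made up; the exact contexts vary because the window size is random):

tiny_dataset = [list(range(7)), list(range(7, 10))]
for center, context in zip(*get_centers_and_contexts(tiny_dataset, 2)):
    print('center', center, 'has contexts', context)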

RandomGenerator

The length of sampling_weights determines the sampling population, and the weights determine how often each index in the population is drawn.

10000 values are cached at once instead of calling random.choices on every draw.

candidates caches the randomly drawn values; draw() serves samples from this cache.

Source:

class RandomGenerator:
    #--- Draw a random integer from [0, n-1] according to the n sampling weights ---
    def __init__(self, sampling_weights):
        self.population = list(range(len(sampling_weights)))
        self.sampling_weights = sampling_weights
        self.candidates = []
        self.i = 0

    def draw(self):
        if self.i == len(self.candidates):
            self.candidates = random.choices(self.population, self.sampling_weights, k=10000)
            self.i = 0
        self.i += 1
        return self.candidates[self.i - 1]
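
A quick sketch (assuming `random` is imported): with weights [2, 3, 4], index 2 should be drawn most often.

generator = RandomGenerator([2, 3, 4])
print([generator.draw() for _ in range(10)])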

get_negatives

get_negatives(all_contexts, corpus, K)

args:

  • all_contexts (nested list): the context words of every center word
  • corpus : the corpus
  • K : sample K times as many noise words as context words

return:

  • all_negatives (nested list): the noise words for each center word / context word pair
def get_negatives(all_contexts, corpus, K):
    """
    args: 
        all_contexts(二重list) :  所有中心词对应的上下文词
        corpus : 语料库
        K : 采样 K 倍上下文词数量的噪声词
     
    return: 
        all_negatives(二重list) : 每个中心词和上下文词对 的 噪声词
    """
    counter = d2l.count_corpus(corpus)
    #--- 噪声次采样的概率P(w) 为 w 的词频 与 所有词的频率的比值的0.75词方 ---
    sampling_weights = [counter[i]**0.75 for i in range(len(counter))]
    all_negatives, generator = [], RandomGenerator(sampling_weights)
    for contexts in all_contexts:
        negatives = []
        while len(negatives) < len(contexts) * K:
            neg = generator.draw()
            
            #--- A noise word must not be a context word ---
            if neg not in contexts:
                negatives.append(neg)
        all_negatives.append(negatives)
    return all_negatives

all_negatives = get_negatives(all_contexts, corpus, 5)

batchify

batchify(data)

args:

  • data : center words centers, context words contexts, noise words negative_contexts

return:

  • centers(tensor) : shape (batch, 1)
  • context_negatives(tensor) : the context words and noise words concatenated and padded to the same length
  • masks(tensor) : distinguishes padding from real context/noise entries
  • labels(tensor) : distinguishes context words from noise words and padding
def batchify(data):
    """
    args: 
        data :  中心词 centers, 上下文词 contexts, 噪声词 negative_contexts
     
    return: 
        centers(tensor) : shape (batch, 1)
        context_negatives(tensor) : 上下文词和噪声词拼接起来
        masks(tensor): 区别padding和context_negatives
        labels(tensor): 区别上下文词 和 噪声词 以及 padding 
    """
    max_len = max(len(c) + len(n) for _, c, n in data)
    centers, contexts_negatives, masks, labels = [], [], [], []
    for center, context, negative in data:
        cur_len = len(context) + len(negative)
        centers += [center]
        contexts_negatives += [context + negative + [0] * (max_len - cur_len)]
        masks += [[1] * cur_len + [0] * (max_len - cur_len)]
        labels += [[1] * len(context) + [0] * (max_len - len(context))]
    return (torch.tensor(centers).reshape(-1, 1), torch.tensor(contexts_negatives), torch.tensor(masks), torch.tensor(labels))
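
A toy batch of two (center, contexts, negatives) triples (values made up) to show the padding, masks and labels:

x_1 = (1, [2, 2], [3, 3, 3, 3])
x_2 = (1, [2, 2, 2], [3, 3])
batch = batchify((x_1, x_2))
for name, data in zip(['centers', 'contexts_negatives', 'masks', 'labels'], batch):
    print(name, '=', data)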

load_data_ptb

load_data_ptb(batch_size, max_window_size, num_noise_words)

args:

  • batch_size : batch size
  • max_window_size : maximum context window size
  • num_noise_words : number of noise words

return:

  • data_iter : the data iterator
  • vocab : the vocabulary

sentences -> vocab -> subsampled -> corpus

def load_data_ptb(batch_size, max_window_size, num_noise_words):
    """
    args: 
        batch_size : 批大小
        max_window_size : 最大窗大小
        num_noise_words : 噪声词数量 
     
    return: 
        data_iter : 数据迭代器
        vocab : 词典

    sentence -> vocab -> subsampled -> corpus
    """
    num_workers = d2l.get_dataloader_workers()
    sentences = read_ptb()
    #--- 构造语料库 ---
    vocab = d2l.Vocab(sentences, min_freq=10)
    #--- 二次采样 ---
    subsampled = subsampling(sentences, vocab)
    #--- 将词变成索引 ---
    corpus = [vocab[line] for line in subsampled]
    all_centers, all_contexts = get_centers_and_contexts(corpus, max_window_size)
    all_negatives = get_negatives(all_contexts, corpus, num_noise_words)

    #--- Build the dataset ---
    class PTBDataset(torch.utils.data.Dataset):
        def __init__(self, centers, contexts, negatives):
            assert len(centers) ==  len(contexts) == len(negatives)
            self.centers = centers
            self.contexts = contexts
            self.negatives = negatives

        def __getitem__(self, index):
            return (self.centers[index], self.contexts[index], self.negatives[index])

        
        def __len__(self):
            return len(self.centers)

    dataset = PTBDataset(all_centers, all_contexts, all_negatives)
    
    data_iter = torch.utils.data.DataLoader(dataset, batch_size, shuffle=True, collate_fn=batchify, num_workers=num_workers)

    return data_iter, vocab
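
A usage sketch (the batch size, window size and noise-word count here are arbitrary; requires torch and the helpers above):

data_iter, vocab = load_data_ptb(512, 5, 5)
for batch in data_iter:
    for name, data in zip(['centers', 'contexts_negatives', 'masks', 'labels'], batch):
        print(name, 'shape:', data.shape)
    break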

sequence_mask

sequence_mask(X, valid_len, value=0)

args:

  • X : the input sequences; the function masks row by row, so a batch is flattened into rows before masking
  • valid_len : one number per row giving the valid length of that row; inside the function [:, None] turns it into a column
  • value : the value used to fill the masked positions

return:

  • X : the masked result
def sequence_mask(X, valid_len, value=0):
    """
    args: 
        X : 输入的序列,它要求的输入形式是一行一行的, 因此一个batch输入的时候,会将batch变成一行一行的形式
        valid_len : 函数中会通过[:, None]将它变成二维也就是列的形式, 每一行一个数, 表示每一行有效的长度
        value : 填充的默认值
     
    return: 
        X : mask之后的值
    """
    #--- 这个函数逐行地对序列进行mask ---
    """Mask irrelevant entries in sequences."""
    maxlen = X.size(1)
    #--- [None, :] 变成二维的 ---
    #--- mask 返回一个布尔类型的序列 ---
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X
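
A quick check of the row-wise masking (assuming torch is imported):

X = torch.tensor([[1, 2, 3], [4, 5, 6]])
print(sequence_mask(X, torch.tensor([1, 2])))
# tensor([[1, 0, 0],
#         [4, 5, 0]])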

masked_softmax

masked_softmax(X, valid_lens)

args:

  • X : (batch, rows, columns)
  • valid_lens : the valid length of each sample in the batch

return:

  • the softmax of X after masking
def masked_softmax(X, valid_lens):
    """
    args: 
        X : (batch, rows, columns)
        valid_lens :  batch中各个样本要mask的长度  
    return: 
        mask 之后的X 的 softmax 结果 
    """
    """Perform softmax operation by masking elements on the last axis."""
    # `X`: 3D tensor, `valid_lens`: 1D or 2D tensor
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            #--- 因为输入的valid_lens是batch对应的,现在要把它变成列对应的, 所以需要复制它 ---
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # On the last axis, replace masked elements with a very large negative
        # value, whose exponentiation outputs 0
        #--- X.reshape(-1, shape[-1]) 这个操作其实是将batch变成了行的形式 ---
        X = d2l.sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        #--- 再将mask后的结果还原成batch的形式 ---
        return nn.functional.softmax(X.reshape(shape), dim=-1)
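
A quick sketch: with valid_lens = [2, 3], positions beyond the valid length get (near) zero probability.

print(masked_softmax(torch.rand(2, 2, 4), torch.tensor([2, 3])))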

AdditiveAttention

args:

  • queries : shape(batch_size, no. of queries, queries_features)
  • keys: shape(batch_size, no. of key-value pairs, keys_features)
  • values: shape(batch_size, no. of key-value pairs, values_features)

return:

  • outputs.shape : (batch_size, no. of queries, values_features)
class AdditiveAttention(nn.Module):
    def __init__(self, key_size, query_size, num_hiddens, dropout, **kwargs):
        super(AdditiveAttention, self).__init__(**kwargs)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=False)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=False)
        self.w_v = nn.Linear(num_hiddens, 1, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens):
        """
        args: 
            queries : shape(batch_size, no. of queries, queries_features)
            keys: shape(batch_size, no. of key-value pairs, keys_features)
            values: shape(batch_size, no. of key-value pairs, values_features)
        return: 
            outputs.shape : (batch_size, no. of queries, values_features)
        """
        # queries.shape: (2, 1, 10) -> (2, 1, 8)
        # keys.shape: (2, 10, 2) -> (2, 10, 8)
        queries, keys = self.W_q(queries), self.W_k(keys)

        # After dimension expansion, shape of `queries`: (`batch_size`, no. of
        # queries, 1, `num_hiddens`) and shape of `keys`: (`batch_size`, 1,
        # no. of key-value pairs, `num_hiddens`). Sum them up with broadcasting
        # Broadcasting adds the linearly transformed queries and keys
        # queries.shape: (2, 1, 8) -> (2, 1, 1, 8)
        # keys.shape: (2, 10, 2) -> (2, 1, 10, 8)
        features = queries.unsqueeze(2) + keys.unsqueeze(1)
        features = torch.tanh(features)
        # There is only one output of `self.w_v`, so we remove the last
        # one-dimensional entry from the shape. Shape of `scores`:
        # (`batch_size`, no. of queries, no. of key-value pairs)
        # scores.shape: (2, 1, 10)
        scores = self.w_v(features).squeeze(-1)
        # self.attention_weights.shape: (2, 1, 10)
        self.attention_weights = masked_softmax(scores, valid_lens)
        # Shape of `values`: (`batch_size`, no. of key-value pairs, value
        # dimension)
        # output.shape: (2, 1, 4)
        return torch.bmm(self.dropout(self.attention_weights), values)
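
A usage sketch matching the shapes in the comments above (the feature sizes 10/2/4 are arbitrary choices):

queries = torch.normal(0, 1, (2, 1, 10))
keys = torch.ones((2, 10, 2))
values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(2, 1, 1)
valid_lens = torch.tensor([2, 6])

attention = AdditiveAttention(key_size=2, query_size=10, num_hiddens=8, dropout=0.1)
attention.eval()
print(attention(queries, keys, values, valid_lens).shape)  # torch.Size([2, 1, 4])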

DotProductAttention

class DotProductAttention(nn.Module):
    """Scaled dot product attention."""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    # Shape of `queries`: (`batch_size`, no. of queries, `d`)
    # Shape of `keys`: (`batch_size`, no. of key-value pairs, `d`)
    # Shape of `values`: (`batch_size`, no. of key-value pairs, value
    # dimension)
    # Shape of `valid_lens`: (`batch_size`,) or (`batch_size`, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        # Swap the last two dimensions of `keys` via transpose(1, 2)
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)
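
A sketch with the same keys/values/valid_lens as above; the queries must now share the feature size d = 2 with the keys:

queries = torch.normal(0, 1, (2, 1, 2))
keys = torch.ones((2, 10, 2))
values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(2, 1, 1)
valid_lens = torch.tensor([2, 6])

attention = DotProductAttention(dropout=0.5)
attention.eval()
print(attention(queries, keys, values, valid_lens).shape)  # torch.Size([2, 1, 4])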

Seq2SeqEncoder

args:

  • X : shape(batch, steps)

return:

  • output : shape(num_steps, batch_size, num_hiddens), the output of the last layer at every time step
  • state : (num_layers, batch_size, num_hiddens), the hidden state of every layer at the last time step
class Seq2SeqEncoder(d2l.Encoder):
    """The RNN encoder for sequence to sequence learning."""
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers, dropout=0, **kwargs):
        super(Seq2SeqEncoder, self).__init__(**kwargs)
        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size, num_hiddens, num_layers, dropout=dropout)

    def forward(self, X, *args):
        """
        args: 
            X : shape(batch, steps) 
         
        return: 
            output : shape(num_steps, batch_size, num_hiddens), 最后一层每个时间步的输出
            state : (num_layers, batch_size, num_hiddens), 每层最后一个时间步的输出
        """
        # The output `X` shape: (`batch_size`, `num_steps`, `embed_size`)
        X = self.embedding(X)
        # In RNN models, the first axis corresponds to time steps
        X = X.permute(1, 0, 2)
        # When state is not mentioned, it defaults to zeros
        output, state = self.rnn(X)
        # `output` shape: (`num_steps`, `batch_size`, `num_hiddens`)
        # `state` shape: (`num_layers`, `batch_size`, `num_hiddens`)
        return output, state
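
A shape check (all hyperparameters are arbitrary):

encoder = Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16, num_layers=2)
encoder.eval()
X = torch.zeros((4, 7), dtype=torch.long)  # (batch, steps)
output, state = encoder(X)
print(output.shape)  # torch.Size([7, 4, 16])
print(state.shape)   # torch.Size([2, 4, 16])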

transpose_qkv

args:

  • X : shape (2, 4, 100)
  • num_heads: 5

return:

  • X : shape (2 * 5, 4, 20)
def transpose_qkv(X, num_heads):
    """
    args: 
        X : shape (2, 4, 100)
        num_heads: 5
    return: 
        X : shape (2 * 5, 4, 20)
    """
    # Shape of input `X`: (`batch_size`, no. of queries or key-value pairs, `num_hiddens`).
    # Shape of output `X`: (`batch_size`, no. of queries or key-value pairs, `num_heads`, `num_hiddens` / `num_heads`)
    # X.shape: (2, 4, 100) -> (2, 4, 5, 20)
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)

    # Shape of output `X`:
    # (`batch_size`, `num_heads`, no. of queries or key-value pairs, `num_hiddens` / `num_heads`)
    X = X.permute(0, 2, 1, 3)

    # Shape of `output`:
    # (`batch_size` * `num_heads`, no. of queries or key-value pairs, `num_hiddens` / `num_heads`)
    return X.reshape(-1, X.shape[2], X.shape[3])
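
A shape check matching the numbers in the docstring:

X = torch.ones((2, 4, 100))
print(transpose_qkv(X, 5).shape)  # torch.Size([10, 4, 20])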

MultiHeadAttention

args:

  • queries : shape(2, 4, 100)
  • keys : shape(2, 6, 100)
  • values : shape(2, 6, 100)
  • valid_lens : torch.tensor([3, 2])

return:

  • output_concat : shape(2, 4, 100)
class MultiHeadAttention(nn.Module):
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = d2l.DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    def forward(self, queries, keys, values, valid_lens):
        """
        args: 
            queries : shape(2, 4, 100)
            keys : shape(2, 6, 100)
            values : shape(2, 6, 100)
            valid_lens : torch.tensor([3, 2])
        return: 
            output_concat : shape(2, 4, 100)
        """
        # Shape of `queries`, `keys`, or `values`:
        # (`batch_size`, no. of queries or key-value pairs, `num_hiddens`)
        # Shape of `valid_lens`:
        # (`batch_size`,) or (`batch_size`, no. of queries)
        # After transposing, shape of output `queries`, `keys`, or `values`:
        # (`batch_size` * `num_heads`, no. of queries or key-value pairs, `num_hiddens` / `num_heads`)

        # queries.shape: (2, 4, 100) -> (2*5, 4, 20)
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        # keys.shape: (2, 6, 100) -> (2*5, 6, 20)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        # values.shape: (2, 6, 100) -> (2*5, 6, 20)
        values = transpose_qkv(self.W_v(values), self.num_heads)

        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) for
            # `num_heads` times, then copy the next item, and so on
            # [3, 2] -> [3, 3, 3, 3, 3, 2, 2, 2, 2, 2]
            valid_lens = torch.repeat_interleave(valid_lens,
                                                 repeats=self.num_heads,
                                                 dim=0)

        # Shape of `output`: (`batch_size` * `num_heads`, no. of queries, `num_hiddens` / `num_heads`)
        # queries.shape: (10, 4, 20)
        # keys.shape: (10, 6, 20)
        # values.shape: (10, 6, 20)
        # output: (10, 4, 20)
        output = self.attention(queries, keys, values, valid_lens)

        # Shape of `output_concat`:
        # (`batch_size`, no. of queries, `num_hiddens`)
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)
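
A shape check with the numbers from the docstring (num_hiddens=100, num_heads=5; assumes transpose_qkv, transpose_output and d2l.DotProductAttention are in scope):

num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens, num_hiddens, num_heads, 0.5)
attention.eval()
queries = torch.ones((2, 4, num_hiddens))
keys = values = torch.ones((2, 6, num_hiddens))
valid_lens = torch.tensor([3, 2])
print(attention(queries, keys, values, valid_lens).shape)  # torch.Size([2, 4, 100])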

transpose_output

args:

  • X : shape(10, 4, 20)
  • num_heads : 5

return:

  • X : shape(2, 4, 100)
def transpose_output(X, num_heads):
    """
    args: 
        X : shape(10, 4, 20)
        num_heads : 5 
     
    return: 
        X : shape(2, 4, 100)
    """
    """Reverse the operation of `transpose_qkv`"""
    # X.shape: (10 ,4, 20) -> (2, 5, 4, 20)
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    # X.shape: (2, 5, 4, 20) -> (2, 4, 5, 20)
    X = X.permute(0, 2, 1, 3)
    # X.shape: (2, 4, 5, 20) -> (2, 4, 100)
    return X.reshape(X.shape[0], X.shape[1], -1)

AddNorm

AddNorm(init: normalized_shape, dropout, **kwargs)
class AddNorm(nn.Module):
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
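
A shape check; the output keeps the input shape, and dropout is only applied to Y:

add_norm = AddNorm([3, 4], 0.5)
add_norm.eval()
print(add_norm(torch.ones((2, 3, 4)), torch.ones((2, 3, 4))).shape)  # torch.Size([2, 3, 4])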

EncoderBlock

EncoderBlock(init: key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, dropout, use_bias=False, **kwargs)
class EncoderBlock(nn.Module):
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = d2l.MultiHeadAttention(key_size, query_size,
                                                value_size, num_hiddens,
                                                num_heads, dropout, use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(ffn_num_input, ffn_num_hiddens,
                                   num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))
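
A shape check (the hyperparameters are arbitrary; this sketch assumes d2l.MultiHeadAttention and a PositionWiseFFN implementation are in scope):

X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5)
encoder_blk.eval()
print(encoder_blk(X, valid_lens).shape)  # torch.Size([2, 100, 24])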

TransformerEncoder

TransformerEncoder(init: vocab_size, key_size, query_size, value_size, num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens, num_heads, num_layers, dropout, use_bias=False, **kwargs)
class TransformerEncoder(d2l.Encoder):
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        # Positional encoding
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(
                "block" + str(i),
                EncoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, use_bias))

    def forward(self, X, valid_lens, *args):
        # Since positional encoding values are between -1 and 1, the embedding
        # values are multiplied by the square root of the embedding dimension
        # to rescale before they are summed up
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[i] = blk.attention.attention.attention_weights
        return X
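
A shape check with a vocabulary of 200 tokens and 2 blocks (all values arbitrary; assumes EncoderBlock above and d2l.PositionalEncoding):

encoder = TransformerEncoder(200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5)
encoder.eval()
valid_lens = torch.tensor([3, 2])
print(encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape)  # torch.Size([2, 100, 24])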

predict_seq2seq

truncate_pad

truncate_pad(line, num_steps, padding_token)

If line is longer than num_steps, truncate it; otherwise pad it with padding_token.

def truncate_pad(line, num_steps, padding_token):
    """Truncate or pad sequences."""
    if len(line) > num_steps:
        return line[:num_steps]  # Truncate
    return line + [padding_token] * (num_steps - len(line))  # Pad
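
For example:

print(truncate_pad([7, 2, 5], num_steps=5, padding_token=0))           # [7, 2, 5, 0, 0]
print(truncate_pad([7, 2, 5, 1, 4, 9], num_steps=5, padding_token=0))  # [7, 2, 5, 1, 4]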

d2l.torch

set_figsize

d2l.torch.set_figsize(figsize=(3.5, 2.5))

Sets the matplotlib figure size.

Source:

def set_figsize(figsize=(3.5, 2.5)):
    """Set the figure size for matplotlib."""
    use_svg_display()
    d2l.plt.rcParams['figure.figsize'] = figsize
from IPython import display

def use_svg_display():
    """Use the svg format to display a plot in Jupyter."""
    display.set_matplotlib_formats('svg')

plot

d2l.mxnet.plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None, ylim=None, xscale='linear', yscale='linear', fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None)

args:

  • X, Y: the data
  • xlabel: label of the x axis
  • ylabel: label of the y axis
  • legend: labels for the plotted lines
  • xlim: range of the x axis
  • ylim: range of the y axis
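
A usage sketch (the data here is made up; assumes torch and d2l are imported):

x = torch.arange(0, 3, 0.1)
d2l.plot(x, [torch.sin(x), torch.cos(x)], xlabel='x', ylabel='f(x)', legend=['sin(x)', 'cos(x)'])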

show_heatmaps

show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5), cmap='Reds')

Draws heatmaps, e.g. to visualize attention weights.

args:

  • matrices (tensor): the attention weights to plot, with shape (number of rows to display, number of columns to display, number of queries, number of keys)
  • xlabel (str): label of the x axis
  • ylabel (str): label of the y axis
  • titles: subplot titles
  • figsize: figure size
  • cmap: colormap
def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5), cmap='Reds'):
    """
    args: 
        matrices(tensor) : 要画的attention weights, shape 为 要展示的行数, 要展示的列数, query的数量, keys的数量
        xlabel(str) : x轴的标签
        ylabel(str) : y轴的标签
        titles : 标题
        figsize : 设置fig的尺寸
        cmap : map的color
    return: 
        code : 
    """
    d2l.use_svg_display()
    num_rows, num_cols = matrices.shape[0], matrices.shape[1]
    fig, axes = d2l.plt.subplots(num_rows, num_cols, figsize=figsize,
                                 sharex=True, sharey=True, squeeze=False)
    # axes is an n-by-m "matrix" of subplots; iterate over rows first, then columns
    for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):
        for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):
            pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)
            # Add the x-axis label only on the bottom row
            if i == num_rows - 1:
                ax.set_xlabel(xlabel)
            # Add the y-axis label only on the first column
            if j == 0:
                ax.set_ylabel(ylabel)
            if titles:
                ax.set_title(titles[j])
    fig.colorbar(pcm, ax=axes, shrink=0.6)
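
For example, an identity attention matrix shows up as a diagonal:

attention_weights = torch.eye(10).reshape((1, 1, 10, 10))
show_heatmaps(attention_weights, xlabel='Keys', ylabel='Queries')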