1. 为什么我们需要"手撕"Transformer?
在自然语言处理领域,Transformer架构已经成为了事实上的标准。但很多人在使用BERT、GPT这些现成模型时,往往只停留在调包和微调的层面。三年前我第一次接触Transformer时,就被它的精妙设计所震撼——但直到我真正用代码从头实现它,那些论文中的公式才真正活了起来。
"手撕"Transformer意味着不使用任何现成的深度学习框架(如TensorFlow/PyTorch),仅用基础矩阵运算实现所有核心组件。这个过程就像拆解一块精密的瑞士手表,能让你真正理解:
- 自注意力机制如何动态分配权重
- 位置编码为何能替代RNN的时序处理
- 残差连接和层归一化如何解决深层网络训练难题
2. 基础架构拆解
2.1 输入编码层实现
我们先从最基础的输入处理开始。假设我们的输入句子是"I love NLP",经过分词和词表映射后得到token IDs [10, 23, 45]。传统做法会直接使用Embedding层,但我们要手动实现:
python复制import numpy as np
# 词表大小设为10000,向量维度512
vocab_size = 10000
d_model = 512
# 随机初始化词嵌入矩阵(实际应用会用预训练值)
embedding_matrix = np.random.randn(vocab_size, d_model) * 0.01
# 输入句子编码
input_ids = np.array([10, 23, 45])
input_embeddings = embedding_matrix[input_ids] # 形状(3, 512)
关键细节:初始化时的0.01缩放因子非常重要,可以防止初始阶段梯度爆炸
2.2 位置编码的数学实现
Transformer抛弃了RNN的循环结构,改用位置编码(Positional Encoding)来注入序列顺序信息。其公式为:
$$
PE_{(pos,2i)} = \sin(pos/10000^{2i/d_{model}}) \
PE_{(pos,2i+1)} = \cos(pos/10000^{2i/d_{model}})
$$
对应的Python实现:
python复制def positional_encoding(max_len, d_model):
pe = np.zeros((max_len, d_model))
position = np.arange(0, max_len).reshape(-1, 1)
div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
pe[:, 0::2] = np.sin(position * div_term)
pe[:, 1::2] = np.cos(position * div_term)
return pe
# 为我们的3个token添加位置编码
max_len = 100
pe = positional_encoding(max_len, d_model)
input_embeddings += pe[:len(input_ids)]
实测发现,当序列长度超过训练时的最大长度时,这种编码方式仍能保持较好的外推性。
3. 自注意力机制核心实现
3.1 QKV矩阵计算
自注意力的关键在于计算查询(Query)、键(Key)和值(Value)矩阵。假设我们有8个头(head),每个头维度64:
python复制n_heads = 8
d_k = d_v = d_model // n_heads # 64
# 初始化权重矩阵
W_Q = np.random.randn(d_model, d_k * n_heads) * 0.01
W_K = np.random.randn(d_model, d_k * n_heads) * 0.01
W_V = np.random.randn(d_model, d_v * n_heads) * 0.01
# 计算Q,K,V (batch_size=1的情况下)
Q = np.dot(input_embeddings, W_Q) # (3, 512)
K = np.dot(input_embeddings, W_K)
V = np.dot(input_embeddings, W_V)
# 拆分为多头
Q = Q.reshape(-1, n_heads, d_k) # (3, 8, 64)
K = K.reshape(-1, n_heads, d_k)
V = V.reshape(-1, n_heads, d_v)
3.2 注意力分数计算
实现缩放点积注意力时,最容易出错的是mask处理和softmax维度:
python复制# 计算注意力分数
attn_scores = np.matmul(Q, K.transpose(0,2,1)) / np.sqrt(d_k) # (3,8,3)
# 生成因果掩码(防止看到未来信息)
mask = np.triu(np.ones((len(input_ids), len(input_ids))), k=1)
attn_scores = attn_scores - 1e9 * mask
# 计算注意力权重
attn_weights = np.exp(attn_scores - np.max(attn_scores, axis=-1, keepdims=True))
attn_weights = attn_weights / np.sum(attn_weights, axis=-1, keepdims=True)
# 加权求和
output = np.matmul(attn_weights, V) # (3,8,64)
output = output.reshape(-1, d_model) # (3,512)
踩坑记录:忘记除以sqrt(d_k)会导致softmax后某些位置权重接近1,造成梯度消失
4. 前馈网络与残差连接
4.1 两层层叠前馈网络
Transformer中的前馈网络(FFN)实际上是两个全连接层:
python复制d_ff = 2048 # 中间层维度
W1 = np.random.randn(d_model, d_ff) * 0.01
b1 = np.zeros(d_ff)
W2 = np.random.randn(d_ff, d_model) * 0.01
b2 = np.zeros(d_model)
# 第一层带ReLU激活
hidden = np.maximum(np.dot(output, W1) + b1, 0)
# 第二层线性变换
ffn_output = np.dot(hidden, W2) + b2
4.2 残差连接与层归一化
这是保证深层网络训练稳定的关键:
python复制# 残差连接
residual = ffn_output + input_embeddings
# 层归一化实现
def layer_norm(x):
mean = np.mean(x, axis=-1, keepdims=True)
std = np.std(x, axis=-1, keepdims=True)
gamma = np.ones(x.shape[-1]) # 可训练参数
beta = np.zeros(x.shape[-1]) # 可训练参数
return gamma * (x - mean) / (std + 1e-6) + beta
norm_output = layer_norm(residual)
5. 完整Transformer层组装
将上述组件组合成完整编码器层:
python复制class TransformerEncoderLayer:
def __init__(self, d_model, n_heads):
self.d_model = d_model
self.n_heads = n_heads
self.d_k = d_v = d_model // n_heads
# 初始化所有权重
self.W_Q = np.random.randn(d_model, self.d_k * n_heads) * 0.01
self.W_K = np.random.randn(d_model, self.d_k * n_heads) * 0.01
self.W_V = np.random.randn(d_model, self.d_v * n_heads) * 0.01
self.W_O = np.random.randn(d_model, d_model) * 0.01
self.W1 = np.random.randn(d_model, d_ff) * 0.01
self.b1 = np.zeros(d_ff)
self.W2 = np.random.randn(d_ff, d_model) * 0.01
self.b2 = np.zeros(d_model)
self.gamma1 = np.ones(d_model)
self.beta1 = np.zeros(d_model)
self.gamma2 = np.ones(d_model)
self.beta2 = np.zeros(d_model)
def __call__(self, x):
# 自注意力部分
Q = np.dot(x, self.W_Q).reshape(-1, self.n_heads, self.d_k)
K = np.dot(x, self.W_K).reshape(-1, self.n_heads, self.d_k)
V = np.dot(x, self.W_V).reshape(-1, self.n_heads, self.d_v)
attn = np.matmul(Q, K.transpose(0,2,1)) / np.sqrt(self.d_k)
attn = np.exp(attn - np.max(attn, axis=-1, keepdims=True))
attn = attn / np.sum(attn, axis=-1, keepdims=True)
output = np.matmul(attn, V).reshape(-1, self.d_model)
output = np.dot(output, self.W_O)
# 残差连接和层归一化
output = layer_norm(output + x, self.gamma1, self.beta1)
# 前馈网络
ffn = np.maximum(np.dot(output, self.W1) + self.b1, 0)
ffn = np.dot(ffn, self.W2) + self.b2
# 最终输出
return layer_norm(ffn + output, self.gamma2, self.beta2)
6. 训练技巧与优化
6.1 学习率预热策略
Transformer训练需要特殊的学习率调度:
python复制class WarmupScheduler:
def __init__(self, d_model, warmup_steps=4000):
self.d_model = d_model
self.warmup_steps = warmup_steps
self.current_step = 0
def __call__(self):
self.current_step += 1
return (self.d_model ** -0.5) * min(
self.current_step ** -0.5,
self.current_step * self.warmup_steps ** -1.5
)
6.2 标签平滑正则化
防止模型对训练数据过度自信:
python复制def label_smoothing_loss(y_true, y_pred, epsilon=0.1):
n_classes = y_pred.shape[-1]
y_smooth = (1 - epsilon) * y_true + epsilon / n_classes
return -np.sum(y_smooth * np.log(y_pred + 1e-10))
7. 性能优化实战
7.1 内存高效注意力
当序列较长时,原始实现内存占用会爆炸。我们可以实现分块计算:
python复制def memory_efficient_attention(Q, K, V, chunk_size=64):
batch, n_heads, seq_len, d_k = Q.shape
output = np.zeros((batch, n_heads, seq_len, d_k))
for i in range(0, seq_len, chunk_size):
Q_chunk = Q[:, :, i:i+chunk_size]
scores = np.matmul(Q_chunk, K.transpose(0,1,3,2)) / np.sqrt(d_k)
attn = softmax(scores)
output[:, :, i:i+chunk_size] = np.matmul(attn, V)
return output
7.2 混合精度训练
使用float16可以显著减少内存占用:
python复制def to_half_precision(params):
return {k: v.astype(np.float16) if v.dtype == np.float32 else v
for k, v in params.items()}
def to_full_precision(params):
return {k: v.astype(np.float32) if v.dtype == np.float16 else v
for k, v in params.items()}
8. 从零训练一个迷你Transformer
8.1 数据准备
我们使用IWSLT德语-英语数据集的小型版本:
python复制def load_data():
# 简化的数据处理流程
texts = ["Ich liebe NLP", "Transformers sind toll"]
tokenizer = {
'<pad>': 0, '<s>': 1, '</s>': 2,
'ich': 3, 'liebe': 4, 'nlp': 5,
'transformers': 6, 'sind': 7, 'toll': 8
}
# 添加位置信息并转换为ID
encoder_input = [[1, 3, 4, 5, 2]]
decoder_input = [[1, 3, 4, 5, 2]]
target = [[3, 4, 5, 2, 0]]
return np.array(encoder_input), np.array(decoder_input), np.array(target)
8.2 训练循环
基础训练流程实现:
python复制def train_step(enc_input, dec_input, target, model, optimizer):
# 前向传播
with tf.GradientTape() as tape:
predictions = model(enc_input, dec_input)
loss = cross_entropy_loss(target, predictions)
# 反向传播
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
9. 调试与可视化技巧
9.1 注意力权重可视化
理解模型关注的重点:
python复制def plot_attention(weights, src_tokens, tgt_tokens):
plt.figure(figsize=(10, 10))
plt.imshow(weights, cmap='viridis')
plt.xticks(range(len(src_tokens)), src_tokens, rotation=90)
plt.yticks(range(len(tgt_tokens)), tgt_tokens)
plt.colorbar()
plt.show()
9.2 梯度流动检查
防止梯度消失/爆炸:
python复制def check_gradients(gradients):
for grad, var in zip(gradients, model.trainable_variables):
if grad is None:
print(f"No gradient for {var.name}")
else:
print(f"{var.name}: max={np.max(grad):.4f}, min={np.min(grad):.4f}")
10. 扩展与优化方向
10.1 模型压缩技术
- 知识蒸馏:用大模型指导小模型训练
- 量化感知训练:直接训练低精度模型
- 参数共享:在不同层/头之间共享权重矩阵
10.2 高效注意力变体
- 稀疏注意力:只计算特定位置的注意力
- 局部注意力:限制注意力窗口大小
- 线性注意力:近似计算降低复杂度
实现这些优化后,我们的"手撕"Transformer虽然性能不及工业级实现,但对理解模型本质大有裨益。建议在完成基础版本后,逐步添加这些高级特性。