The Transformer architecture was introduced by Vaswani et al. in the 2017 paper "Attention Is All You Need", originally to address sequence modeling in machine translation. Unlike traditional RNNs and LSTMs, the Transformer is built entirely on attention mechanisms and drops recurrence altogether, which lets the model process the whole input sequence in parallel.
The original Transformer uses the classic encoder-decoder structure:
```python
import torch.nn as nn

class Transformer(nn.Module):
    def __init__(self, num_layers, d_model, num_heads, d_ff, input_vocab_size,
                 target_vocab_size, dropout=0.1):
        super().__init__()
        # Encoder and decoder are stacks of the layers defined below
        self.encoder = Encoder(num_layers, d_model, num_heads, d_ff,
                               input_vocab_size, dropout)
        self.decoder = Decoder(num_layers, d_model, num_heads, d_ff,
                               target_vocab_size, dropout)
        # Projects decoder outputs to logits over the target vocabulary
        self.final_layer = nn.Linear(d_model, target_vocab_size)
```
The encoder is a stack of N identical layers (typically N=6 or 12), each containing two main sub-layers:
- a multi-head self-attention sub-layer;
- a position-wise feed-forward network.
Each sub-layer is wrapped with a residual connection and layer normalization:
```python
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention sub-layer: residual connection + post-norm
        attn_output = self.self_attn(x, x, x, mask)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)
        # Feed-forward sub-layer: residual connection + post-norm
        ff_output = self.feed_forward(x)
        x = x + self.dropout(ff_output)
        x = self.norm2(x)
        return x
```
As research progressed, most modern large language models (LLMs) adopted a decoder-only architecture, mainly because a single causal-attention stack is simpler to scale, matches the autoregressive generation objective directly, and supports efficient KV caching at inference time.
Architecture comparison:
| Architecture | Representative models | Attention pattern | Typical use cases |
|---|---|---|---|
| Encoder-only | BERT, RoBERTa | Bidirectional attention | Text classification, NER |
| Decoder-only | GPT, LLaMA | Causal attention | Text generation |
| Encoder-Decoder | T5, BART | Mixed (bidirectional encoder, causal decoder) | Translation, summarization |
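To make the "causal attention" entry above concrete, here is a minimal sketch (my own addition) of how such a mask is typically built in PyTorch, assuming the convention used later in this article where 1 means "may attend" and 0 is masked out:

```python
import torch

def causal_mask(seq_len: int) -> torch.Tensor:
    # Lower-triangular matrix: position i may attend to positions <= i
    return torch.tril(torch.ones(seq_len, seq_len, dtype=torch.bool))

print(causal_mask(4).int())
# tensor([[1, 0, 0, 0],
#         [1, 1, 0, 0],
#         [1, 1, 1, 0],
#         [1, 1, 1, 1]], dtype=torch.int32)
```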
At the core of self-attention is the interaction between the Query, Key, and Value matrices. From a linear-algebra point of view, these matrices project the input sequence into different subspaces:
```python
import torch
import torch.nn as nn

# Dimension definitions
d_model = 512        # model (embedding) dimension
d_k = d_v = 64       # per-head Q/K and V dimensions
num_heads = 8        # number of attention heads

# Per-head projection matrices (randomly initialized here for illustration)
W_Q = nn.Parameter(torch.randn(num_heads, d_model, d_k))
W_K = nn.Parameter(torch.randn(num_heads, d_model, d_k))
W_V = nn.Parameter(torch.randn(num_heads, d_model, d_v))
```
The projection step, written in matrix form:
$$ Q = XW_Q, \quad K = XW_K, \quad V = XW_V $$
where $X \in \mathbb{R}^{n \times d_{model}}$ is the input sequence and $n$ is the sequence length.
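As a small illustration (not from the original text), the per-head projection matrices defined above can be applied to a batch of inputs with a single `einsum`; the tensor names and shapes here are assumptions for the example, continuing from the previous block:

```python
batch_size, seq_len = 2, 10
X = torch.randn(batch_size, seq_len, d_model)

# "bnd,hdk->bhnk": project every token (n) in every sequence (b)
# with each head's (h) d_model x d_k matrix
Q = torch.einsum("bnd,hdk->bhnk", X, W_Q)   # [batch, heads, seq, d_k]
K = torch.einsum("bnd,hdk->bhnk", X, W_K)
V = torch.einsum("bnd,hdk->bhnk", X, W_V)
```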
The full attention computation proceeds in the following steps.

1. Compute the scaled dot-product scores:

```python
scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
```

In matrix form, the whole operation is

$$ \text{Attention}(Q, K, V) = \text{softmax}\!\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V $$

2. Apply the mask (if any), so that disallowed positions receive a large negative score and therefore near-zero weight:

```python
if mask is not None:
    scores = scores.masked_fill(mask == 0, -1e9)
```

3. Normalize the scores into attention weights with softmax:

```python
attn_weights = F.softmax(scores, dim=-1)
```

4. Compute the output as the weighted sum of the values:

```python
output = torch.matmul(attn_weights, V)
```
Multi-head attention splits Q/K/V into several subspaces that are computed in parallel:
```python
import copy
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

def clones(module, N):
    # N independent (deep-copied) instances of the same module
    return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        # Q, K, V projections plus the output projection
        self.linears = clones(nn.Linear(d_model, d_model), 4)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)
        # Linear projections, then split into heads: [batch, heads, seq, d_k]
        query, key, value = [
            lin(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
            for lin, x in zip(self.linears, (query, key, value))
        ]
        # Scaled dot-product attention
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        p_attn = F.softmax(scores, dim=-1)
        p_attn = self.dropout(p_attn)
        x = torch.matmul(p_attn, value)
        # Concatenate heads and apply the final output projection
        x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)
        return self.linears[-1](x)
```
The original Transformer uses sinusoidal position encodings:
$$ PE_{(pos, 2i)} = \sin\!\bigl(pos / 10000^{2i/d_{model}}\bigr) $$
$$ PE_{(pos, 2i+1)} = \cos\!\bigl(pos / 10000^{2i/d_{model}}\bigr) $$
Implementation:
```python
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000^(2i/d_model), computed in log space for numerical stability
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Buffer: saved with the model, but not a trainable parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: [batch, seq_len, d_model]; broadcasting adds the encoding to every batch element
        return x + self.pe[:x.size(1)]
```
Rotary Position Embedding (RoPE) is the prevailing choice in current models:
```python
def rotate_half(x):
    # (x1, x2) -> (-x2, x1) along the last dimension
    x1, x2 = x.chunk(2, dim=-1)
    return torch.cat((-x2, x1), dim=-1)

def apply_rotary_pos_emb(q, k, sin, cos):
    # Rotate queries and keys by a position-dependent angle
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed, k_embed
```
Advantages of RoPE:
- the relative position between two tokens enters the attention score directly, because rotating Q and K by position-dependent angles makes their dot product depend on the position difference;
- it adds no learned parameters and is applied inside attention, so it is compatible with KV caching;
- it tends to extrapolate to longer contexts better than learned absolute embeddings (often combined with interpolation or scaling tricks).
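The `sin`/`cos` tensors passed to `apply_rotary_pos_emb` above are precomputed from the token positions. A minimal sketch of that precomputation follows; the `head_dim` name, the base of 10000, and the output shapes are assumptions chosen to match the `rotate_half` convention above:

```python
def build_rope_cache(seq_len, head_dim, base=10000.0):
    # One inverse frequency per pair of dimensions
    inv_freq = 1.0 / (base ** (torch.arange(0, head_dim, 2).float() / head_dim))
    positions = torch.arange(seq_len).float()
    angles = torch.outer(positions, inv_freq)        # [seq_len, head_dim // 2]
    angles = torch.cat((angles, angles), dim=-1)     # [seq_len, head_dim]
    # Shapes broadcast against q/k of shape [batch, heads, seq_len, head_dim]
    return angles.sin()[None, None, :, :], angles.cos()[None, None, :, :]
```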
Grouped-Query Attention (GQA) strikes a balance between full multi-head attention (MHA) and Multi-Query Attention (MQA):
```python
class GroupedQueryAttention(nn.Module):
    def __init__(self, d_model, num_heads, num_groups):
        super().__init__()
        self.num_heads = num_heads
        self.num_groups = num_groups
        self.head_dim = d_model // num_heads
        self.q_proj = nn.Linear(d_model, d_model)
        # K/V are projected to num_groups heads only (num_groups < num_heads)
        self.k_proj = nn.Linear(d_model, self.head_dim * num_groups)
        self.v_proj = nn.Linear(d_model, self.head_dim * num_groups)

    def forward(self, q, k, v, mask=None):
        batch, q_len = q.size(0), q.size(1)
        # Queries keep the full set of heads: [batch, heads, seq, head_dim]
        q = self.q_proj(q).view(batch, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        # Keys/values have only num_groups heads: [batch, groups, seq, head_dim]
        k = self.k_proj(k).view(batch, -1, self.num_groups, self.head_dim).transpose(1, 2)
        v = self.v_proj(v).view(batch, -1, self.num_groups, self.head_dim).transpose(1, 2)
        # One straightforward way to realize the broadcast:
        # repeat each K/V group for the query heads it serves
        repeat = self.num_heads // self.num_groups
        k = k.repeat_interleave(repeat, dim=1)
        v = v.repeat_interleave(repeat, dim=1)
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = F.softmax(scores, dim=-1)
        # Merge heads back; an output projection would normally follow
        return torch.matmul(attn, v).transpose(1, 2).reshape(batch, q_len, -1)
```
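With `num_groups` well below `num_heads`, the K/V projection weights and, more importantly, the KV cache shrink by a factor of `num_heads / num_groups`, while quality typically stays much closer to MHA than to MQA. The `repeat_interleave` broadcast above is only for clarity; production kernels usually avoid materializing the repeated tensors.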
Flash Attention greatly improves efficiency through a few key techniques: it processes Q/K/V in tiles that fit in on-chip SRAM, never materializes the full n x n attention matrix in GPU HBM, uses an online (running-max) softmax so tiles can be combined incrementally, and recomputes attention during the backward pass instead of storing it.
```python
# Flash Attention pseudocode (simplified: the real algorithm also keeps running
# max/normalizer statistics so the tile-wise softmax combines exactly)
def flash_attention(Q, K, V):
    O = zeros_like(Q)
    # Process the sequence in tiles that fit in on-chip SRAM
    for i in range(0, N, block_size):
        Qi = Q[i:i+block_size]
        for j in range(0, N, block_size):
            Kj = K[j:j+block_size]
            Vj = V[j:j+block_size]
            # Local attention for this tile pair
            S_ij = Qi @ Kj.T / sqrt(d_k)
            P_ij = softmax(S_ij)          # online softmax in the real kernel
            O[i:i+block_size] += P_ij @ Vj
    return O
```
```python
class TransformerLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        # Self-attention sub-layer
        attn_output = self.self_attn(x, x, x, mask)
        x = x + self.dropout(attn_output)
        x = self.norm1(x)
        # Feed-forward sub-layer
        ff_output = self.feed_forward(x)
        x = x + self.dropout(ff_output)
        x = self.norm2(x)
        return x

class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.w_1 = nn.Linear(d_model, d_ff)
        self.w_2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Expand to d_ff, apply GELU, project back to d_model
        return self.w_2(self.dropout(F.gelu(self.w_1(x))))
```
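A quick sanity check (my own example, not from the original text) wires the classes above together with a causal mask and confirms the output shape:

```python
layer = TransformerLayer(d_model=512, num_heads=8, d_ff=2048)
x = torch.randn(2, 16, 512)                  # [batch, seq_len, d_model]
mask = torch.tril(torch.ones(16, 16))        # causal mask, broadcasts over batch/heads
out = layer(x, mask)
print(out.shape)                             # torch.Size([2, 16, 512])
```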
```python
# Check gradient flow after loss.backward()
for name, param in model.named_parameters():
    if param.grad is None:
        print(f"No gradient for {name}")
    else:
        print(f"{name} grad norm: {param.grad.norm().item():.4f}")
```
```python
import matplotlib.pyplot as plt
import seaborn as sns

def plot_attention(attention_weights, tokens):
    # attention_weights: [seq_len, seq_len] matrix for a single head
    plt.figure(figsize=(10, 8))
    sns.heatmap(attention_weights, annot=False,
                xticklabels=tokens, yticklabels=tokens,
                cmap="YlGnBu")
    plt.title("Attention Weights")
    plt.show()
```
```python
def check_nan_inf(tensor, name):
    if torch.isnan(tensor).any():
        print(f"NaN detected in {name}")
    if torch.isinf(tensor).any():
        print(f"Inf detected in {name}")
```
```python
# Mixed-precision training with automatic loss scaling
scaler = torch.cuda.amp.GradScaler()
for batch in dataloader:
    optimizer.zero_grad()
    with torch.cuda.amp.autocast():
        outputs = model(batch.inputs)
        loss = criterion(outputs, batch.targets)
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
```
```python
from torch.utils.checkpoint import checkpoint

class CheckpointTransformerLayer(nn.Module):
    def forward(self, x, mask):
        # Activations inside _forward are recomputed during backward,
        # trading extra compute for lower memory use
        return checkpoint(self._forward, x, mask)

    def _forward(self, x, mask):
        # original forward computation
        ...
```
Common failure modes and typical remedies:
- Training does not converge: check the learning-rate schedule (Transformers usually need warmup), the initialization, and the gradient norms (see the gradient check above).
- Out of memory: reduce the batch size, enable mixed-precision training, or wrap layers with gradient checkpointing as shown above.
- Long sequences: the O(n²) cost of standard attention becomes the bottleneck; Flash Attention or the sparse/linear variants below are the usual answers.
Several directions aim to reduce the cost of, or replace, standard attention:
- Sparse attention: restrict each token to a fixed pattern of positions (local windows, strided patterns, or a few global tokens) instead of the full sequence.
- Linear attention: replace the softmax with a kernel feature map $\phi$ so that the matrix product can be re-associated:

$$ \text{Attention}(Q, K, V) \approx \phi(Q)\,\bigl(\phi(K)^{\top} V\bigr) $$

Computing $\phi(K)^{\top}V$ first brings the complexity down from O(n²) to O(n) in the sequence length.
- State space models: recurrent formulations that process the sequence in linear time and keep a fixed-size state instead of a growing KV cache.
- Mixture of Experts (MoE): replace the dense feed-forward layer with several expert FFNs plus a gating network that routes each token to only a few of them:
```python
class MoELayer(nn.Module):
    def __init__(self, d_model, num_experts, top_k=2):
        super().__init__()
        # Each expert is a position-wise FFN (reusing the class defined above;
        # 4 * d_model is a common hidden size)
        self.experts = nn.ModuleList(
            [PositionwiseFeedForward(d_model, 4 * d_model) for _ in range(num_experts)])
        self.gate = nn.Linear(d_model, num_experts)
        self.top_k = top_k

    def forward(self, x):
        # Gating scores per token
        gates = F.softmax(self.gate(x), dim=-1)              # [batch, seq_len, num_experts]
        # Keep only the top-k experts for each token
        topk_values, topk_indices = torch.topk(gates, self.top_k, dim=-1)
        # Naive dense implementation: run every expert, then mask and mix
        expert_output = torch.stack([e(x) for e in self.experts], dim=-2)  # [..., num_experts, d_model]
        output = torch.zeros_like(x)
        for i in range(self.top_k):
            expert_mask = F.one_hot(topk_indices[..., i], num_classes=len(self.experts))
            output += (expert_output * expert_mask.unsqueeze(-1)).sum(dim=-2) * topk_values[..., i:i+1]
        return output
```
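With `top_k` much smaller than `num_experts`, the layer's parameter count grows with the number of experts while the per-token compute stays close to that of a single FFN, which is the main appeal of MoE at scale. The dense loop above is only for clarity; production implementations dispatch tokens to their selected experts rather than evaluating every expert on every token.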
```python
# Dynamic quantization: weights stored as int8, activations quantized at runtime
model = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

# Static quantization: requires a calibration pass over representative data
model.qconfig = torch.quantization.get_default_qconfig('fbgemm')
torch.quantization.prepare(model, inplace=True)
# ... run calibration data through the model ...
torch.quantization.convert(model, inplace=True)
```
```python
class GenerationCache:
    """Minimal KV cache: store keys/values of past tokens so each decoding
    step only computes attention for the newest token."""
    def __init__(self, max_length):
        self.max_length = max_length
        self.k_cache = []
        self.v_cache = []

    def update(self, new_k, new_v):
        self.k_cache.append(new_k)
        self.v_cache.append(new_v)
```
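A sketch of how such a cache would be used inside a single decoding step; the tensor shapes ([batch, heads, seq, head_dim]) and the surrounding attention code are assumptions for illustration:

```python
cache = GenerationCache(max_length=2048)

def decode_step(q_new, k_new, v_new):
    # q_new/k_new/v_new: [batch, heads, 1, head_dim] for the newest token
    cache.update(k_new, v_new)
    K = torch.cat(cache.k_cache, dim=2)    # all past keys:   [batch, heads, t, head_dim]
    V = torch.cat(cache.v_cache, dim=2)    # all past values: [batch, heads, t, head_dim]
    scores = torch.matmul(q_new, K.transpose(-2, -1)) / math.sqrt(q_new.size(-1))
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, V)        # [batch, heads, 1, head_dim]
```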
- GPU deployment: use fused attention kernels (e.g. Flash Attention), half-precision or INT8 inference, and KV caching for autoregressive generation.
- CPU deployment: combine quantization (as above) with export to an optimized runtime such as ONNX Runtime:
```python
# Export to ONNX
torch.onnx.export(model,
                  dummy_input,
                  "model.onnx",
                  opset_version=13,
                  input_names=["input_ids"],
                  output_names=["logits"])
```
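Once exported, the model can be served with ONNX Runtime. A minimal sketch follows; the input shape and dtype are assumptions and must match what `dummy_input` had at export time:

```python
import numpy as np
import onnxruntime as ort

session = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])
input_ids = np.random.randint(0, 1000, size=(1, 16), dtype=np.int64)
(logits,) = session.run(["logits"], {"input_ids": input_ids})
print(logits.shape)
```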