PyTorch在计算机视觉领域的崛起并非偶然。2017年PyTorch刚发布时,大多数研究者还在使用TensorFlow或Caffe,但短短几年间情况就发生了逆转。我清楚地记得2019年参加CVPR时,会场里PyTorch相关的海报数量首次超过了TensorFlow。这种转变的核心在于PyTorch的"define-by-run"特性——动态计算图让研究人员能够像写Python脚本一样自然地构建模型,调试时可以直接使用熟悉的Python工具链。
在工业实践中,PyTorch的另一个优势是它的Python原生感。当我们需要快速实现一个新论文中的注意力机制时,用PyTorch可以像写普通Python类一样继承nn.Module,而不用处理静态图框架中那些令人头疼的图构建和会话管理。这种开发效率的提升对于需要频繁迭代的计算机视觉项目尤为宝贵。
torchvision是PyTorch生态中专门为计算机视觉设计的工具库。它包含三个关键部分:预训练模型(models)、数据变换(transforms)和常用数据集(datasets):
# Load a pretrained ResNet-50 and adapt it to a new 10-class task.
from torchvision import models

model = models.resnet50(pretrained=True)
# Replace the final fully connected layer (2048 -> 10) for the new task.
model.fc = nn.Linear(2048, 10)
# Standard CIFAR-10 pipeline: augmentation + tensor conversion + normalization.
from torchvision import transforms, datasets

transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    # Map each RGB channel from [0, 1] to [-1, 1].
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])
train_set = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
# Heavier augmentation for ImageNet-style training on 224x224 crops.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomRotation(30),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    # ImageNet per-channel mean / std statistics.
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
PyTorch的nn.Module是所有神经网络模块的基类。构建一个卷积神经网络时,我们会这样组织代码:
class CNN(nn.Module):
    """Small two-stage convolutional classifier.

    NOTE(review): the classifier head sizes (128 * 56 * 56) assume 224x224
    RGB input (two 2x pools -> 56x56 maps). Feeding 32x32 CIFAR-10 images
    to this exact model would raise a shape error — confirm intended input.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Feature extractor: two conv/ReLU/pool stages, 3 -> 64 -> 128 channels.
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Classifier head; input size assumes 224x224 images.
        self.classifier = nn.Sequential(
            nn.Linear(128 * 56 * 56, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = torch.flatten(x, 1)  # flatten everything but the batch dimension
        return self.classifier(x)
在实际项目中,我通常会采用更模块化的设计,将每个组件拆分成单独的子模块,便于复用和调试。
一个完整的图像分类流程包括以下几个关键步骤:
# Wrap the datasets in batched loaders.
from torch.utils.data import DataLoader

batch_size = 32
# Shuffle only the training set; validation order does not matter.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size)
# Basic supervised training loop.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()  # enable train-time behavior (dropout, batch-norm updates)
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()          # clear gradients from the previous step
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()                # backprop
        optimizer.step()               # parameter update
# Validation: top-1 accuracy over val_loader.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in val_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        # argmax over the class dimension; `.data` is unnecessary under no_grad
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print(f'Accuracy: {100 * correct / total}%')
PyTorch实现目标检测通常有两种方式:
# Option 1: use a pretrained detector from torchvision.
from torchvision.models.detection import fasterrcnn_resnet50_fpn

model = fasterrcnn_resnet50_fpn(pretrained=True).to(device)
model.eval()

# Inference example: the detection API takes a list of 3xHxW image tensors.
with torch.no_grad():
    predictions = model([images.to(device)])
class DetectionModel(nn.Module):
    """Option 2: custom two-stage detector skeleton (Faster R-CNN style)."""

    def __init__(self):
        super().__init__()
        self.backbone = models.resnet50(pretrained=True)
        self.rpn = RegionProposalNetwork()
        self.roi_heads = RoIHeads()

    def forward(self, images, targets=None):
        # `targets` is unused here; kept for the conventional train-time signature.
        features = self.backbone(images)
        proposals = self.rpn(features)
        detections = self.roi_heads(features, proposals)
        return detections
在实际项目中,我通常会先使用预训练模型快速验证想法,然后再根据需求进行定制开发。
# Mixed-precision training with automatic loss scaling.
scaler = torch.cuda.amp.GradScaler()
for epoch in range(10):
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        # Run the forward pass in float16 where it is numerically safe.
        with torch.cuda.amp.autocast():
            outputs = model(images)
            loss = criterion(outputs, labels)
        # Scale the loss so float16 gradients do not underflow.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
# Faster data loading: parallel workers + pinned host memory.
train_loader = DataLoader(
    train_set,
    batch_size=64,
    shuffle=True,
    num_workers=4,            # preprocessing in parallel worker processes
    pin_memory=True,          # speeds up host-to-GPU copies
    persistent_workers=True,  # keep workers alive between epochs
)
# Clip the global gradient norm (call between backward() and step()).
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# Or inspect per-parameter gradient magnitudes directly:
for name, param in model.named_parameters():
    if param.grad is not None:
        print(name, param.grad.abs().mean())
# Regularization via weight decay (L2 penalty).
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)

# Early stopping: stop once validation loss has not improved for `patience` epochs.
best_loss = float('inf')
patience = 3
counter = 0
for epoch in range(100):
    val_loss = validate(model, val_loader)
    if val_loss < best_loss:
        best_loss = val_loss
        counter = 0
        # Checkpoint only the best-so-far weights.
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        counter += 1
        if counter >= patience:
            break
PyTorch实现语义分割通常使用U-Net或DeepLab等架构:
class UNet(nn.Module):
    """U-Net for semantic segmentation (encoder/decoder with skip connections).

    Fixes over the original sketch:
    - skip connections are collected only after each DoubleConv; the original
      also appended post-MaxPool outputs, whose resolution/channels cannot be
      concatenated with the decoder features;
    - each decoder stage uses its own DoubleConv; the original reused
      ``self.decoder[1]`` at every stage;
    - ``num_classes`` is now a constructor parameter instead of an undefined
      module-level global.
    """

    def __init__(self, num_classes=1):
        super().__init__()
        # Encoder: DoubleConv blocks interleaved with 2x downsampling.
        self.encoder = nn.Sequential(
            DoubleConv(3, 64),
            nn.MaxPool2d(2),
            DoubleConv(64, 128),
            nn.MaxPool2d(2),
            DoubleConv(128, 256),
            nn.MaxPool2d(2),
            DoubleConv(256, 512),
            nn.MaxPool2d(2),
            DoubleConv(512, 1024),  # bottleneck
        )
        # Decoder: each UpConv doubles resolution, the following DoubleConv
        # fuses the concatenated skip; final 1x1 conv produces per-pixel logits.
        self.decoder = nn.Sequential(
            UpConv(1024, 512),
            DoubleConv(1024, 512),
            UpConv(512, 256),
            DoubleConv(512, 256),
            UpConv(256, 128),
            DoubleConv(256, 128),
            UpConv(128, 64),
            DoubleConv(128, 64),
            nn.Conv2d(64, num_classes, kernel_size=1),
        )

    def forward(self, x):
        # Encoder pass, keeping each DoubleConv output as a skip connection.
        skip_connections = []
        for stage in self.encoder[:-1]:
            x = stage(x)
            if isinstance(stage, DoubleConv):
                skip_connections.append(x)
        x = self.encoder[-1](x)  # bottleneck

        # Decoder pass: pair each (UpConv, DoubleConv) couple with the
        # matching skip connection, deepest skip first.
        for i, skip in enumerate(reversed(skip_connections)):
            x = self.decoder[2 * i](x)         # upsample
            x = torch.cat([x, skip], dim=1)    # concat skip along channels
            x = self.decoder[2 * i + 1](x)     # fuse
        return self.decoder[-1](x)             # 1x1 classification conv
使用PyTorch实现GAN进行图像生成:
class Generator(nn.Module):
    """DCGAN-style generator: latent vector -> 1x28x28 image in [-1, 1]."""

    def __init__(self, latent_dim):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(latent_dim, 256 * 7 * 7),
            nn.Unflatten(1, (256, 7, 7)),  # reshape to a 7x7 feature map
            nn.ConvTranspose2d(256, 128, 4, stride=2, padding=1),  # 7 -> 14
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(128, 64, 4, stride=2, padding=1),   # 14 -> 28
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(64, 1, 3, stride=1, padding=1),     # keep 28x28
            nn.Tanh(),  # outputs in [-1, 1]
        )

    def forward(self, z):
        return self.model(z)
class Discriminator(nn.Module):
    """DCGAN-style discriminator: 1x28x28 image -> probability of being real."""

    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(1, 64, 3, stride=2, padding=1),    # 28 -> 14
            nn.LeakyReLU(0.2),
            nn.Dropout2d(0.25),
            nn.Conv2d(64, 128, 3, stride=2, padding=1),  # 14 -> 7
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
            nn.Dropout2d(0.25),
            nn.Flatten(),
            nn.Linear(128 * 7 * 7, 1),
            nn.Sigmoid(),  # probability in (0, 1)
        )

    def forward(self, img):
        return self.model(img)
将PyTorch模型导出为TorchScript格式:
# Export a model to TorchScript for Python-free deployment.
model = models.resnet18(pretrained=True)
model.eval()

# Method 1: tracing — record the ops executed on an example input.
example_input = torch.rand(1, 3, 224, 224)
traced_script_module = torch.jit.trace(model, example_input)
traced_script_module.save("resnet18_traced.pt")

# Method 2: scripting — compile the model's Python source (keeps control flow).
scripted_model = torch.jit.script(model)
scripted_model.save("resnet18_scripted.pt")
将模型导出为ONNX格式:
# Export to ONNX with a dynamic batch dimension.
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
    model,
    dummy_input,
    "model.onnx",
    input_names=["input"],
    output_names=["output"],
    # Allow any batch size at inference time.
    dynamic_axes={
        "input": {0: "batch_size"},
        "output": {0: "batch_size"},
    },
)
创建模型存档文件:
# Model wrapper used by the TorchServe model archive.
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = models.resnet18(pretrained=True)

    def forward(self, x):
        return self.model(x)
# Custom TorchServe handler.
from ts.torch_handler.base_handler import BaseHandler


class MyHandler(BaseHandler):
    """Loads MyModel weights from model.pth and serves it via TorchServe."""

    def __init__(self):
        super().__init__()
        self.model = None

    def initialize(self, context):
        # `context` carries TorchServe runtime info (model dir, GPU id, ...).
        self.model = MyModel()
        self.model.load_state_dict(torch.load("model.pth"))
        self.model.eval()

    def preprocess(self, data):
        # TODO: implement request preprocessing
        pass

    def inference(self, data):
        # TODO: implement inference
        pass

    def postprocess(self, data):
        # TODO: implement response postprocessing
        pass
class PatchEmbedding(nn.Module):
    """Split an image into non-overlapping patches and linearly embed each.

    Implemented as a single strided convolution (kernel = stride = patch
    size), so every output position corresponds to exactly one patch.
    """

    def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        self.n_patches = (img_size // patch_size) ** 2
        self.proj = nn.Conv2d(
            in_chans,
            embed_dim,
            kernel_size=patch_size,
            stride=patch_size,
        )

    def forward(self, x):
        x = self.proj(x)       # (B, E, H/P, W/P)
        x = x.flatten(2)       # (B, E, N)
        x = x.transpose(1, 2)  # (B, N, E)
        return x
class VisionTransformer(nn.Module):
    """ViT: patch embedding + [CLS] token + transformer blocks + linear head."""

    def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768,
                 depth=12, num_heads=12, mlp_ratio=4., num_classes=1000):
        super().__init__()
        self.patch_embed = PatchEmbedding(img_size, patch_size, in_chans, embed_dim)
        # Learnable classification token prepended to the patch sequence.
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        # Learnable positional embedding for [CLS] + all patches.
        self.pos_embed = nn.Parameter(torch.zeros(1, self.patch_embed.n_patches + 1, embed_dim))
        self.blocks = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, mlp_ratio) for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        B = x.shape[0]
        x = self.patch_embed(x)                       # (B, N, E)
        cls_tokens = self.cls_token.expand(B, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)         # (B, N+1, E)
        x = x + self.pos_embed
        for block in self.blocks:
            x = block(x)
        x = self.norm(x)
        cls_token_final = x[:, 0]  # classify from the [CLS] token only
        return self.head(cls_token_final)
class SimCLR(nn.Module):
    """SimCLR wrapper: backbone encoder + 2-layer MLP projection head."""

    def __init__(self, base_encoder, projection_dim=128):
        super().__init__()
        self.encoder = base_encoder(pretrained=False)
        # Input width of the original fc head (fc.weight is (out, in)).
        dim_mlp = self.encoder.fc.weight.shape[1]
        # Replace the classification head with a projection MLP.
        self.encoder.fc = nn.Sequential(
            nn.Linear(dim_mlp, dim_mlp),
            nn.ReLU(),
            nn.Linear(dim_mlp, projection_dim),
        )

    def forward(self, x1, x2):
        # Project both augmented views of the same batch.
        z1 = self.encoder(x1)
        z2 = self.encoder(x2)
        return z1, z2
def contrastive_loss(z1, z2, temperature=0.5):
batch_size = z1.shape[0]
z = torch.cat([z1, z2], dim=0)
sim_matrix = torch.exp(torch.mm(z, z.t()) / temperature)
mask = (torch.ones_like(sim_matrix) - torch.eye(2 * batch_size, device=sim_matrix.device)).bool()
pos_sim = torch.exp(torch.sum(z1 * z2, dim=1) / temperature)
pos_sim = torch.cat([pos_sim, pos_sim], dim=0)
loss = -torch.log(pos_sim / (sim_matrix.masked_select(mask).view(2 * batch_size, -1).sum(dim=1) + 1e-8))
return loss.mean()
class CustomDataset(Dataset):
    """Image dataset with a simple in-memory cache of decoded images.

    The cache stores the *raw* decoded image and the transform is applied
    on every access — caching the transformed tensor (as the original did)
    would freeze random augmentations to their first draw.

    NOTE(review): the cache is unbounded; for large datasets bound it or
    drop it entirely.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.cache = {}  # idx -> decoded (untransformed) image

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        if idx in self.cache:
            image = self.cache[idx]
        else:
            image = Image.open(self.image_paths[idx])
            self.cache[idx] = image
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label
# Handle class imbalance: oversample rare classes with a weighted sampler.
from torch.utils.data import WeightedRandomSampler

class_counts = torch.bincount(torch.tensor(labels))
class_weights = 1. / class_counts        # inverse-frequency class weights
sample_weights = class_weights[labels]   # one weight per sample
sampler = WeightedRandomSampler(sample_weights, len(sample_weights))
# Note: `sampler` and `shuffle=True` are mutually exclusive in DataLoader.
train_loader = DataLoader(train_set, batch_size=32, sampler=sampler)
def check_gradients(model):
    """Print warnings for parameters with vanishing or NaN gradients.

    Call after loss.backward(); parameters with no gradient are skipped.
    """
    for name, param in model.named_parameters():
        if param.grad is None:
            continue
        grad_mean = param.grad.abs().mean().item()
        if grad_mean < 1e-7:
            print(f"Warning: {name} has very small gradients ({grad_mean:.2e})")
        elif torch.isnan(param.grad).any():
            # A NaN mean fails the `< 1e-7` comparison, so NaNs land here.
            print(f"Warning: {name} has NaN gradients")
def register_hooks(model):
    """Attach forward hooks recording per-layer activation statistics.

    Hooks are registered on every Conv2d, Linear, and ReLU module. Returns
    a dict (module name -> stats) that is refreshed on each forward pass.
    """
    activation_stats = {}

    def get_activation_stats(name):
        # Closure factory so each hook remembers its module name.
        def hook(module, input, output):
            activation_stats[name] = {
                'mean': output.mean().item(),
                'std': output.std().item(),
                'min': output.min().item(),
                'max': output.max().item()
            }
        return hook

    for name, module in model.named_modules():
        if isinstance(module, (nn.Conv2d, nn.Linear, nn.ReLU)):
            module.register_forward_hook(get_activation_stats(name))
    return activation_stats
# Training loop with memory-friendly settings.
for epoch in range(epochs):
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        # set_to_none=True frees gradient memory instead of zero-filling it.
        optimizer.zero_grad(set_to_none=True)
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            # Periodically return cached blocks to the CUDA allocator.
            torch.cuda.empty_cache()
# Single-process multi-GPU: replicate the model across all visible GPUs.
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    model = nn.DataParallel(model)
model.to(device)

# Or the preferred multi-process approach: DistributedDataParallel.
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group("nccl")
# `local_rank` must be supplied by the launcher (e.g. torchrun) — not defined here.
model = DDP(model.to(device), device_ids=[local_rank])