In real-world AI deployments, batch inference is a frequent requirement. Unlike real-time API calls, batch processing is better suited to workloads that do not need results immediately.

Google Cloud's Vertex AI platform provides a complete batch inference solution, and the Gemini family, Google's latest multimodal large models, performs strongly on image understanding and generation tasks. This tutorial uses the gemini-3-pro-image-preview model to walk through building an end-to-end batch inference pipeline.

Tip: batch inference has a different cost structure from online inference. Batch jobs take longer to complete, but the per-unit compute cost is typically 30-50% lower than real-time API calls, which makes them a good fit for non-real-time workloads.
Enable the required services (the Vertex AI API and the Cloud Storage API):

Configure a service account:
```bash
# Create a dedicated service account
gcloud iam service-accounts create vertex-ai-batch \
    --display-name="Vertex AI Batch Processing"

# Grant the required roles
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
    --member="serviceAccount:vertex-ai-batch@YOUR_PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/aiplatform.user"

gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
    --member="serviceAccount:vertex-ai-batch@YOUR_PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
```
Prepare the local environment:

```bash
pip install google-cloud-aiplatform google-cloud-storage
```
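Before moving on, it is worth a quick sanity check that authentication and the SDK work locally. A minimal sketch, assuming you have run gcloud auth application-default login and substituted the project, region, and bucket placeholders used throughout this tutorial:

```python
from google.cloud import aiplatform, storage

# Initialize the Vertex AI SDK against your project and region.
aiplatform.init(project="your-project-id", location="us-central1")
print("Vertex AI SDK initialized")

# Confirm the storage client can see the bucket that will hold images and results.
bucket = storage.Client().bucket("your-bucket-name")
print("Bucket reachable:", bucket.exists())
```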
Batch inference expects the input in JSON Lines format (.jsonl), one record per line. For the gemini-3-pro-image-preview model, a typical structure looks like this:

```json
{"image": {"gcsUri": "gs://your-bucket/path/to/image1.jpg"}, "task": "describe"}
{"image": {"gcsUri": "gs://your-bucket/path/to/image2.png"}, "task": "generate_caption"}
```

Key fields:

- image.gcsUri: must point to a Cloud Storage object that the batch job's service account can read
- task: part of the custom prompt; adjust it to your needs

Create a JSONLgen.py script:
```python
import json
from pathlib import Path


def generate_jsonl(input_dir: str, output_file: str, bucket_name: str):
    """Generate a JSONL file in the format Vertex AI expects.

    Args:
        input_dir: local directory containing the images
        output_file: path of the JSONL file to write
        bucket_name: name of the GCS bucket the images live in
    """
    image_exts = ('.jpg', '.jpeg', '.png', '.webp')
    with open(output_file, 'w') as f:
        for img_path in Path(input_dir).glob('*'):
            if img_path.suffix.lower() in image_exts:
                record = {
                    "image": {
                        "gcsUri": f"gs://{bucket_name}/{img_path.name}"
                    },
                    "task": "analyze_and_describe"
                }
                f.write(json.dumps(record) + '\n')


if __name__ == "__main__":
    generate_jsonl(
        input_dir="input_images",
        output_file="batch_tasks.jsonl",
        bucket_name="your-bucket-name"
    )
```
Note: before running the script, make sure all images have been uploaded to the target GCS bucket and that the service account used by the batch job can read them.
Using the gsutil command-line tool:

```bash
# Bulk-upload the local images
gsutil -m cp input_images/* gs://your-bucket-name/

# Upload the task file
gsutil cp batch_tasks.jsonl gs://your-bucket-name/batch_inputs/
```
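If you prefer to stay in Python rather than shelling out to gsutil, roughly the same upload can be done with the storage client. A minimal sketch, reusing the placeholder bucket and directory names from above:

```python
from pathlib import Path

from google.cloud import storage


def upload_images(input_dir: str, bucket_name: str):
    """Upload every supported image in input_dir to the root of the bucket."""
    bucket = storage.Client().bucket(bucket_name)
    for img_path in Path(input_dir).glob('*'):
        if img_path.suffix.lower() in ('.jpg', '.jpeg', '.png', '.webp'):
            bucket.blob(img_path.name).upload_from_filename(str(img_path))
            print("Uploaded", img_path.name)


upload_images("input_images", "your-bucket-name")
```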
Monitoring upload progress:

```bash
gsutil du -sh gs://your-bucket-name/   # total bucket usage
gsutil ls -l gs://your-bucket-name/    # list the uploaded files
```
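Before submitting the job, it also helps to verify that every gcsUri referenced in batch_tasks.jsonl resolves to an existing object; catching a broken path here is cheaper than a failed record later. A small sketch using the storage client:

```python
import json
from urllib.parse import urlparse

from google.cloud import storage


def find_missing_objects(jsonl_path: str) -> list:
    """Return the gcsUri values in the task file that do not exist in GCS."""
    client = storage.Client()
    missing = []
    with open(jsonl_path) as f:
        for line in f:
            uri = json.loads(line)["image"]["gcsUri"]
            parsed = urlparse(uri)  # gs://bucket/name -> netloc=bucket, path=/name
            if not client.bucket(parsed.netloc).blob(parsed.path.lstrip("/")).exists():
                missing.append(uri)
    return missing


print(find_missing_objects("batch_tasks.jsonl"))
```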
Network optimization tips:

- Use parallel transfers (the -m flag)
- Tune concurrency with gsutil -o "GSUtil:parallel_process_count=8"

Navigate to the batch inference page:
Key parameter configuration:

- Output location (gs://your-bucket/batch_outputs/)

Advanced options:
```json
{
  "max_output_tokens": 2048,
  "temperature": 0.4,
  "top_p": 0.9,
  "stop_sequences": ["\n\n"]
}
```
For scenarios that need automation, you can use the Python SDK:
```python
from google.cloud import aiplatform


def create_batch_prediction_job(
    project: str,
    location: str,
    model_name: str,
    input_uri: str,
    output_uri: str
):
    aiplatform.init(project=project, location=location)
    model = aiplatform.Model(model_name=model_name)

    batch_job = model.batch_predict(
        job_display_name="gemini-image-batch-1",
        gcs_source=input_uri,
        gcs_destination_prefix=output_uri,
        instances_format="jsonl",
        predictions_format="jsonl",
        machine_type="n1-standard-4",  # adjust to the size of the workload
        accelerator_count=0            # Gemini models do not need GPU acceleration here
    )

    print(f"Job created: {batch_job.display_name}")
    print(f"Monitoring URL: {batch_job._dashboard_uri()}")


# Example call
create_batch_prediction_job(
    project="your-project-id",
    location="us-central1",
    model_name="gemini-3-pro-image-preview",
    input_uri="gs://your-bucket/batch_inputs/batch_tasks.jsonl",
    output_uri="gs://your-bucket/batch_outputs/"
)
```
Concurrency tuning:

- Pass a larger starting_replica_count to batch_predict (for example starting_replica_count=10) to increase parallelism

Monitoring and cancellation:
```bash
# List running jobs
gcloud ai operations list --region=us-central1 --filter="metadata.operationType=BatchPredict"

# Cancel a job
gcloud ai operations cancel OPERATION_ID --region=us-central1
```
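The same monitoring can be done from Python. A sketch using the SDK's BatchPredictionJob helpers, reusing the project and region placeholders from earlier:

```python
from google.cloud import aiplatform

aiplatform.init(project="your-project-id", location="us-central1")

# List recent batch prediction jobs with their current states.
for job in aiplatform.BatchPredictionJob.list(order_by="create_time desc"):
    print(job.display_name, job.state.name, job.resource_name)

# To cancel a specific job, construct it from the resource name printed above:
# aiplatform.BatchPredictionJob("projects/.../locations/us-central1/batchPredictionJobs/123").cancel()
```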
Budget alert setup:

```bash
gcloud alpha billing budgets create \
    --billing-account=YOUR_BILLING_ACCOUNT_ID \
    --display-name="Vertex AI Batch Budget" \
    --budget-amount=500 \
    --threshold-rule=percent=0.5 \
    --threshold-rule=percent=0.8 \
    --filter="service:aiplatform.googleapis.com"
```
When the job finishes, the output directory will contain:

- prediction.results-NNNNN-of-NNNNN.jsonl: the actual prediction results
- prediction.errors_stats-NNNNN-of-NNNNN.json: error statistics
- prediction.log: the full log

Typical successful response:
```json
{
  "instance": {"image": {"gcsUri": "gs://.../image1.jpg"}, "task": "describe"},
  "prediction": {
    "content": "A photo of a sunflower in bright sunlight...",
    "safety_ratings": {...}
  }
}
```
Create a parseJSONL.py script:
```python
import json
from pathlib import Path
from urllib.parse import urlparse

from google.cloud import storage


def download_and_parse(output_dir: str, jsonl_path: str):
    """Parse the JSONL results and write one Markdown report per image.

    Args:
        output_dir: directory to write the reports into
        jsonl_path: path of the downloaded JSONL results file
    """
    client = storage.Client()  # available if you also want to download the source images
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)

    with open(jsonl_path) as f:
        for line in f:
            try:
                data = json.loads(line)
                if 'prediction' in data:
                    # Recover the original image name from the GCS URI
                    gcs_uri = data['instance']['image']['gcsUri']
                    img_name = Path(urlparse(gcs_uri).path).name

                    # Save the result to a Markdown file
                    md_file = output_path / f"{img_name}.md"
                    with open(md_file, 'w') as md:
                        md.write(f"# Analysis result - {img_name}\n\n")
                        md.write(f"**Original task**: {data['instance'].get('task')}\n\n")
                        md.write(f"**Generated content**:\n{data['prediction']['content']}\n\n")
                        md.write("## Safety ratings\n")
                        for cat, rating in data['prediction']['safety_ratings'].items():
                            md.write(f"- {cat}: {rating['probability']}\n")

                    print(f"Processed: {img_name}")
            except json.JSONDecodeError:
                print(f"Invalid JSON line: {line}")


if __name__ == "__main__":
    download_and_parse(
        output_dir="analysis_results",
        jsonl_path="downloaded_results.jsonl"
    )
```
Parallel processing:

```python
from concurrent.futures import ThreadPoolExecutor


def process_line(line):
    # Parse one JSONL record and write its report, as in parseJSONL.py
    pass


with ThreadPoolExecutor(max_workers=8) as executor:
    with open("large_results.jsonl") as f:
        executor.map(process_line, f)
```
Incremental processing:

- Use gsutil rsync to download only newly added result files (a Python sketch of the same idea follows the visualization example below)

Result visualization:
```python
import json

import pandas as pd

# Load the JSONL results into a DataFrame
records = [json.loads(line) for line in open('results.jsonl')]
df = pd.json_normalize(records)

# Generate simple statistics on the length of the generated text
report = df['prediction.content'].apply(len).describe()
print(report)
```
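For the incremental-processing idea mentioned above, the effect of gsutil rsync can be approximated in Python by downloading only result shards that are not present locally yet. A sketch, with the output prefix and local directory as assumed placeholders:

```python
from pathlib import Path

from google.cloud import storage


def sync_new_results(bucket_name: str, prefix: str, local_dir: str):
    """Download only result files that have not been fetched before."""
    local = Path(local_dir)
    local.mkdir(exist_ok=True)
    client = storage.Client()
    for blob in client.list_blobs(bucket_name, prefix=prefix):
        target = local / Path(blob.name).name
        if not target.exists():
            blob.download_to_filename(str(target))
            print("Downloaded", blob.name)


sync_new_results("your-bucket", "batch_outputs/", "local_results")
```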
| Symptom | Likely cause | Resolution |
|---|---|---|
| Job stays in "Pending" for a long time | Insufficient quota | Check the project's quota and request an increase if needed |
| Some records fail to process | Image URL not accessible | Verify the GCS object permissions |
| Output is truncated | Token limit exceeded | Increase the max_output_tokens parameter |
| Many results are safety-filtered | Sensitive image content | Review the safety_settings configuration |
Monitoring metrics:

```bash
# Inspect a job's resource usage
gcloud ai operations describe OPERATION_ID \
    --region=us-central1 \
    --format="value(done, metadata.stats)"
```
Typical optimization directions:

Known limitations of gemini-3-pro-image-preview:

For scenarios that need higher resolution, the recommended approach is:
```mermaid
graph LR
    A[Source image] --> B{Gemini analysis}
    B --> C[Generated description]
    C --> D[Translation model]
    D --> E[Multilingual output]
```
Implementation skeleton:
```python
def multi_model_pipeline(image_uri):
    # Stage 1: Gemini analysis
    gemini_result = analyze_with_gemini(image_uri)

    # Stage 2: translation
    translation = translate_text(
        text=gemini_result['description'],
        target_language='es'
    )

    # Stage 3: quality check
    quality_score = check_quality(translation)

    return {
        'original_analysis': gemini_result,
        'translation': translation,
        'quality_score': quality_score
    }
```
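analyze_with_gemini, translate_text, and check_quality are left as placeholders in this skeleton. As one example, translate_text could be backed by the Cloud Translation API; a minimal sketch, assuming the API is enabled and google-cloud-translate is installed:

```python
from google.cloud import translate_v2 as translate


def translate_text(text: str, target_language: str = "es") -> str:
    """Translate the Gemini-generated description into the target language."""
    client = translate.Client()
    result = client.translate(text, target_language=target_language)
    return result["translatedText"]
```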
Use Cloud Composer (managed Airflow) to create a recurring job:
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def generate_and_submit_batch(**kwargs):
    # Wrap the JSONL generation and job submission logic shown earlier
    pass


default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True
}

with DAG(
    'weekly_image_analysis',
    default_args=default_args,
    schedule_interval='0 3 * * 1',  # every Monday at 03:00
    start_date=datetime(2024, 1, 1)
) as dag:
    run_batch = PythonOperator(
        task_id='submit_batch_job',
        python_callable=generate_and_submit_batch,
        op_kwargs={
            'input_bucket': 'production-image-uploads',
            'output_prefix': 'analysis_results/{{ ds_nodash }}'
        }
    )
```
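generate_and_submit_batch is left as a stub above. A minimal sketch of one way to fill it in, reusing generate_jsonl and create_batch_prediction_job from earlier in this tutorial (assumed importable in the DAG's environment; bucket and path names are the same placeholders):

```python
from google.cloud import storage


def generate_and_submit_batch(input_bucket: str, output_prefix: str, **kwargs):
    # 1. Build the JSONL task file from the staged local images.
    generate_jsonl(
        input_dir="input_images",
        output_file="batch_tasks.jsonl",
        bucket_name=input_bucket,
    )

    # 2. Upload the task file next to the images.
    storage.Client().bucket(input_bucket) \
        .blob("batch_inputs/batch_tasks.jsonl") \
        .upload_from_filename("batch_tasks.jsonl")

    # 3. Submit the batch prediction job.
    create_batch_prediction_job(
        project="your-project-id",
        location="us-central1",
        model_name="gemini-3-pro-image-preview",
        input_uri=f"gs://{input_bucket}/batch_inputs/batch_tasks.jsonl",
        output_uri=f"gs://{input_bucket}/{output_prefix}",
    )
```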
Triggering via Pub/Sub:

```python
from google.cloud import pubsub_v1


def callback(message):
    image_uri = message.data.decode()
    # Kick off the processing flow for this image
    message.ack()


subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    "your-project", "image-analysis-sub"
)
streaming_pull = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull.result()  # block so the subscriber keeps listening
```
Storing results in BigQuery:

```python
from datetime import datetime

from google.cloud import bigquery


def save_to_bq(results):
    client = bigquery.Client()
    table_ref = client.dataset("ai_results").table("image_analysis")

    errors = client.insert_rows_json(
        table_ref,
        [{
            "image_uri": r['instance']['image']['gcsUri'],
            "analysis_text": r['prediction']['content'],
            "processed_at": datetime.utcnow().isoformat()
        } for r in results]
    )

    if errors:
        print(f"BigQuery errors: {errors}")
```
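save_to_bq assumes the ai_results.image_analysis table already exists. A sketch for creating the dataset and a table with a matching schema (the project, dataset, and table names are the placeholders used above):

```python
from google.cloud import bigquery

client = bigquery.Client()

# Create the dataset and table if they are not there yet.
client.create_dataset("ai_results", exists_ok=True)
schema = [
    bigquery.SchemaField("image_uri", "STRING"),
    bigquery.SchemaField("analysis_text", "STRING"),
    bigquery.SchemaField("processed_at", "TIMESTAMP"),
]
table = bigquery.Table("your-project-id.ai_results.image_analysis", schema=schema)
client.create_table(table, exists_ok=True)
print("Table ready:", table.table_id)
```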
In real projects, our team has found a few key optimization points worth sharing: