最近在搭建推理终端服务时,我发现大多数项目都强制要求使用Python原生绑定,这在不同语言环境和构建体系中造成了诸多限制。为此我开发了hfendpoint-draft项目,尝试将API暴露逻辑与推理引擎彻底解耦。这个方案成功适配了transformers和llama.cpp引擎,支持图像生成、嵌入向量计算和流式聊天补全功能。而新出现的Nano-vLLM项目——一个轻量级vLLM实现——恰好成为验证这个架构灵活性的绝佳案例。
Nano-vLLM的官方示例展示了基础用法,但直接使用LLM类无法满足高并发Web服务需求。通过分析源码,我梳理出四个关键模块:
要将离线示例改造成可并发的Web服务,需要解决三个核心问题:
Worker作为连接Web层与推理引擎的桥梁,我对其进行了三项关键改造:
python复制class Worker:
def __init__(self):
# 自动下载模型(部署友好)
model_path = snapshot_download(repo_id="Qwen/Qwen3-0.6B")
# 初始化核心组件
self.config = Config(model_path, **CONFIG)
self.tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=True)
self.config.eos = self.tokenizer.eos_token_id
# 异步通信设施
self.requests = queue.Queue()
self.notifier = threading.Condition()
# 多GPU并行支持
if self.config.tensor_parallel_size > 1:
ctx = mp.get_context("spawn")
for i in range(1, self.config.tensor_parallel_size):
event = ctx.Event()
process = ctx.Process(
target=ModelRunner,
args=(self.config, i, event)
)
process.start()
self.processes.append(process)
self.events.append(event)
self.model_runner = ModelRunner(self.config, 0, self.events)
self.scheduler = Scheduler(self.config)
atexit.register(self.stop)
关键改进:增加线程安全的请求队列和条件变量,实现异步通知机制
_run方法是整个服务的核心事件循环,其工作流程如下:
notifier.wait_for休眠直到有新请求scheduler.add加入调度系统scheduler.schedule生成最优计算批次model_runner.run执行前向计算scheduler.postprocess更新序列状态response_queue将token实时返回python复制def _run(self):
while True:
try:
with self.notifier:
self.notifier.wait_for(
lambda: not self.requests.empty()
or not self.scheduler.is_finished()
)
# 批量处理请求
while not self.requests.empty():
seq = self.requests.get_nowait()
self.scheduler.add(seq)
# 执行计算
sequences, is_prefill = self.scheduler.schedule()
if not sequences:
continue
new_token_ids = self.model_runner.call("run", sequences, is_prefill)
self.scheduler.postprocess(sequences, new_token_ids)
# 流式返回结果
for seq, token_id in zip(sequences, new_token_ids):
response_queue = getattr(seq, 'response_queue', None)
if not response_queue:
continue
self.loop.call_soon_threadsafe(
response_queue.put_nowait,
token_id
)
if seq.is_finished:
self.loop.call_soon_threadsafe(
response_queue.put_nowait,
None
)
except Exception as e:
hfendpoint.error(f"worker loop: {e}")
submit方法实现了线程安全的请求提交:
python复制def submit(self, prompt_token_ids: list[int], sampling_params: SamplingParams) -> asyncio.Queue:
seq = Sequence(prompt_token_ids, sampling_params)
seq.response_queue = asyncio.Queue() # 每个请求独立响应队列
self.requests.put(seq)
with self.notifier:
self.notifier.notify() # 唤醒工作线程
return seq.response_queue # 返回客户端可监听的队列
这种设计实现了:
python复制worker = Worker()
@hfendpoint.handler("chat_completions")
async def chat(request_data: Dict[str, Any]):
# 对话模板处理
prompt_text = worker.tokenizer.apply_chat_template(
request_data["messages"],
tokenize=False,
add_generation_prompt=True,
enable_thinking=True
)
# Token化处理
prompt_token_ids = worker.tokenizer.encode(prompt_text)
# 采样参数设置
sampling_params = SamplingParams(
temperature=request_data.get("temperature", 0.7),
max_tokens=request_data.get("max_tokens", 2048),
)
# 提交推理请求
response = worker.submit(prompt_token_ids, sampling_params)
# 流式解码
decoder = DecodeStream(skip_special_tokens=True)
while True:
token_id = await response.get()
if token_id is None:
break
output = decoder.step(worker.tokenizer._tokenizer, token_id)
if output:
yield {"content": output}
yield {"content": "", "finish_reason": "stop"}
Nano-vLLM需要GPU支持,Hugging Face Inference Endpoints提供了开箱即用的解决方案:
创建终端节点:
容器配置:
ghcr.io/angt/hfendpoint-draft-nanovllm访问控制:
部署完成后,可通过以下方式测试:
bash复制HF_ENDPOINT_URL=https://<your-endpoint-name>.endpoints.huggingface.cloud
HF_TOKEN=hf_xxxx
curl "$HF_ENDPOINT_URL/v1/chat/completions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $HF_TOKEN" \
-d '{
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "Hello!"
}
]
}'
通过调整Scheduler的以下参数可显著提升吞吐量:
| 参数 | 建议值 | 影响 |
|---|---|---|
| max_batch_size | 8-16 | 增大可提升GPU利用率但增加延迟 |
| max_seq_len | 2048 | 根据实际需求调整 |
| prefilling_timeout | 0.1s | 平衡等待时间与批处理效率 |
Nano-vLLM的KV缓存实现较精简,可通过以下方式改进:
python复制config = Config(
max_num_seqs=64, # 最大并发序列数
max_seq_len=4096,
block_size=64, # 缓存块大小
gpu_memory_utilization=0.9 # GPU内存利用率
)
在实际部署中遇到的典型问题及解决方案:
GPU内存不足:
gpu_memory_utilization或使用更小模型响应延迟高:
scheduler.schedule的耗时prefilling_timeout减少空等时间token解码异常:
decode参数设置,特别是skip_special_tokens多GPU负载不均:
tensor_parallel_size与实际GPU数量这个架构已经成功支持Qwen3-0.6B模型的实时推理,平均延迟控制在200ms以内(L4 GPU)。最大的收获是验证了引擎与服务层解耦的可行性——后续集成新引擎时,只需实现对应的Worker类即可保持API不变。对于需要快速实验不同推理后端的场景,这种设计能大幅降低迁移成本。