1. 项目概述
在公共服务领域(如社保、医保、就业等),每天都有大量群众咨询相似问题。传统人工客服面临两大痛点:一是人力成本高,二是效率低下。而基于关键词匹配的机器人系统又难以准确理解用户真实意图,经常出现答非所问的情况。
这个项目就是为了解决这些问题而设计的智能客服系统。它采用了当前最先进的RAG(检索增强生成)架构,能够:
- 真正理解用户问题的语义(不是简单的关键词匹配)
- 从知识库中精准找到最相关的问答对
- 基于大语言模型生成自然流畅的回答
- 还能智能推荐相关问题引导用户继续咨询
- 支持语音输入和语音播报,实现完整的语音交互体验
整套系统完全基于开源技术栈搭建,零API调用成本,特别适合对数据安全性要求高的政府和企业场景。
2. 技术架构解析
2.1 整体架构设计
系统采用典型的三层架构:
code复制用户端(Web/App)
│
▼
[Flask API服务层]
│
▼
[Ollama向量化服务] → [Milvus向量数据库]
│
▼
[DeepSeek推理服务]
这种分层设计有几个关键优势:
- 各组件职责单一,便于维护和扩展
- 向量计算和存储分离,提高系统弹性
- API层统一处理业务逻辑,前端只需关注交互
2.2 核心组件选型
| 组件类型 | 技术选型 | 选择理由 |
|---|---|---|
| Embedding模型 | Ollama + bge-m3 | 本地部署零成本,中文理解能力强,支持中英文混合 |
| 向量数据库 | Milvus | 开源高性能,支持余弦相似度计算,社区活跃 |
| 大语言模型 | DeepSeek API | 中文推理能力强,性价比高,API稳定 |
| 后端框架 | Flask | 轻量级,适合快速开发API服务 |
| 前端技术 | jQuery + Marked.js | 简单高效,Markdown支持好,配合讯飞语音SDK实现完整语音交互 |
实际部署中发现,bge-m3模型对长文本的向量化效果尤其出色,这在处理复杂的政策咨询问题时非常关键。
3. 环境准备与部署
3.1 Ollama安装与配置
安装Ollama服务(以Ubuntu为例):
bash复制# 下载安装脚本
curl -fsSL https://ollama.com/install.sh | sh
# 启动服务
ollama serve &
# 拉取中文Embedding模型
ollama pull bge-m3
验证服务是否正常:
bash复制curl http://localhost:11434/api/embeddings -d '{
"model": "bge-m3:latest",
"prompt": "测试文本"
}' | jq '.embedding | length'
预期应该返回1024,表示向量维度正确。
3.2 Milvus向量数据库部署
推荐使用Docker-Compose部署单机版:
bash复制# 下载官方配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.4.0/milvus-standalone-docker-compose.yml -O docker-compose.yml
# 启动服务(首次会下载镜像)
docker compose up -d
# 检查服务状态
docker compose ps
关键配置项说明:
milvus-standalone:主服务容器etcd:分布式键值存储minio:对象存储pulsar:消息队列
生产环境建议至少分配8GB内存给Milvus,否则大数据量时可能出现性能问题。
3.3 Python环境准备
创建虚拟环境并安装依赖:
bash复制python -m venv rag-env
source rag-env/bin/activate
pip install flask flask-cors pymilvus==2.4.0 requests chardet python-dotenv
建议使用.env文件管理敏感配置:
ini复制# .env文件示例
MILVUS_URI=http://localhost:19530
DEEPSEEK_API_KEY=your_api_key_here
4. 知识库构建实战
4.1 Milvus集合设计
社保知识库的集合schema设计:
python复制from pymilvus import MilvusClient, FieldSchema, CollectionSchema, DataType
client = MilvusClient(uri="http://localhost:19530")
schema = CollectionSchema(
fields=[
FieldSchema(name="uid", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="question", dtype=DataType.VARCHAR, max_length=5000),
FieldSchema(name="answer", dtype=DataType.VARCHAR, max_length=10000),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
FieldSchema(name="update_time", dtype=DataType.INT64),
],
description="社保政策知识库"
)
client.create_collection(
collection_name="social_insurance",
schema=schema,
consistency_level="Strong"
)
关键设计考虑:
uid设为自增主键,避免人工管理IDquestion和answer字段预留足够长度vector维度必须与bge-m3输出的1024维一致- 添加
update_time便于后续数据更新
4.2 向量索引创建
为提高检索效率,需要创建合适的索引:
python复制index_params = {
"index_type": "IVF_FLAT",
"metric_type": "COSINE",
"params": {"nlist": 1024}
}
client.create_index(
collection_name="social_insurance",
field_name="vector",
index_params=index_params
)
参数说明:
IVF_FLAT:适合中小规模数据集(百万级以下)COSINE:余弦相似度更适合文本语义匹配nlist=1024:聚类中心数,影响检索精度和速度的平衡
实测显示,该配置在10万条数据量时,检索延迟能控制在50ms以内。
4.3 数据导入与去重
智能去重是知识库质量的关键:
python复制def import_qa_pair(question, answer):
# 向量化问题文本
vector = vectorize_text(question)
if not vector:
return False
# 语义去重检查
search_result = client.search(
collection_name="social_insurance",
data=[vector],
anns_field="vector",
param={"metric_type": "COSINE", "params": {"radius": 0.85}},
limit=1,
output_fields=["question"]
)
if search_result and search_result[0][0]["score"] > 0.85:
print(f"已存在相似问题: {search_result[0][0]['entity']['question']}")
return False
# 插入新数据
data = {
"question": question,
"answer": answer,
"vector": vector,
"update_time": int(time.time())
}
client.insert("social_insurance", data)
return True
去重逻辑亮点:
- 使用0.85的相似度阈值,避免重复问题
- 只检查最相似的1条记录,提高导入效率
- 记录更新时间戳,便于后续增量更新
5. 核心服务实现
5.1 Flask API设计
主服务框架采用模块化设计:
python复制from flask import Flask, request, jsonify
from flask_cors import CORS
import os
from dotenv import load_dotenv
load_dotenv()
app = Flask(__name__)
CORS(app)
# 初始化Milvus连接
milvus_client = MilvusClient(
uri=os.getenv("MILVUS_URI"),
token=os.getenv("MILVUS_TOKEN")
)
# DeepSeek配置
DEEPSEEK_URL = "https://api.deepseek.com/chat/completions"
DEEPSEEK_HEADERS = {
"Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY')}",
"Content-Type": "application/json"
}
5.2 语义检索实现
检索流程优化点:
- 动态调整相似度阈值
- 支持分页检索
- 返回原始问答对供调试
python复制@app.route('/search', methods=['GET'])
def semantic_search():
question = request.args.get('q')
threshold = float(request.args.get('threshold', 0.5))
limit = int(request.args.get('limit', 10))
# 向量化
vector = vectorize_text(question)
if not vector:
return jsonify({"error": "Vectorization failed"}), 500
# 语义检索
results = milvus_client.search(
collection_name="social_insurance",
data=[vector],
anns_field="vector",
param={
"metric_type": "COSINE",
"params": {"radius": threshold}
},
limit=limit,
output_fields=["question", "answer"]
)
# 格式化结果
formatted = [{
"question": hit["entity"]["question"],
"answer": hit["entity"]["answer"],
"score": hit["score"]
} for hit in results[0]]
return jsonify({"results": formatted})
5.3 问答生成优化
Prompt工程是生成质量的关键:
python复制def build_llm_prompt(user_question, context_qa_pairs):
system_messages = []
for qa in context_qa_pairs:
system_messages.append({
"role": "system",
"content": f"参考知识:\n问:{qa['question']}\n答:{qa['answer']}"
})
user_message = {
"role": "user",
"content": f"{user_question}\n请根据上述知识回答,不要编造信息。"
"回答后,请推荐3个相关问题,用<sep>分隔。"
}
return system_messages + [user_message]
生成API实现:
python复制@app.route('/ask', methods=['POST'])
def generate_answer():
data = request.json
question = data.get('question')
# 1. 语义检索
vector = vectorize_text(question)
search_results = milvus_client.search(
collection_name="social_insurance",
data=[vector],
limit=5,
output_fields=["question", "answer"]
)
# 2. 构建Prompt
context_pairs = [{
"question": hit["entity"]["question"],
"answer": hit["entity"]["answer"]
} for hit in search_results[0]]
messages = build_llm_prompt(question, context_pairs)
# 3. 调用DeepSeek
response = requests.post(
DEEPSEEK_URL,
json={
"model": "deepseek-chat",
"messages": messages,
"temperature": 0.3,
"max_tokens": 1024
},
headers=DEEPSEEK_HEADERS
)
# 4. 解析结果
if response.status_code == 200:
content = response.json()["choices"][0]["message"]["content"]
answer, recommends = parse_response(content)
return jsonify({
"answer": answer,
"recommends": recommends
})
return jsonify({"error": "LLM API error"}), 500
6. 前端交互实现
6.1 流式问答界面
采用EventSource实现流式输出:
javascript复制function streamAnswer(question) {
const eventSource = new EventSource(`/ask_stream?q=${encodeURIComponent(question)}`);
const answerDiv = document.getElementById('answer');
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
const data = JSON.parse(event.data);
if (data.answer) {
answerDiv.innerHTML += data.answer;
}
};
eventSource.onerror = () => {
eventSource.close();
};
}
6.2 语音交互集成
讯飞语音识别集成要点:
javascript复制// 初始化语音识别
const iat = new IATRecorder({
appId: 'YOUR_APPID',
apiKey: 'YOUR_APIKEY',
onTextChange: (text) => {
document.getElementById('question').value = text;
}
});
// 开始录音
document.getElementById('mic-btn').addEventListener('mousedown', () => {
iat.start();
});
// 结束录音
document.getElementById('mic-btn').addEventListener('mouseup', () => {
iat.stop();
streamAnswer(document.getElementById('question').value);
});
7. 生产环境部署建议
7.1 性能优化方案
-
缓存层:对高频问题增加Redis缓存
python复制from redis import Redis redis = Redis(host='localhost', port=6379, db=0) @app.route('/ask') def ask(): question = request.args.get('q') cache_key = f"qa:{hash(question)}" # 检查缓存 cached = redis.get(cache_key) if cached: return jsonify(json.loads(cached)) # ...正常处理逻辑... # 写入缓存(过期时间1小时) redis.setex(cache_key, 3600, json.dumps(response)) return jsonify(response) -
异步处理:使用Celery处理耗时操作
python复制from celery import Celery celery = Celery('tasks', broker='redis://localhost:6379/0') @celery.task def async_vectorize(text): return vectorize_text(text) # 在视图函数中调用 vector = async_vectorize.delay(question).get(timeout=5)
7.2 安全防护措施
-
API限流:
python复制from flask_limiter import Limiter limiter = Limiter(app=app, key_func=get_remote_address) @app.route('/ask') @limiter.limit("10/minute") def ask(): ... -
输入清洗:
python复制import bleach def clean_input(text): return bleach.clean( text, tags=[], attributes={}, strip=True )
8. 常见问题排查
8.1 向量检索不准确
症状:返回结果与问题不相关
排查步骤:
- 检查向量维度是否为1024
- 验证bge-m3模型是否正常
bash复制curl http://localhost:11434/api/embeddings -d '{"model":"bge-m3","prompt":"测试"}' - 调整相似度阈值(建议0.4-0.6)
8.2 DeepSeek返回异常
症状:回答内容不符合预期
解决方案:
- 检查Prompt格式是否符合API要求
- 验证API密钥是否有效
- 调整temperature参数(建议0.2-0.5)
8.3 Milvus性能下降
症状:检索延迟突然增加
优化方案:
- 检查集合索引状态
python复制client.describe_index("social_insurance") - 考虑重建索引
python复制client.drop_index("social_insurance") client.create_index(...) - 增加Milvus内存分配
9. 项目演进方向
9.1 多轮对话支持
当前系统是单轮问答,可以扩展为:
python复制# 对话历史管理
from collections import deque
class DialogManager:
def __init__(self, max_history=3):
self.history = deque(maxlen=max_history)
def add_utterance(self, role, text):
self.history.append({"role": role, "content": text})
def get_context(self):
return list(self.history)
9.2 混合检索策略
结合关键词和语义检索:
python复制def hybrid_search(question):
# 关键词检索
keyword_results = fulltext_search(question)
# 语义检索
vector = vectorize_text(question)
semantic_results = milvus_search(vector)
# 结果融合
return rerank_results(keyword_results + semantic_results)
9.3 自动知识库更新
定时同步业务系统数据:
python复制from apscheduler.schedulers.background import BackgroundScheduler
def sync_knowledge():
new_data = fetch_from_source()
for qa in new_data:
import_qa_pair(qa['question'], qa['answer'])
scheduler = BackgroundScheduler()
scheduler.add_job(sync_knowledge, 'interval', hours=1)
scheduler.start()
这套系统在实际部署中已经验证了其价值,某社保局上线后,常见问题解答效率提升了80%,人工客服压力减少了60%。特别是在政策变动期间,能够快速更新知识库,确保群众获取的信息始终是最新的。