1. 项目概述:基于RAG架构的智能问答系统
在当今企业环境中,技术文档、产品手册和常见问题解答往往分散在各个系统中,员工需要花费大量时间查找信息。我们设计了一个基于RAG(检索增强生成)架构的智能问答系统,通过自然语言交互帮助用户快速获取准确答案。
这个系统采用前后端分离架构,后端使用Python和FastAPI框架构建,负责文档处理、向量化存储和智能问答核心逻辑;前端采用Vue3框架,提供直观的用户界面。系统支持多种文档格式(PDF、Word、Markdown等),能够自动解析内容、生成向量表示,并在用户提问时检索最相关的文档片段,结合大语言模型生成准确回答。
提示:RAG技术的核心优势在于它结合了信息检索的准确性和大语言模型的生成能力,既避免了传统搜索引擎只返回片段的问题,又解决了大模型"幻觉"和知识滞后的问题。
2. 系统架构设计
2.1 整体架构
系统采用分层架构设计,各组件职责明确:
code复制┌───────────────────────────────────────────────────────────────┐
│ 前端应用层 (Vue3) │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────┐ │
│ │ 用户界面 │ │ 状态管理 │ │ API封装层 │ │
│ └─────────────┘ └─────────────┘ └──────────────────┘ │
└───────────────────────────────────────────────────────────────┘
▲
│ HTTP/WebSocket
▼
┌───────────────────────────────────────────────────────────────┐
│ 后端服务层 (FastAPI) │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────┐ │
│ │ API接口层 │ ◄─ │ 业务逻辑层 │ ◄─ │ 数据访问层 │ │
│ └─────────────┘ └─────────────┘ └──────────────────┘ │
└───────────────────────────────────────────────────────────────┘
▲
│
▼
┌───────────────────────────────────────────────────────────────┐
│ 数据存储层 │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────┐ │
│ │ 关系型数据库 │ │ 向量数据库 │ │ 大模型API │ │
│ │ (MySQL) │ │ (ChromaDB) │ │ (OpenAI/智谱AI) │ │
│ └─────────────┘ └─────────────┘ └──────────────────┘ │
└───────────────────────────────────────────────────────────────┘
2.2 技术选型分析
后端技术栈
- FastAPI框架:高性能异步Web框架,自动生成API文档,适合构建RESTful服务
- SQLAlchemy ORM:Python中最成熟的ORM工具,支持多种数据库后端
- ChromaDB:轻量级向量数据库,支持本地部署和持久化存储
- Celery:分布式任务队列,用于异步处理文档解析和向量化
- Sentence-Transformers:生成高质量文本嵌入向量的开源模型
前端技术栈
- Vue3:渐进式JavaScript框架,组件化开发体验优秀
- Pinia:Vue官方推荐的状态管理库,替代Vuex
- Axios:处理HTTP请求,与后端API交互
- Markdown渲染:将模型返回的Markdown格式答案渲染为富文本
部署方案
- Docker容器化:前后端服务分别容器化,便于部署和扩展
- Nginx反向代理:处理静态资源和负载均衡
- Redis:作为Celery的消息代理和结果后端
3. 核心功能实现
3.1 文档处理流程
文档处理是系统的核心功能之一,完整的处理流程如下:
- 文档上传:用户通过前端界面上传文档,支持拖拽和批量上传
- 文件存储:后端接收文件后保存到指定目录,并在数据库中记录元数据
- 异步处理:Celery任务队列启动文档处理任务
- 内容解析:根据文件类型调用相应的解析器提取文本内容
- 文本分块:按照配置的分块大小和重叠参数将文本切分为片段
- 向量生成:使用Embedding模型为每个文本块生成向量表示
- 向量存储:将文本块及其向量存储到ChromaDB中
- 状态更新:更新文档处理状态,可供检索使用
3.1.1 文档解析实现
文档解析器需要支持多种格式,我们采用策略模式设计解析器类:
python复制class DocumentParser:
"""文档解析器类"""
SUPPORTED_TYPES = {
'.pdf': 'parse_pdf',
'.docx': 'parse_docx',
'.txt': 'parse_text',
'.md': 'parse_markdown',
'.html': 'parse_html',
'.xlsx': 'parse_xlsx'
}
def parse_document(self, file_path: str, filename: str) -> tuple[str, list[str], list[Any]]:
file_ext = os.path.splitext(filename)[1].lower()
if file_ext not in self.SUPPORTED_TYPES:
raise ValueError(f"不支持的文件类型: {file_ext}")
parser_method = self.SUPPORTED_TYPES[file_ext]
parser_func = getattr(self, parser_method)
text = parser_func(file_path)
text = self.clean_text(text)
chunks = self.split_text(text)
chunk_metadatas = []
for i, chunk in enumerate(chunks):
chunk_metadatas.append({
'source': filename,
'chunk_index': i,
'total_chunks': len(chunks),
'file_type': file_ext
})
return text, chunks, chunk_metadatas
每种文件类型有对应的解析方法,例如PDF解析:
python复制@staticmethod
def parse_pdf(file_path: str) -> str:
text = ""
try:
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page_num, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
if page_text:
text += f"\n--- Page {page_num + 1} ---\n"
text += page_text
except Exception as e:
logger.error(f"PDF解析失败: {e}")
raise
return text
3.1.2 文本分块策略
文本分块是影响检索效果的关键因素,我们采用基于段落和固定大小的混合分块策略:
python复制def split_text(self, text: str) -> List[str]:
chunks = []
if len(text) <= self.chunk_size:
return [text]
paragraphs = text.split('\n\n')
current_chunk = ""
for paragraph in paragraphs:
if len(current_chunk) + len(paragraph) > self.chunk_size:
if current_chunk:
chunks.append(current_chunk.strip())
if len(paragraph) > self.chunk_size:
sentences = re.split(r'([。!?!?])', paragraph)
temp_chunk = ""
for part in sentences:
if len(temp_chunk) + len(part) > self.chunk_size:
if temp_chunk:
chunks.append(temp_chunk.strip())
temp_chunk = part
else:
temp_chunk += part
current_chunk = temp_chunk
else:
current_chunk = paragraph
else:
current_chunk = current_chunk + "\n\n" + paragraph if current_chunk else paragraph
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
注意事项:分块大小需要根据文档类型和内容特点进行调整。技术文档通常适合500-800字符的块大小,而结构化数据可能需要更小的块。
3.2 向量数据库集成
3.2.1 ChromaDB封装
我们封装了ChromaDB的操作,提供简洁的接口:
python复制class VectorStore:
def __init__(self):
self.client = chromadb.Client(ChromaSettings(
chroma_db_impl="duckdb+parquet",
persist_directory=settings.chroma_persist_dir,
anonymized_telemetry=False
))
self.collection = self.client.get_or_create_collection(
name=settings.chroma_collection_name,
metadata={"hnsw:space": "cosine"}
)
def add_documents(self, ids: List[str], embeddings: List[List[float]],
metadatas: List[Dict[str, Any]], documents: List[str]) -> bool:
try:
self.collection.add(
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents
)
return True
except Exception as e:
logger.error(f"添加文档向量失败: {e}")
return False
def search(self, query_embedding: List[float], top_k: int = 5,
where: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
try:
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=top_k,
where=where
)
formatted_results = []
if results['ids'] and results['ids'][0]:
for i in range(len(results['ids'][0])):
formatted_results.append({
'id': results['ids'][0][i],
'distance': results['distances'][0][i] if results['distances'] else 1.0,
'metadata': results['metadatas'][0][i] if results['metadatas'] else {},
'document': results['documents'][0][i] if results['documents'] else ''
})
return formatted_results
except Exception as e:
logger.error(f"向量搜索失败: {e}")
return []
3.2.2 Embedding模型选择
我们支持多种Embedding模型,默认使用BAAI/bge-small-zh模型:
python复制class EmbeddingService:
def __init__(self, model_name: str = "BAAI/bge-small-zh"):
self.model = SentenceTransformer(model_name)
self.model.max_seq_length = 512 # 设置最大序列长度
def embed_text(self, text: str) -> List[float]:
# 对文本进行预处理
text = self.preprocess_text(text)
# 生成嵌入向量
embedding = self.model.encode(text, normalize_embeddings=True)
return embedding.tolist()
@staticmethod
def preprocess_text(text: str) -> str:
# 移除多余空格和换行
text = ' '.join(text.split())
return text
实操心得:Embedding模型的选择对检索效果影响很大。对于中文场景,BAAI系列模型通常表现优于通用模型。如果硬件条件允许,可以使用更大的模型如bge-large-zh提高准确性。
3.3 RAG问答服务实现
3.3.1 问答流程
完整的问答流程包括以下步骤:
- 接收用户问题
- 生成问题向量
- 从向量数据库检索相关文档片段
- 构建包含上下文的Prompt
- 调用大语言模型API
- 解析并返回结果
python复制class RAGService:
def __init__(self, embedding_service: EmbeddingService,
llm_service: LLMService, vector_store: VectorStore):
self.embedding_service = embedding_service
self.llm_service = llm_service
self.vector_store = vector_store
async def ask_question(self, question: str, top_k: int = 5) -> dict:
# 生成问题向量
question_embedding = self.embedding_service.embed_text(question)
# 检索相关文档
search_results = self.vector_store.search(question_embedding, top_k=top_k)
# 构建上下文
context = "\n\n".join([res['document'] for res in search_results])
# 构建Prompt
prompt = self.build_prompt(question, context)
# 调用大模型
response = await self.llm_service.generate(prompt)
# 解析结果
return {
"answer": response,
"sources": [
{
"document": res['document'],
"metadata": res['metadata'],
"similarity": 1 - res['distance']
} for res in search_results
]
}
def build_prompt(self, question: str, context: str) -> str:
return f"""基于以下上下文信息,回答问题。如果上下文不包含答案,请回答"我不知道"。
上下文:
{context}
问题:{question}
答案:"""
3.3.2 大模型服务封装
我们封装了多种大模型API的调用:
python复制class LLMService:
def __init__(self, provider: str = "openai", model: str = "gpt-3.5-turbo"):
self.provider = provider
self.model = model
self.client = self._init_client()
def _init_client(self):
if self.provider == "openai":
return openai.AsyncOpenAI(api_key=settings.openai_api_key)
elif self.provider == "zhipuai":
return zhipuai.ZhipuAI(api_key=settings.zhipuai_api_key)
else:
raise ValueError(f"不支持的提供商: {self.provider}")
async def generate(self, prompt: str, temperature: float = 0.7) -> str:
if self.provider == "openai":
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature
)
return response.choices[0].message.content
elif self.provider == "zhipuai":
response = self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature
)
return response.choices[0].message.content
提示:在实际应用中,可以同时接入多个大模型提供商,根据响应时间、成本和效果动态选择最合适的模型。
3.4 异步任务处理
文档解析和向量化是计算密集型任务,我们使用Celery实现异步处理:
python复制from celery import Celery
from app.config import settings
from app.services.document_service import DocumentService
celery = Celery(
"tasks",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend
)
@celery.task(bind=True)
def process_document_task(self, document_id: int):
"""文档处理异步任务"""
document_service = DocumentService()
try:
document_service.process_document(document_id)
return {"status": "success", "document_id": document_id}
except Exception as e:
return {"status": "failed", "error": str(e)}
在FastAPI中触发异步任务:
python复制@app.post("/documents/upload")
async def upload_document(file: UploadFile = File(...), db: Session = Depends(get_db)):
# 保存文件
file_path = save_upload_file(file)
# 创建文档记录
db_document = create_document_record(db, file)
# 触发异步任务
process_document_task.delay(db_document.id)
return {"id": db_document.id, "status": "processing"}
4. 前端实现要点
4.1 核心组件设计
前端采用Vue3组合式API开发,主要组件包括:
- 文档上传组件:支持拖拽和批量上传
- 对话界面组件:显示对话历史和用户输入
- 知识库管理组件:查看和管理上传的文档
- 设置面板组件:配置模型参数和系统设置
4.2 API封装
我们使用Axios封装后端API调用:
javascript复制import axios from 'axios';
const apiClient = axios.create({
baseURL: import.meta.env.VITE_API_BASE_URL,
timeout: 10000,
headers: {
'Content-Type': 'application/json',
}
});
// 请求拦截器
apiClient.interceptors.request.use(config => {
const token = localStorage.getItem('access_token');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
});
// 响应拦截器
apiClient.interceptors.response.use(
response => response.data,
error => {
if (error.response?.status === 401) {
// 处理未授权错误
}
return Promise.reject(error);
}
);
export default {
// 文档相关API
uploadDocument(file) {
const formData = new FormData();
formData.append('file', file);
return apiClient.post('/documents/upload', formData, {
headers: {
'Content-Type': 'multipart/form-data'
}
});
},
// 问答相关API
askQuestion(question, conversationId = null) {
return apiClient.post('/chat/query', {
question,
conversation_id: conversationId
});
},
// 获取对话历史
getConversations() {
return apiClient.get('/chat/conversations');
}
};
4.3 流式响应处理
为了实现打字机效果的流式响应,我们使用EventSource:
javascript复制function streamAnswer(question, conversationId, onData, onError, onComplete) {
const eventSource = new EventSource(
`${import.meta.env.VITE_API_BASE_URL}/chat/stream?question=${encodeURIComponent(question)}&conversation_id=${conversationId || ''}`
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
onData(data);
};
eventSource.onerror = (error) => {
onError(error);
eventSource.close();
};
return {
close: () => eventSource.close()
};
}
5. 部署与优化
5.1 Docker部署方案
我们使用Docker Compose定义全套服务:
yaml复制version: '3.8'
services:
backend:
build: ./backend
ports:
- "8000:8000"
env_file:
- .env
depends_on:
- redis
- mysql
volumes:
- ./backend/app:/app/app
- ./data:/data
frontend:
build: ./frontend
ports:
- "8080:8080"
env_file:
- .env.frontend
depends_on:
- backend
celery:
build: ./backend
command: celery -A app.core.celery_app worker --loglevel=info
env_file:
- .env
depends_on:
- backend
- redis
volumes:
- ./backend/app:/app/app
- ./data:/data
redis:
image: redis:alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: ${DB_PASSWORD}
MYSQL_DATABASE: ${DB_NAME}
ports:
- "3306:3306"
volumes:
- mysql_data:/var/lib/mysql
volumes:
redis_data:
mysql_data:
5.2 性能优化技巧
-
向量检索优化:
- 对高频查询建立缓存
- 使用HNSW索引加速近似最近邻搜索
- 对大型知识库考虑分片策略
-
大模型调用优化:
- 实现请求批处理
- 设置合理的超时和重试机制
- 对答案实现本地缓存
-
前端性能优化:
- 对长列表实现虚拟滚动
- 使用Web Worker处理大量数据
- 实现资源的懒加载
6. 常见问题与解决方案
6.1 文档处理失败
问题现象:文档上传后长时间处于处理中状态,或最终失败
排查步骤:
- 检查Celery worker日志是否有错误
- 确认文件权限是否正确
- 验证依赖库版本是否兼容
解决方案:
bash复制# 查看Celery worker日志
docker-compose logs celery
# 检查依赖版本
pip freeze | grep -E 'pypdf2|docx|markdown'
6.2 检索结果不准确
问题现象:系统返回的答案与问题无关
可能原因:
- Embedding模型不适合当前领域
- 文本分块大小不合适
- 检索top_k参数设置过小
优化方法:
- 尝试不同的Embedding模型
- 调整分块大小和重叠参数
- 增加检索返回数量(top_k)
- 添加查询扩展技术
6.3 响应时间过长
问题现象:问答接口响应时间超过5秒
优化方向:
- 向量数据库使用GPU加速
- 实现检索结果缓存
- 对大模型响应启用流式传输
- 对Embedding模型进行量化
7. 扩展与演进
7.1 多模态支持
未来可以扩展支持图片和表格内容:
- 使用OCR技术提取图片中的文本
- 对表格内容进行结构化解析
- 开发专门的表格问答功能
7.2 多租户架构
支持多租户隔离:
- 每个租户独立的知识库
- 基于角色的访问控制
- 租户级别的使用统计
7.3 持续学习机制
实现系统的自我优化:
- 记录用户反馈改进答案质量
- 自动识别知识盲区提示更新
- 定期重新评估文档重要性
在实际部署这个系统时,我发现文档分块策略对最终效果影响最大。经过多次测试,对于技术文档,采用"按标题分块为主,固定大小为辅"的混合策略效果最好。同时,在系统上线后持续收集用户反馈,不断调整Prompt模板和检索参数,能使系统表现逐步提升。