在数据库操作自动化领域,安全始终是第一要务。今天我要分享的是如何在LangChain的SQL Agent中实现Human-in-the-loop(HITL)机制,这种设计模式能够有效防止AI生成的SQL语句对生产环境造成意外影响。
数据库操作不同于一般的文本处理,一条不当的SQL可能会:
我曾在一个电商项目中亲历过这样的场景:一个未经审核的自动化查询语句在促销期间执行了全表扫描,直接导致数据库CPU飙升至100%,整个网站瘫痪了近20分钟。正是这样的教训让我深刻认识到HITL机制的必要性。
LangChain通过中间件(Middleware)机制实现了灵活的HITL控制。核心组件包括:
这种架构的优势在于:
python复制middleware = [
HumanInTheLoopMiddleware(
interrupt_on={"sql_db_query": True}, # 精准拦截SQL执行
description_prefix="[安全审计] 准备执行SQL,请审核"
)
]
这段配置有几个关键点值得注意:
interrupt_on参数使用字典形式,可以同时配置多个工具的拦截规则sql_db_query而不拦截schema查询,体现了最小权限原则description_prefix会显示在审批提示中,应该用清晰的语言说明当前操作提示:在生产环境中,建议将description_prefix配置为包含SQL摘要和安全等级的信息,方便审核人员快速判断。
python复制agent = create_agent(
kimi_model,
tools,
system_prompt=system_prompt_zh,
middleware=middleware,
checkpointer=InMemorySaver(), # 必须项!
)
这里容易踩的坑是忘记配置checkpointer。没有状态存储的Agent被中断后:
python复制config = {"configurable": {"thread_id": "1"}} # 相当于会话ID
for step in agent.stream(inputs, config, stream_mode="values"):
if "__interrupt__" in step:
# 处理中断逻辑
interrupt = step["__interrupt__"][0]
for request in interrupt.value["action_requests"]:
print(request["description"]) # 这里显示待审核的SQL
这个阶段Agent会:
python复制from langgraph.types import Command
for step in agent.stream(
Command(resume={"decisions": [{"type": "approve"}]}),
config,
stream_mode="values",
):
# 处理正常输出
if "messages" in step:
step["messages"][-1].pretty_print()
审批时有三种可能的决策:
approve:批准执行reject:拒绝并终止modify:提供修改建议(需要额外实现)在大流量场景下,InMemorySaver可能成为瓶颈。我们可以在中间件层添加:
python复制# 使用Redis作为检查点存储的示例
from langgraph.checkpoint.redis import RedisSaver
checkpointer = RedisSaver(
redis_url="redis://localhost:6379/0",
ttl=3600 # 1小时过期
)
基础的HITL还不够,我们还需要:
python复制# 危险操作检测示例
def check_sql_safety(sql):
dangerous_keywords = ["drop", "delete", "truncate", "alter"]
return any(keyword in sql.lower() for keyword in dangerous_keywords)
问题1:拦截后无法恢复
问题2:多次意外拦截
问题3:审批延迟导致超时
对于关键业务数据库,可以扩展为:
python复制class MultiLevelApprovalMiddleware:
def __init__(self):
self.levels = [
{"role": "dba", "tools": ["sql_db_query"]},
{"role": "manager", "tools": ["sql_db_write"]}
]
def intercept(self, tool_name):
# 实现多级审批逻辑
...
将HITL融入现有运维体系:
python复制# 工单系统集成示例
def create_ticket(sql_description):
ticket_system = JiraService()
return ticket_system.create(
summary="SQL审批请求",
description=sql_description,
priority="High"
)
建立完整的可观测性体系:
prometheus复制# Prometheus监控指标示例
sql_agent_interrupts_total{tool="sql_db_query"} 42
sql_agent_approval_duration_seconds_bucket{le="10"} 35
经过多个项目的实践验证,这种HITL机制能够在不显著降低效率的前提下,将数据库操作事故率降低90%以上。关键在于找到自动化与人工控制的平衡点 - 对于简单的查询可以设置白名单自动放行,而对于复杂的写操作则必须严格审核。