这个案例展示了如何利用Dify平台构建一个完整的Text2SQL工作流,专门针对信贷风控策略分析场景。作为一名长期从事金融科技领域的技术专家,我将在本文详细拆解这个从自然语言到SQL再到Python分析的全流程解决方案。
在银行风控部门工作过的同行都深有体会,日常策略迭代中最大的时间消耗往往不是策略设计本身,而是数据查询和预处理环节。传统模式下,一个简单的数据分析需求可能需要经历:需求沟通→SQL编写→调试验证→结果分析等多个环节,耗时数小时甚至数天。而通过这个基于大语言模型的Text2SQL解决方案,我们成功将这一流程缩短到了分钟级别。
经过多年演进,Text2SQL技术已经形成了几个明确的技术路线:
这是最直接的方法,通过精心设计的提示词引导大模型生成SQL。我在实际项目中发现,以下几个技巧特别有效:
提示:对于简单的单表查询,这种方法可以达到90%+的准确率,但复杂查询仍需额外处理。
当业务场景固定且有一定量的标注数据时,微调专用模型是更好的选择。我们曾基于Llama2-13B微调过一个风控专用模型,关键点包括:
这也是本案例采用的核心方法,其优势在于:
在信贷风控场景下,我们最终选择RAG方案主要基于以下考虑:
| 考量维度 | Prompt工程 | 模型微调 | RAG |
|---|---|---|---|
| 开发成本 | 低 | 高 | 中 |
| 维护难度 | 中 | 高 | 低 |
| 准确率 | 60-70% | 85-95% | 75-90% |
| 业务适配 | 差 | 优 | 良 |
| 冷启动 | 易 | 难 | 中 |
在与多家银行风控团队交流后,我们梳理出以下几个核心痛点:
针对上述痛点,我们的系统设计了以下应对机制:
整个系统采用模块化设计,主要组件包括:
code复制自然语言输入 → 意图解析 → 知识检索 → SQL生成 → 安全验证 → 查询执行 → Python分析 → 结果呈现
每个环节都有相应的容错和修正机制,形成闭环处理流程。
我们构建了两个核心知识库:
sql复制-- 企业贷后监控模板
SELECT e.企业名称,
COUNT(i.发票编号) AS 有效发票数,
AVG(i.金额) AS 平均发票金额
FROM enterprise_info e
JOIN invoice_info i ON e.企业代码 = i.企业代码
WHERE i.状态 = '有效'
GROUP BY e.企业名称
code复制"近三个月交易额" → SUM(CASE WHEN 交易日期 BETWEEN NOW()-INTERVAL '3 months' AND NOW() THEN 金额 ELSE 0 END)
检索时采用混合策略,同时考虑文本相似度和业务场景匹配度。
SQL生成采用两阶段设计:
json复制{
"query_analysis": "计算指定企业的月度发票金额波动情况",
"required_tables": ["enterprise_info", "invoice_info"],
"required_fields": ["企业代码", "发票日期", "金额"],
"time_range": {
"field": "发票日期",
"start": "2023-01-01",
"end": "2023-12-31"
}
}
SQL查询结果会传递给Python分析模块,典型处理包括:
python复制def analyze_invoice_trend(data):
"""
分析企业发票趋势
输入: DataFrame包含[日期, 金额]列
输出: 包含趋势分析的字典
"""
# 按月聚合
monthly = data.resample('M', on='日期')['金额'].sum().reset_index()
# 计算环比
monthly['环比'] = monthly['金额'].pct_change() * 100
# 返回结构化结果
return {
'summary': f"平均月销售额:{monthly['金额'].mean():.2f}",
'trend': monthly.to_dict('records')
}
在从Coze迁移到Dify时,我们遇到了几个典型问题:
Coze将参数打包为单个对象,而Dify直接展开为命名参数。解决方案:
python复制# 原Coze代码
def main(args):
sql = args['llm_sql']
# 适配Dify的修改
def main(llm_sql: str, ...):
sql = llm_sql
Dify不支持原生布尔类型,我们采用字符串模拟:
python复制# 验证结果返回
return {
"is_valid": "true" if valid else "false",
"message": validation_msg
}
# 下游条件判断
if validation_result["is_valid"] == "true":
...
LLM输出常包含多余内容,我们采用标签提取法:
python复制import re
def extract_content(text):
"""
从LLM输出中提取标签内容
示例输入:
...<sql>SELECT * FROM table</sql>...
"""
pattern = r'<sql>(.*?)</sql>'
match = re.search(pattern, text, re.DOTALL)
return match.group(1) if match else text
我们设计了三级缓存机制:
实现示例:
python复制from sentence_transformers import SentenceTransformer
encoder = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
def get_similar_queries(query, threshold=0.9):
query_vec = encoder.encode(query)
# 与缓存查询比较余弦相似度
...
我们建立了覆盖四个维度的测试集:
| 测试类型 | 示例用例 | 验证重点 |
|---|---|---|
| 基础查询 | "查询企业E1的基本信息" | 单表查询准确性 |
| 复杂查询 | "计算各分行近6个月逾期率的移动平均" | 多表关联、窗口函数 |
| 边缘情况 | "查询注册资本为NULL的企业" | 空值处理 |
| 多轮对话 | "按季度统计...不对,我指的是按月" | 上下文保持 |
经过多个项目的实践,我总结出以下几点关键经验:
对于想要尝试类似项目的团队,建议从简单的单表查询开始,逐步扩展到复杂场景。我们最初版本只支持5种基本查询模式,经过3个月迭代才达到现在的覆盖范围。