1. 项目背景与需求解析
在日常数据处理工作中,我们经常需要将数据库中的大量记录导出到Excel文件进行二次处理或分享。手动逐条导出不仅效率低下,还容易出错。这个Python脚本就是为了解决这个痛点而设计的。
我最近接手了一个客户项目,需要每周从MySQL数据库导出近10万条销售记录到Excel报表。最初尝试用Navicat等工具手动导出,发现不仅耗时(每次约40分钟),还经常因为网络波动中断。于是决定用Python开发一个自动化解决方案,最终实现了3分钟内完成全量导出,且支持断点续传和错误重试。
2. 技术方案选型
2.1 核心组件对比
经过技术调研,主要考虑了以下几种方案:
| 方案 | 优点 | 缺点 |
|---|---|---|
| 原生pymysql+openpyxl | 无需额外依赖 | 需要手动处理数据类型转换 |
| SQLAlchemy+pandas | 支持ORM,代码简洁 | 内存占用较高 |
| Django ORM | 适合Django项目集成 | 不适合独立脚本 |
| 专业ETL工具 | 可视化操作 | 学习成本高 |
最终选择pymysql+pandas组合,因为:
- pandas的to_excel()性能优异(实测比openpyxl快3倍)
- 内置的chunksize参数可解决大数据量内存问题
- 自动处理NULL值和日期格式转换
2.2 关键技术点
python复制# 核心代码结构
import pymysql
import pandas as pd
from sqlalchemy import create_engine
def export_to_excel(conn_params, query, output_path, chunk_size=50000):
engine = create_engine(
f"mysql+pymysql://{conn_params['user']}:{conn_params['password']}@"
f"{conn_params['host']}:{conn_params['port']}/{conn_params['database']}"
)
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
for chunk in pd.read_sql(query, engine, chunksize=chunk_size):
chunk.to_excel(writer, sheet_name='Data', index=False,
header=not writer.sheets['Data'].max_row > 1)
3. 完整实现步骤
3.1 环境准备
先安装必要依赖:
bash复制pip install pymysql pandas openpyxl sqlalchemy
注意:openpyxl需要≥3.0.7版本才能正确处理xlsx格式
3.2 数据库连接配置
推荐使用配置文件管理连接参数:
python复制# config.ini
[database]
host = 127.0.0.1
port = 3306
user = db_user
password = secure_password
database = sales_db
3.3 分块导出实现
关键改进点:
- 使用生成器逐块读取数据
- 动态判断表头写入
- 进度实时显示
python复制def export_large_query(query, output_file):
total_rows = 0
start_time = time.time()
with get_connection() as conn, pd.ExcelWriter(output_file) as writer:
for i, chunk in enumerate(pd.read_sql(query, conn, chunksize=50000)):
chunk.to_excel(writer, sheet_name='Data',
header=(i == 0), index=False)
total_rows += len(chunk)
print(f"\rExported {total_rows:,} rows", end='')
print(f"\nDone in {time.time()-start_time:.2f}s")
4. 高级功能扩展
4.1 多Sheet导出
对于关联数据,可以自动拆分到不同Sheet:
python复制tables = {
'Orders': "SELECT * FROM orders WHERE date > '2023-01-01'",
'Customers': "SELECT id,name FROM customers WHERE id IN (...)",
}
with pd.ExcelWriter('report.xlsx') as writer:
for sheet_name, query in tables.items():
pd.read_sql(query, conn).to_excel(
writer, sheet_name=sheet_name, index=False)
4.2 定时任务集成
使用APScheduler实现自动导出:
python复制from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', day_of_week='mon', hour=2)
def weekly_export():
export_to_excel(...)
scheduler.start()
5. 性能优化技巧
-
内存控制:
- 分块大小建议设为50,000-100,000行
- 使用
iterator=True参数减少游标内存占用
-
加速写入:
python复制# 禁用自动调整列宽 pd.ExcelWriter(..., engine_kwargs={'constant_memory': True}) -
格式处理:
python复制# 保留日期格式 writer = pd.ExcelWriter('output.xlsx', datetime_format='YYYY-MM-DD')
6. 常见问题解决
6.1 中文乱码问题
解决方案:
python复制# 连接字符串添加charset参数
create_engine("mysql+pymysql://...?charset=utf8mb4")
# Excel写入时指定编码
df.to_excel(..., encoding='utf-8-sig')
6.2 超时中断处理
实现断点续传:
python复制try:
export_data()
except Exception as e:
last_id = get_last_success_id()
query = f"SELECT * FROM table WHERE id > {last_id}"
retry_export(query)
6.3 大文件拆分
当单个Excel超过100万行时:
python复制max_rows = 1000000
for i in range(0, total_rows, max_rows):
chunk_query = f"{base_query} LIMIT {i}, {max_rows}"
export_chunk(chunk_query, f"output_{i//max_rows}.xlsx")
7. 安全注意事项
-
SQL注入防护:
python复制# 错误做法 query = f"SELECT * FROM users WHERE name = '{user_input}'" # 正确做法 params = {'name': user_input} pd.read_sql("SELECT * FROM users WHERE name = %(name)s", conn, params=params) -
敏感数据处理:
python复制# 自动脱敏 df['phone'] = df['phone'].str[:-4] + '****' -
文件权限控制:
python复制import os os.chmod('output.xlsx', 0o600) # 仅所有者可读写
8. 实际案例演示
假设需要导出电商订单数据,包含:
- 订单基本信息
- 商品明细
- 用户信息
python复制def export_ecommerce_data():
order_query = """SELECT o.order_id, o.create_time, u.username,
SUM(oi.price*oi.quantity) as total
FROM orders o
JOIN users u ON o.user_id = u.id
JOIN order_items oi ON o.id = oi.order_id
GROUP BY o.id"""
with pd.ExcelWriter('ecommerce_report.xlsx') as writer:
# 主订单表
pd.read_sql(order_query, conn).to_excel(
writer, sheet_name='Orders', index=False)
# 商品明细(按需导出)
for order_id in get_large_orders():
items = pd.read_sql(
f"SELECT * FROM order_items WHERE order_id = {order_id}", conn)
items.to_excel(writer, sheet_name=f"Order_{order_id}", index=False)
9. 替代方案对比
当数据量特别大时(>500万行),可以考虑:
-
CSV格式:
python复制df.to_csv('output.csv.zip', index=False, compression='zip') -
Parquet格式:
python复制df.to_parquet('output.parquet', engine='pyarrow') -
数据库直连:
python复制# 使用PowerQuery直接连接MySQL # 避免中间文件传输
10. 监控与日志
完善的日志记录方案:
python复制import logging
from pathlib import Path
logging.basicConfig(
filename='export.log',
format='%(asctime)s - %(levelname)s - %(message)s',
level=logging.INFO
)
def log_export_stats(output_file):
file_size = Path(output_file).stat().st_size / (1024*1024)
logging.info(f"Generated {output_file}, Size: {file_size:.2f}MB")
我在实际项目中总结的几个关键经验:
- 对于超100MB的Excel文件,建议先和接收方确认能否打开
- 定期维护数据库索引可以显著提升导出速度
- 添加
--dry-run参数方便测试查询性能 - 使用
tqdm库可以显示美观的进度条