最近在做一个数据迁移项目时,遇到了需要将多个数据库表中的数据批量导出到Excel文件的需求。这种场景在实际开发中非常常见——比如数据备份、报表生成、数据交接等场景。传统的手工导出方式不仅效率低下,而且容易出错。
我尝试了几种不同的Python方案后,总结出一套稳定高效的批量导出方法。这套方案可以处理各种常见数据库(MySQL、PostgreSQL、SQLite等),支持自定义查询条件,还能自动处理Excel格式和编码问题。
经过对比测试,我最终选择了以下工具组合:
选择这套组合主要基于以下考虑:
安装所需库:
bash复制pip install sqlalchemy pandas openpyxl
对于特定数据库还需要安装对应的驱动:
bash复制# MySQL
pip install mysql-connector-python
# PostgreSQL
pip install psycopg2
# SQLite(Python内置,无需额外安装)
首先创建一个通用的数据库连接管理器:
python复制from sqlalchemy import create_engine
def get_db_engine(db_type, host, port, db_name, user, password):
"""
获取数据库引擎
:param db_type: 数据库类型(mysql/postgresql/sqlite)
:param host: 数据库地址
:param port: 端口
:param db_name: 数据库名
:param user: 用户名
:param password: 密码
:return: SQLAlchemy引擎实例
"""
db_url = {
'mysql': f'mysql+mysqlconnector://{user}:{password}@{host}:{port}/{db_name}',
'postgresql': f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{db_name}',
'sqlite': f'sqlite:///{db_name}.db'
}.get(db_type.lower())
return create_engine(db_url, pool_recycle=3600)
python复制import pandas as pd
from sqlalchemy import text
def export_to_excel(engine, query, output_path, sheet_name='Sheet1'):
"""
执行SQL查询并将结果导出到Excel
:param engine: 数据库引擎
:param query: SQL查询语句
:param output_path: 输出文件路径
:param sheet_name: Excel工作表名
"""
with engine.connect() as conn:
# 使用pandas直接读取SQL查询结果
df = pd.read_sql(text(query), conn)
# 导出到Excel
df.to_excel(
output_path,
sheet_name=sheet_name,
index=False, # 不输出索引列
encoding='utf-8-sig' # 解决中文乱码
)
实际项目中,我们通常需要导出多个表的数据:
python复制def batch_export_tables(engine, table_queries, output_dir):
"""
批量导出多个表数据到单独Excel文件
:param engine: 数据库引擎
:param table_queries: 字典{表名: SQL查询}
:param output_dir: 输出目录
"""
for table_name, query in table_queries.items():
output_path = f"{output_dir}/{table_name}.xlsx"
export_to_excel(engine, query, output_path, table_name)
当数据量很大时,直接导出可能导致内存不足。这时可以采用分页查询:
python复制def export_large_table(engine, table_name, output_path, batch_size=50000):
"""
分页导出大表数据
:param engine: 数据库引擎
:param table_name: 表名
:param output_path: 输出路径
:param batch_size: 每批数据量
"""
total_query = f"SELECT COUNT(*) FROM {table_name}"
with engine.connect() as conn:
total = conn.execute(text(total_query)).scalar()
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
for offset in range(0, total, batch_size):
query = f"SELECT * FROM {table_name} LIMIT {batch_size} OFFSET {offset}"
df = pd.read_sql(text(query), engine)
# 如果是第一页,写入表头
if offset == 0:
df.to_excel(writer, sheet_name=table_name, index=False)
else:
# 追加数据,不写表头
df.to_excel(writer, sheet_name=table_name,
startrow=offset+1, header=False, index=False)
通过openpyxl可以自定义Excel样式:
python复制from openpyxl.styles import Font, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows
def export_with_style(engine, query, output_path):
"""
带样式的Excel导出
"""
df = pd.read_sql(text(query), engine)
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
# 设置标题行样式
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill(start_color="4F81BD", end_color="4F81BD", fill_type="solid")
# 写入数据
for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
for c_idx, value in enumerate(row, 1):
cell = ws.cell(row=r_idx, column=c_idx, value=value)
if r_idx == 1: # 标题行
cell.font = header_font
cell.fill = header_fill
# 设置列宽自适应
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = (max_length + 2) * 1.2
ws.column_dimensions[column_letter].width = adjusted_width
wb.save(output_path)
中文乱码问题:
encoding='utf-8-sig'参数日期格式问题:
python复制# 在导出前统一处理日期格式
df['date_column'] = pd.to_datetime(df['date_column']).dt.strftime('%Y-%m-%d')
大数据导出超时:
SQL注入防护:
文件权限控制:
下面是一个完整的批量导出脚本示例:
python复制import pandas as pd
from sqlalchemy import create_engine, text
import os
class DatabaseExporter:
def __init__(self, db_config):
self.engine = self._create_engine(db_config)
def _create_engine(self, config):
"""创建数据库引擎"""
db_url = f"{config['type']}://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['name']}"
return create_engine(db_url, pool_recycle=3600)
def export_tables(self, tables, output_dir):
"""批量导出表数据"""
os.makedirs(output_dir, exist_ok=True)
for table in tables:
output_path = os.path.join(output_dir, f"{table}.xlsx")
self.export_table(table, output_path)
print(f"已导出: {output_path}")
def export_table(self, table_name, output_path, query=None):
"""导出单个表数据"""
query = query or f"SELECT * FROM {table_name}"
df = pd.read_sql(text(query), self.engine)
df.to_excel(output_path, index=False, encoding='utf-8-sig')
# 使用示例
if __name__ == "__main__":
db_config = {
'type': 'mysql',
'host': 'localhost',
'port': '3306',
'name': 'your_database',
'user': 'your_username',
'password': 'your_password'
}
exporter = DatabaseExporter(db_config)
tables_to_export = ['users', 'products', 'orders']
exporter.export_tables(tables_to_export, './exports')
结合APScheduler可以实现定时自动导出:
python复制from apscheduler.schedulers.blocking import BlockingScheduler
def scheduled_export():
exporter = DatabaseExporter(db_config)
exporter.export_tables(tables_to_export, './exports')
scheduler = BlockingScheduler()
scheduler.add_job(scheduled_export, 'cron', hour=2) # 每天凌晨2点执行
scheduler.start()
在导出前进行数据清洗:
python复制def clean_data(df):
"""数据清洗示例"""
# 处理空值
df.fillna('N/A', inplace=True)
# 格式化金额
if 'price' in df.columns:
df['price'] = df['price'].apply(lambda x: f"¥{x:.2f}")
return df
# 在导出函数中调用
df = pd.read_sql(text(query), engine)
df = clean_data(df)
df.to_excel(output_path, index=False)
将多个查询结果导出到同一个Excel的不同Sheet:
python复制def export_to_multisheet(engine, queries, output_path):
"""多Sheet导出"""
with pd.ExcelWriter(output_path) as writer:
for sheet_name, query in queries.items():
df = pd.read_sql(text(query), engine)
df.to_excel(writer, sheet_name=sheet_name, index=False)
这套方案在实际项目中已经稳定运行了半年多,每天处理数十万条数据的导出需求。最大的优势在于其灵活性和可扩展性,可以根据具体需求轻松调整。比如最近我们就扩展了支持导出到Google Sheets的功能,原理类似,只是更换了输出适配器。