最近在数据迁移项目中遇到一个典型需求:将MySQL数据库中的多张表数据批量导出为Excel文件。这种需求在数据交接、报表生成和数据分析场景中非常常见。传统做法需要手动编写大量重复代码,而通过Python自动化处理可以显著提升效率。
我最终实现的方案核心功能包括:
实测在8核服务器上,该脚本将原本需要人工操作3小时的工作压缩到8分钟完成,且保证数据零差错。下面分享具体实现方案和关键代码。
首先需要安装必要的Python库:
bash复制pip install pandas openpyxl sqlalchemy pymysql
各库的作用说明:
pandas:数据处理核心库,提供DataFrame结构和Excel读写接口openpyxl:Excel文件操作引擎(比xlwt支持更多新特性)sqlalchemy:数据库ORM工具,统一不同数据库的操作接口pymysql:MySQL的Python驱动注意:如果导出超大型数据(超过50万行),建议改用
xlsxwriter引擎,它对内存优化更好。
创建通用的数据库连接工厂函数:
python复制from sqlalchemy import create_engine
def create_db_engine(db_type='mysql', **kwargs):
"""创建数据库引擎工厂函数"""
conn_map = {
'mysql': 'mysql+pymysql://{user}:{password}@{host}:{port}/{database}',
'postgresql': 'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}',
'sqlite': 'sqlite:///{database}'
}
conn_str = conn_map[db_type].format(**kwargs)
return create_engine(conn_str, pool_recycle=3600)
典型调用示例:
python复制engine = create_db_engine(
db_type='mysql',
host='localhost',
port=3306,
user='root',
password='securepassword',
database='mydb'
)
基础导出函数需要处理以下关键点:
python复制import pandas as pd
from datetime import datetime
def export_table_to_excel(engine, table_name, output_path, chunk_size=50000):
"""导出单表数据到Excel"""
start_time = datetime.now()
print(f"[{start_time}] 开始导出表 {table_name}")
# 创建Excel写入器
writer = pd.ExcelWriter(
output_path,
engine='openpyxl',
datetime_format='YYYY-MM-DD HH:MM:SS'
)
try:
# 分块读取数据库
for i, chunk in enumerate(
pd.read_sql_table(table_name, engine, chunksize=chunk_size)
):
# 处理特殊数据类型
for col in chunk.select_dtypes(include=['datetime']):
chunk[col] = chunk[col].dt.tz_localize(None)
# 写入Excel
chunk.to_excel(
writer,
sheet_name=table_name,
index=False,
header=(i == 0), # 只有第一块写列头
startrow=i * chunk_size if i > 0 else 0
)
writer.close()
elapsed = (datetime.now() - start_time).total_seconds()
print(f"表 {table_name} 导出完成,耗时 {elapsed:.2f}秒")
return True
except Exception as e:
print(f"导出表 {table_name} 失败: {str(e)}")
writer.close()
return False
实现多表并行导出的调度逻辑:
python复制from concurrent.futures import ThreadPoolExecutor
import os
def batch_export_tables(engine, tables, output_dir, max_workers=4):
"""批量导出多表数据"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
success_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for table in tables:
output_path = os.path.join(output_dir, f"{table}.xlsx")
future = executor.submit(
export_table_to_excel,
engine, table, output_path
)
futures.append((table, future))
for table, future in futures:
try:
if future.result():
success_count += 1
except Exception as e:
print(f"表 {table} 导出异常: {str(e)}")
print(f"导出完成: 成功 {success_count}/{len(tables)}")
return success_count
对于需要导出整个数据库的场景,可以动态获取表名列表:
python复制def get_all_tables(engine):
"""获取数据库所有表名"""
from sqlalchemy import inspect
inspector = inspect(engine)
return inspector.get_table_names()
使用tqdm添加进度条增强用户体验:
python复制from tqdm import tqdm
def export_with_progress(engine, tables, output_dir):
"""带进度条的导出函数"""
with tqdm(total=len(tables), desc="导出进度") as pbar:
def update_pbar(future):
pbar.update(1)
try:
result = future.result()
pbar.set_postfix_str(f"成功:{result}")
except Exception:
pbar.set_postfix_str("失败")
with ThreadPoolExecutor() as executor:
futures = []
for table in tables:
path = os.path.join(output_dir, f"{table}.xlsx")
future = executor.submit(export_table_to_excel, engine, table, path)
future.add_done_callback(update_pbar)
futures.append(future)
# 等待所有任务完成
for future in futures:
future.result()
内存控制:
chunk_size设为10000-50000xlsxwriter引擎替代openpyxl可降低30%内存占用并行度设置:
IO优化:
问题1:导出时间过长
chunk_size减少IO次数问题2:Excel打开报错
/ \ [ ] : * ?)出现在表名中问题3:中文乱码
?charset=utf8mb4参数python复制import schedule
import time
def daily_backup():
engine = create_db_engine(...)
tables = get_all_tables(engine)
batch_export_tables(engine, tables, '/backups')
schedule.every().day.at("02:00").do(daily_backup)
while True:
schedule.run_pending()
time.sleep(60)
python复制def validate_export(orig_table, excel_path):
"""验证导出数据一致性"""
df_db = pd.read_sql_table(orig_table, engine)
df_excel = pd.read_excel(excel_path)
return df_db.equals(df_excel)
python复制import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders
def send_export_email(recipient, attachment_path):
msg = MIMEMultipart()
msg['Subject'] = '数据库导出报告'
msg['From'] = 'export@company.com'
msg['To'] = recipient
part = MIMEBase('application', 'octet-stream')
with open(attachment_path, 'rb') as f:
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(attachment_path)}"')
msg.attach(part)
with smtplib.SMTP('smtp.company.com') as server:
server.send_message(msg)
在实际项目中,这个脚本经过多次迭代已经发展成一个完善的数据库导出工具集。最关键的收获是:一定要在初期就设计好异常处理机制,因为数据导出往往是在非工作时间执行,一旦出错需要能够自我修复或至少提供详细的错误报告。