在日常数据处理工作中,我们经常需要将数据库中的大量记录导出到Excel文件中进行分析或共享。手动逐条导出不仅效率低下,而且容易出错。这个Python脚本项目正是为了解决这个痛点而生——通过自动化方式实现数据库数据的批量导出,支持灵活的条件筛选和多表关联查询。
我最近在一个电商数据分析项目中就遇到了类似需求:需要每天从MySQL导出近万条订单数据给运营团队。最初用Navicat手动导出,不仅耗时20多分钟,还经常因网络中断导致前功尽弃。后来开发的这个Python脚本,现在只需3秒就能完成全部导出工作。
脚本采用分层设计架构:
python复制# 架构示例代码
class DataExporter:
def __init__(self, db_uri):
self.engine = create_engine(db_uri)
self.session = sessionmaker(bind=self.engine)()
def export_to_excel(self, query, output_path):
df = pd.read_sql(query, self.engine)
self._format_dataframe(df)
df.to_excel(output_path, index=False)
数据库连接:
Excel处理:
提示:当数据量超过50万行时,建议使用csv格式替代Excel,否则可能内存溢出
安装必要依赖:
bash复制pip install sqlalchemy openpyxl pandas
对于特定数据库还需要安装对应驱动:
bash复制# MySQL示例
pip install pymysql
python复制from sqlalchemy import create_engine
import pandas as pd
def export_table_to_excel(db_url, table_name, output_file):
"""
导出整表数据到Excel
:param db_url: 数据库连接字符串
:param table_name: 要导出的表名
:param output_file: 输出文件路径
"""
engine = create_engine(db_url)
with engine.connect() as conn:
df = pd.read_sql_table(table_name, conn)
df.to_excel(output_file, index=False)
# 使用示例
export_table_to_excel(
"mysql+pymysql://user:pass@localhost/dbname",
"orders",
"output.xlsx"
)
python复制def export_with_condition(db_url, sql_query, output_file):
engine = create_engine(db_url)
with engine.connect() as conn:
df = pd.read_sql(sql_query, conn)
df.to_excel(output_file, index=False)
# 使用示例
query = """
SELECT * FROM orders
WHERE order_date >= '2023-01-01'
AND status = 'completed'
"""
export_with_condition(db_url, query, "filtered_orders.xlsx")
python复制def export_joined_tables(db_url, output_file):
query = """
SELECT o.order_id, o.order_date, c.customer_name, p.product_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
"""
export_with_condition(db_url, query, output_file)
当处理超过10万行数据时:
python复制def export_large_data(db_url, table_name, output_file, chunk_size=50000):
engine = create_engine(db_url)
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
for chunk in pd.read_sql_table(
table_name,
engine,
chunksize=chunk_size
):
chunk.to_excel(
writer,
sheet_name=f"Data_{writer.sheets.__len__()}",
index=False
)
python复制dtype = {
'id': 'int32',
'price': 'float32',
'description': 'string'
}
pd.read_sql(..., dtype=dtype)
python复制query = "SELECT id, name FROM products"
问题现象:导出的Excel中中文显示为乱码
解决方案:
python复制"mysql+pymysql://user:pass@localhost/dbname?charset=utf8mb4"
python复制with pd.ExcelWriter(..., engine='openpyxl', options={'encoding': 'utf-8'})
问题现象:数据库中的DATETIME在Excel中变成数字
解决方案:
python复制df['date_column'] = pd.to_datetime(df['date_column']).dt.strftime('%Y-%m-%d')
问题现象:导出大量数据时速度很慢
优化方案:
python复制df.to_excel(..., engine='xlsxwriter', index=False, header=True)
python复制def export_with_pivot(db_url, output_file):
engine = create_engine(db_url)
df = pd.read_sql("SELECT * FROM sales", engine)
pivot = pd.pivot_table(
df,
values='amount',
index=['region'],
columns=['product'],
aggfunc='sum'
)
with pd.ExcelWriter(output_file) as writer:
df.to_excel(writer, sheet_name='Raw Data', index=False)
pivot.to_excel(writer, sheet_name='Pivot Table')
结合APScheduler实现定时任务:
python复制from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('cron', hour=8)
def daily_export():
export_table_to_excel(...)
scheduler.start()
SQL注入防护:
python复制# 错误做法
f"SELECT * FROM users WHERE name = '{user_input}'"
# 正确做法
"SELECT * FROM users WHERE name = %s", (user_input,)
敏感数据处理:
python复制SENSITIVE_COLUMNS = ['password', 'credit_card']
df = df.drop(columns=SENSITIVE_COLUMNS)
文件权限控制:
python复制# 设置文件权限为仅拥有者可读写
os.chmod(output_file, 0o600)
在实际项目中,我发现最常遇到的坑是数据类型自动推断错误。比如数据库中的BIT(1)类型被pandas误读为bool,而实际上存储的是0/1数字。解决方法是在read_sql时明确指定dtype参数:
python复制dtype_map = {
'is_active': 'int8',
'price': 'float64'
}
pd.read_sql(..., dtype=dtype_map)
另一个实用技巧是添加进度条显示。对于大数据量导出,使用tqdm可以显著提升用户体验:
python复制from tqdm import tqdm
# 分块处理时添加进度条
for chunk in tqdm(pd.read_sql(..., chunksize=10000)):
process_chunk(chunk)