在日常数据处理工作中,我们经常需要将数据库中的大量记录导出到Excel文件中进行分析或共享。手动操作不仅效率低下,还容易出错。Python作为数据处理领域的利器,配合适当的库完全可以实现自动化批量导出功能。
这个项目要解决的核心痛点是:
Python连接数据库主要有以下几种方式:
对于本项目,我推荐使用SQLAlchemy作为基础连接方案,原因如下:
Python处理Excel的主流库包括:
综合考虑后,我选择pandas作为核心处理库,因为:
首先安装必要的依赖库:
pip install sqlalchemy pandas openpyxl
对于特定数据库还需要安装对应的驱动:
pip install pymysql
pip install psycopg2-binary
pip install cx_oracle
创建一个通用的数据库连接函数:
from urllib.parse import quote_plus

from sqlalchemy import create_engine


def get_db_connection(db_type, host, port, db_name, user, password):
    """Create a SQLAlchemy engine for the given database.

    :param db_type: database dialect (mysql/postgresql/oracle etc.,
                    optionally with a driver suffix such as ``mysql+pymysql``)
    :param host: database host
    :param port: port number
    :param db_name: database name
    :param user: user name
    :param password: password
    :return: SQLAlchemy Engine object
    """
    # URL-quote the credentials so special characters (@ : / %) in the
    # user name or password cannot corrupt the connection URL.
    db_url = (
        f"{db_type}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db_name}"
    )
    # pool_pre_ping validates pooled connections before use, avoiding
    # "server has gone away" errors after idle timeouts.
    return create_engine(db_url, pool_pre_ping=True)
核心导出函数实现:
import pandas as pd
from datetime import datetime


def export_to_excel(engine, sql_query, output_file, sheet_name='Sheet1'):
    """Run a SQL query and export the result set to an Excel file.

    :param engine: SQLAlchemy engine (or DBAPI connection) to query against
    :param sql_query: SQL statement to execute
    :param output_file: path of the .xlsx file to write
    :param sheet_name: Excel worksheet name
    """
    start_time = datetime.now()
    try:
        # Load the full result set into a DataFrame.
        df = pd.read_sql(sql_query, engine)
        # Use the writer as a context manager: it saves the workbook on
        # success and always releases the file handle.  The old
        # writer.save() call was deprecated and removed in pandas 2.0.
        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name=sheet_name, index=False)
            # Auto-fit each column to its longest cell value.
            worksheet = writer.sheets[sheet_name]
            for column in worksheet.columns:
                # default=0 guards against an empty column iterator.
                max_length = max(
                    (len(str(cell.value)) for cell in column), default=0
                )
                worksheet.column_dimensions[column[0].column_letter].width = (
                    max_length + 2
                )
        elapsed = datetime.now() - start_time
        print(f"成功导出 {len(df)} 条记录到 {output_file}, 耗时: {elapsed}")
    except Exception as e:
        # Best-effort reporting; callers treat failures as non-fatal.
        print(f"导出失败: {str(e)}")
对于需要导出多个表或查询的情况:
def batch_export(engine, queries, output_dir):
    """Run several queries and export each result to its own Excel file.

    :param engine: database engine
    :param queries: mapping of {file name (without extension): SQL query}
    :param output_dir: directory the .xlsx files are written into
    """
    for filename, query in queries.items():
        # Fix: name each output file after its query key.  The original
        # used a literal "(unknown)" placeholder, so every export wrote
        # to the same file and overwrote the previous one.
        output_path = f"{output_dir}/{filename}.xlsx"
        export_to_excel(engine, query, output_path)
在导出前对数据进行清洗和转换:
def preprocess_data(df):
    """Clean and normalise a DataFrame before exporting it.

    Order matters: dates are formatted and phone numbers masked *before*
    missing values are filled.  The original filled NaN with the string
    'N/A' first, which made the subsequent pd.to_datetime() call raise on
    any date column that had contained a null.

    :param df: raw DataFrame (modified in place by the fillna step)
    :return: the processed DataFrame
    """
    # Normalise known timestamp columns to a fixed string format
    # (NaT becomes NaN here and is filled below).
    date_columns = ['create_time', 'update_time']
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col]).dt.strftime('%Y-%m-%d %H:%M:%S')
    # Mask the last four digits of phone numbers; NaN propagates through
    # the .str accessor and is filled below.
    if 'phone' in df.columns:
        df['phone'] = df['phone'].str[:-4] + '****'
    # Fill every remaining missing value last.
    df.fillna('N/A', inplace=True)
    return df
使用APScheduler实现定时任务:
from apscheduler.schedulers.blocking import BlockingScheduler


def setup_scheduler(engine, config):
    """Register cron-style export jobs and run the blocking scheduler.

    :param engine: database engine
    :param config: mapping of {"HH:MM": (sql_query, output_path)}
    """
    scheduler = BlockingScheduler()
    for when, job_spec in config.items():
        query, output = job_spec
        clock = when.split(':')
        scheduler.add_job(
            export_to_excel,
            'cron',
            hour=clock[0],
            minute=clock[1],
            args=[engine, query, output],
        )
    # start() blocks the calling thread until the scheduler is shut down.
    scheduler.start()
chunk_size = 10000
for chunk in pd.read_sql(query, engine, chunksize=chunk_size):
process(chunk)
import gc
del large_df
gc.collect()
对于多个不相关的查询,可以使用线程池并行执行:
from concurrent.futures import ThreadPoolExecutor


def parallel_export(engine, queries, output_dir, max_workers=4):
    """Export several independent queries concurrently via a thread pool.

    :param engine: database engine, shared by the workers (each SQLAlchemy
                   engine hands each thread its own pooled connection)
    :param queries: mapping of {file name (without extension): SQL query}
    :param output_dir: directory the .xlsx files are written into
    :param max_workers: maximum number of concurrent export threads
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for filename, query in queries.items():
            # Fix: name each file after its query key; the original used a
            # literal "(unknown)" placeholder, so all exports collided on
            # one path.
            output_path = f"{output_dir}/{filename}.xlsx"
            futures.append(executor.submit(
                export_to_excel, engine, query, output_path
            ))
        # Wait for completion and propagate any worker exception.
        for future in futures:
            future.result()
问题:数据库连接超时或失败
解决方案:
engine = create_engine(
db_url,
pool_pre_ping=True,
pool_recycle=3600,
connect_args={'connect_timeout': 10}
)
问题:处理大数据时内存溢出
解决方案:
pd.read_sql(query, engine, columns=['col1','col2'])
dtypes = {
'id': 'int32',
'price': 'float32',
'description': 'string'
}
df = pd.read_sql(query, engine, dtype=dtypes)
问题:Excel中日期显示为数字
解决方案:
df['date_column'] = pd.to_datetime(df['date_column']).dt.strftime('%Y-%m-%d')
writer = pd.ExcelWriter(output_file, engine='xlsxwriter')
df.to_excel(writer, sheet_name='Sheet1')
workbook = writer.book
worksheet = writer.sheets['Sheet1']
date_format = workbook.add_format({'num_format': 'yyyy-mm-dd'})
worksheet.set_column('D:D', None, date_format) # D列是日期列
以下是一个完整的脚本示例,包含配置文件和命令行接口:
#!/usr/bin/env python3
"""
数据库批量导出工具
"""
import argparse
import configparser
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine
class DatabaseExporter:
    """Exports SQL queries defined in an INI config file to Excel files.

    Expected config layout: a [database] section (type/host/port/name/
    user/password), a [queries] section of {name: SQL}, and an optional
    [output] section with a ``dir`` key.
    """

    def __init__(self, config_file):
        """Load the config file and build the database engine."""
        self.config = configparser.ConfigParser()
        self.config.read(config_file)
        self.engine = self._create_engine()

    def _create_engine(self):
        """Build the SQLAlchemy engine from the [database] section."""
        db_config = self.config['database']
        db_url = (
            f"{db_config['type']}://{db_config['user']}:{db_config['password']}"
            f"@{db_config['host']}:{db_config['port']}/{db_config['name']}"
        )
        return create_engine(db_url, pool_pre_ping=True)

    def export_query(self, query_name, output_file=None):
        """Run one named query from [queries] and write it to Excel.

        :param query_name: key in the [queries] section
        :param output_file: target path; defaults to "<query_name>.xlsx"
        """
        if not output_file:
            output_file = f"{query_name}.xlsx"
        query = self.config['queries'][query_name]
        df = pd.read_sql(query, self.engine)
        df = self._preprocess_data(df)
        # Context manager saves the workbook and closes the file handle
        # even on error; writer.save() was removed in pandas 2.0.
        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='Data')
            # Auto-fit column widths, capped at 50 characters.
            worksheet = writer.sheets['Data']
            for column in worksheet.columns:
                max_length = max(
                    (len(str(cell.value)) for cell in column), default=0
                )
                worksheet.column_dimensions[column[0].column_letter].width = \
                    min(max_length + 2, 50)
        print(f"成功导出到 {output_file}")

    def _preprocess_data(self, df):
        """Format datetime columns as strings and fill missing values."""
        # Format dates BEFORE fillna: filling a datetime64 column with the
        # string '' raises in recent pandas, and the original's order also
        # left NaT cells unfilled after formatting.
        for col in df.select_dtypes(include=['datetime64']).columns:
            df[col] = df[col].dt.strftime('%Y-%m-%d %H:%M:%S')
        df.fillna('', inplace=True)
        return df

    def batch_export(self):
        """Export every query in [queries] into the configured directory."""
        output_dir = self.config.get('output', 'dir', fallback='.')
        for query_name in self.config['queries']:
            output_file = f"{output_dir}/{query_name}.xlsx"
            self.export_query(query_name, output_file)
if __name__ == '__main__':
    # CLI entry point: -q exports a single named query, otherwise all
    # queries in the config are exported.
    cli = argparse.ArgumentParser(description='数据库批量导出工具')
    cli.add_argument('-c', '--config', required=True, help='配置文件路径')
    cli.add_argument('-q', '--query', help='指定查询名称(不指定则导出所有)')
    cli.add_argument('-o', '--output', help='输出文件路径(单个查询时使用)')
    options = cli.parse_args()

    exporter = DatabaseExporter(options.config)
    if options.query:
        exporter.export_query(options.query, options.output)
    else:
        exporter.batch_export()
配套的配置文件示例(config.ini):
[database]
type = mysql
host = localhost
port = 3306
name = my_database
user = root
password = password
[queries]
users = SELECT id, username, email, created_at FROM users WHERE status=1
orders = SELECT o.id, u.username, o.amount, o.create_time
FROM orders o JOIN users u ON o.user_id=u.id
WHERE o.create_time > '2023-01-01'
[output]
dir = ./exports
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('export.log'),
logging.StreamHandler()
]
)
try:
df = pd.read_sql(query, engine)
except Exception as e:
logging.error(f"查询执行失败: {str(e)}")
raise
from tqdm import tqdm
# 分块处理时显示进度
for chunk in tqdm(pd.read_sql(query, engine, chunksize=10000)):
process(chunk)
from cryptography.fernet import Fernet
# 加密
key = Fernet.generate_key()
cipher_suite = Fernet(key)
encrypted_pwd = cipher_suite.encrypt(b"my_password")
# 解密
decrypted_pwd = cipher_suite.decrypt(encrypted_pwd).decode()
import smtplib
from email.mime.text import MIMEText


def send_email(subject, body, to):
    """Send a plain-text notification email through the SMTP relay."""
    message = MIMEText(body)
    message['Subject'] = subject
    message['From'] = 'export@example.com'
    message['To'] = to
    # The context manager closes the SMTP connection even if sending fails.
    with smtplib.SMTP('smtp.example.com') as server:
        server.send_message(message)