# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/8/6 16:53

import json
import os
from typing import TextIO

import pandas as pd
from loguru import logger
from sqlalchemy import create_engine, text

# Configure loguru: rotate the log file once it reaches 500 MB
logger.add("data_migration.log", rotation="500 MB", level="INFO")

# Path of the checkpoint file used for resumable migration
CHECKPOINT_FILE = "migration_checkpoint.json"


def save_checkpoint(last_id: int) -> None:
    """Persist the resume checkpoint."""
    checkpoint_data = {
        "last_processed_id": last_id,
        "table_name": "whatnot_shop_record",
    }
    with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:  # type: TextIO
        json.dump(checkpoint_data, f)
    logger.info(f"Checkpoint saved: last_processed_id={last_id}")


def load_checkpoint() -> int:
    """Load the resume checkpoint; return 0 if none exists."""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:  # type: TextIO
            checkpoint_data = json.load(f)
        logger.info(f"Checkpoint loaded: last_processed_id={checkpoint_data['last_processed_id']}")
        return checkpoint_data['last_processed_id']
    return 0


def clear_checkpoint() -> None:
    """Remove the checkpoint file after a successful migration."""
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
        logger.info("Checkpoint cleared")


def migrate_with_resume() -> None:
    current_id: int = 0  # initialise up front so the except block can always reference it
    try:
        # Create the database connections
        source_engine = create_engine('mysql+pymysql://crawler:Pass2022@100.64.0.23/crawler')
        target_engine = create_engine('mysql+pymysql://crawler:Pass2022@100.64.0.21/crawler')

        # Resume from the last checkpoint (0 on a fresh run)
        start_id = load_checkpoint()

        # Rows fetched per round trip
        batch_size = 10000
        current_id = start_id
        total_migrated = 0

        logger.info(f"Resuming migration from ID {start_id} ...")

        while True:
            # Read in batches, using the ID as a cursor rather than OFFSET,
            # which stays fast on large tables. Named bind parameters via
            # sqlalchemy.text() work consistently across pandas/SQLAlchemy versions.
            query = text("""
                SELECT * FROM whatnot_shop_record
                WHERE id > :last_id
                ORDER BY id
                LIMIT :batch_size
            """)

            logger.info(f"Reading rows with ID > {current_id}, batch size: {batch_size}")
            df = pd.read_sql(query, source_engine,
                             params={"last_id": current_id, "batch_size": batch_size})

            # No more rows: the migration is complete
            if df.empty:
                logger.info("All rows have been read")
                break

            logger.info(f"Fetched {len(df)} rows")

            # Remember the largest ID in this batch; it becomes the next cursor position
            max_id_in_batch = int(df['id'].max())

            # Drop the id column so the target table can assign its own keys
            if 'id' in df.columns:
                df = df.drop('id', axis=1)
                logger.info("Dropped the id column")

            # Write the batch to the target database
            logger.info("Writing batch to the target database ...")
            df.to_sql('whatnot_shop_record', target_engine,
                      if_exists='append', index=False)

            # Update the counter
            total_migrated += len(df)
            logger.success(f"Migrated {len(df)} rows")

            # Advance the cursor to this batch's max ID and persist the checkpoint
            current_id = max_id_in_batch
            save_checkpoint(current_id)

        logger.success(f"Migration finished, {total_migrated} rows migrated in total")

        # Remove the checkpoint file now that everything succeeded
        clear_checkpoint()

    except Exception as e:
        logger.error(f"Error during migration: {e}")
        logger.info(f"Migration interrupted; the next run will resume from ID {current_id}")
        raise


# Run the migration
if __name__ == "__main__":
    migrate_with_resume()
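
# --- Optional post-migration sanity check: a minimal sketch, not part of the
# original script. It assumes the same source engine/credentials as above and
# the helper name `verify_row_counts` is hypothetical. The idea is to compare
# the source table's row count against the total the migration reported.

def verify_row_counts(expected_total: int) -> bool:
    """Compare the source table's row count with the migrated total (illustrative)."""
    source_engine = create_engine('mysql+pymysql://crawler:Pass2022@100.64.0.23/crawler')
    with source_engine.connect() as conn:
        source_count = conn.execute(
            text("SELECT COUNT(*) FROM whatnot_shop_record")
        ).scalar_one()
    if source_count == expected_total:
        logger.success(f"Row counts match: {source_count}")
        return True
    logger.warning(f"Row count mismatch: source={source_count}, migrated={expected_total}")
    return False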