```python
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/8/6 16:53
import json
import os

import pandas as pd
from loguru import logger
from sqlalchemy import create_engine, text

# Configure loguru: rotate the log file once it reaches 500 MB
logger.add("data_migration.log", rotation="500 MB", level="INFO")

# Path of the checkpoint file used for resumable migration
CHECKPOINT_FILE = "migration_checkpoint.json"


def save_checkpoint(last_id: int) -> None:
    """Persist the id of the last migrated row."""
    checkpoint_data = {
        "last_processed_id": last_id,
        "table_name": "whatnot_shop_record",
    }
    with open(CHECKPOINT_FILE, "w", encoding="utf-8") as f:
        json.dump(checkpoint_data, f)
    logger.info(f"Checkpoint saved: last_processed_id={last_id}")


def load_checkpoint() -> int:
    """Load the last checkpoint; return 0 if none exists."""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r", encoding="utf-8") as f:
            checkpoint_data = json.load(f)
        logger.info(f"Checkpoint loaded: last_processed_id={checkpoint_data['last_processed_id']}")
        return checkpoint_data["last_processed_id"]
    return 0


def clear_checkpoint() -> None:
    """Remove the checkpoint file after a successful run."""
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
        logger.info("Checkpoint cleared")


def migrate_with_resume() -> None:
    current_id: int = 0  # initialized up front so the except block can reference it
    try:
        # Create the source and target database connections
        source_engine = create_engine("mysql+pymysql://crawler:Pass2022@100.64.0.23/crawler")
        target_engine = create_engine("mysql+pymysql://crawler:Pass2022@100.64.0.21/crawler")

        # Resume from the last checkpoint (0 on a fresh run)
        start_id = load_checkpoint()
        batch_size = 10000
        current_id = start_id
        total_migrated = 0
        logger.info(f"Resuming migration from id {start_id}...")

        while True:
            # Read one batch, using the id as a keyset cursor instead of OFFSET,
            # which stays fast no matter how deep into the table we are.
            # text() with named binds works under both SQLAlchemy 1.4 and 2.x,
            # where a raw "%s" string with tuple params no longer executes.
            query = text(
                """
                SELECT * FROM whatnot_shop_record
                WHERE id > :current_id
                ORDER BY id
                LIMIT :batch_size
                """
            )
            logger.info(f"Reading rows with id > {current_id}, batch size: {batch_size}")
            df = pd.read_sql(
                query,
                source_engine,
                params={"current_id": current_id, "batch_size": batch_size},
            )

            # No more rows: migration is complete
            if df.empty:
                logger.info("All rows have been read")
                break
            logger.info(f"Read {len(df)} rows")

            # Remember the largest id in this batch; it is the cursor for the next query
            max_id_in_batch = int(df["id"].max())

            # Drop the id column so the target table assigns its own ids
            if "id" in df.columns:
                df = df.drop("id", axis=1)
                logger.info("Dropped the id column")

            # Append the batch to the target table
            logger.info("Writing batch to the target database...")
            df.to_sql("whatnot_shop_record", target_engine, if_exists="append", index=False)

            total_migrated += len(df)
            logger.success(f"Migrated {len(df)} rows")

            # Advance the cursor and persist it so an interrupted run can resume here
            current_id = max_id_in_batch
            save_checkpoint(current_id)

        logger.success(f"Migration finished, {total_migrated} rows migrated in total")
        clear_checkpoint()
    except Exception as e:
        logger.error(f"Error during migration: {e}")
        logger.info(f"Migration interrupted; the next run will resume from id {current_id}")
        raise


# Run the migration
if __name__ == "__main__":
    migrate_with_resume()
```
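The checkpoint file is what makes the resume work, so it is worth sanity-checking that round-trip without touching either database. A minimal sketch, assuming the script above is saved as `migrate_whatnot.py` (a hypothetical file name) so its helpers can be imported:

```python
# Exercise the checkpoint helpers in isolation, no database required.
from migrate_whatnot import save_checkpoint, load_checkpoint, clear_checkpoint

assert load_checkpoint() == 0      # no checkpoint file yet -> start from id 0
save_checkpoint(12345)             # pretend a batch ending at id 12345 was migrated
assert load_checkpoint() == 12345  # a restarted run would resume after id 12345
clear_checkpoint()                 # a successful run removes the file
assert load_checkpoint() == 0      # back to a fresh start
```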