断点续迁mysql表.py

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/8/6 16:53
import pandas as pd
from sqlalchemy import create_engine, text
from loguru import logger
import os
import json
from typing import TextIO

# Configure loguru
logger.add("data_migration.log", rotation="500 MB", level="INFO")

# Path of the checkpoint file used to resume an interrupted migration
CHECKPOINT_FILE = "migration_checkpoint.json"


def save_checkpoint(last_id: int) -> None:
    """Save checkpoint information."""
    checkpoint_data = {
        "last_processed_id": last_id,
        "table_name": "whatnot_shop_record"
    }
    with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:  # type: TextIO
        json.dump(checkpoint_data, f)
    logger.info(f"Checkpoint saved: last_processed_id={last_id}")


def load_checkpoint() -> int:
    """Load checkpoint information; return 0 when no checkpoint exists."""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:  # type: TextIO
            checkpoint_data = json.load(f)
        logger.info(f"Checkpoint loaded: last_processed_id={checkpoint_data['last_processed_id']}")
        return checkpoint_data['last_processed_id']
    return 0


def clear_checkpoint() -> None:
    """Remove the checkpoint file."""
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
        logger.info("Checkpoint cleared")
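
# For reference, the checkpoint written by save_checkpoint() is a tiny JSON
# document; with a hypothetical last id of 120000 it would contain:
#   {"last_processed_id": 120000, "table_name": "whatnot_shop_record"}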


def migrate_with_resume() -> None:
    current_id: int = 0  # initialize up front so the except block can always reference it
    try:
        # Create the database connections
        source_engine = create_engine('mysql+pymysql://crawler:Pass2022@100.64.0.23/crawler')
        target_engine = create_engine('mysql+pymysql://crawler:Pass2022@100.64.0.21/crawler')
        # Load checkpoint information
        start_id = load_checkpoint()
        # Rows fetched per batch
        batch_size = 10000
        current_id = start_id
        total_migrated = 0
        logger.info(f"Resuming data migration from ID {start_id}...")
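        # (If the migration runs for hours, create_engine's standard
        # pool_pre_ping=True / pool_recycle=3600 arguments help avoid
        # "MySQL server has gone away" errors on stale connections.)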
        while True:
            # Read in batches, using the id column as a cursor instead of
            # OFFSET, which performs much better on large tables
            query = text("""
                SELECT * FROM whatnot_shop_record
                WHERE id > :current_id
                ORDER BY id
                LIMIT :batch_size
            """)
            logger.info(f"Reading rows with id > {current_id}, batch size: {batch_size}")
            df = pd.read_sql(query, source_engine,
                             params={"current_id": current_id, "batch_size": batch_size})
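            # (For contrast, the OFFSET-based alternative
            #    SELECT * FROM whatnot_shop_record ORDER BY id LIMIT n OFFSET m
            # makes MySQL walk past and discard all m skipped rows on every
            # batch, so reads get slower as the migration advances; the
            # id-cursor query above stays an index range scan per batch.)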
            # Exit the loop once there is no more data
            if df.empty:
                logger.info("All data has been read")
                break
            logger.info(f"Fetched {len(df)} rows")
            # Remember the largest id in this batch; it becomes the cursor
            # for the next read
            max_id_in_batch = int(df['id'].max())
            # Drop the id column
            if 'id' in df.columns:
                df = df.drop('id', axis=1)
                logger.info("Dropped the id column")
            # Write the batch to the target database
            logger.info("Writing batch to the target database...")
            df.to_sql('whatnot_shop_record', target_engine, if_exists='append', index=False)
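            # Note: to_sql() also accepts the standard pandas options
            # chunksize= (rows per round-trip) and method='multi' (multiple
            # rows per INSERT statement); the defaults are used here, but
            # they are worth tuning if writes become the bottleneck.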
            # Update the counters
            total_migrated += len(df)
            logger.success(f"Migrated {len(df)} rows")
            # Advance the cursor to this batch's largest id
            current_id = max_id_in_batch
            # Persist the checkpoint
            save_checkpoint(current_id)
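            # The checkpoint is saved only after to_sql() returns, so a crash
            # mid-batch re-reads the whole batch on the next run. If the
            # target insert was partially committed, that batch could be
            # duplicated on resume (the dropped id column leaves no natural
            # de-duplication key), so verify counts after an interrupted run.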
        logger.success(f"Migration finished, {total_migrated} rows migrated in total")
        # Remove the checkpoint file
        clear_checkpoint()
    except Exception as e:
        logger.error(f"Error during data migration: {e}")
        logger.info(f"Migration interrupted; the next run will resume from ID {current_id}")
        raise


# Run the migration
if __name__ == "__main__":
    migrate_with_resume()
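
# A quick post-run sanity check (a minimal sketch; it assumes the same table
# and credentials as above) is to compare row counts on both sides:
#
#   for name, url in [("source", 'mysql+pymysql://crawler:Pass2022@100.64.0.23/crawler'),
#                     ("target", 'mysql+pymysql://crawler:Pass2022@100.64.0.21/crawler')]:
#       with create_engine(url).connect() as conn:
#           n = conn.execute(text("SELECT COUNT(*) FROM whatnot_shop_record")).scalar()
#           logger.info(f"{name}: {n} rows")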