断点续迁pgsql表.py 3.7 KB

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/8/7 14:34
import pandas as pd
from sqlalchemy import create_engine
from loguru import logger
import os
import json

# Path of the checkpoint file used to resume an interrupted migration
CHECKPOINT_FILE = "migration_checkpoint_pgsql.json"


def save_checkpoint(last_id: int) -> None:
    """Persist the checkpoint (the last successfully migrated id)."""
    checkpoint_data = {
        "last_processed_id": last_id,
        "table_name": "whatnot_shop_record"
    }
    # Fix for a type-annotation issue: the TextIO declaration was removed
    with open(CHECKPOINT_FILE, 'w', encoding='utf-8') as f:
        json.dump(checkpoint_data, f)
    logger.info(f"Checkpoint saved: last_processed_id={last_id}")
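

# Optional hardening (a sketch, not part of the original flow): write the
# checkpoint atomically so a crash mid-write cannot leave a truncated JSON
# file behind. save_checkpoint_atomic is a hypothetical name; os.replace
# swaps the file into place atomically on the same filesystem.
def save_checkpoint_atomic(last_id: int) -> None:
    tmp_path = CHECKPOINT_FILE + ".tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump({"last_processed_id": last_id,
                   "table_name": "whatnot_shop_record"}, f)
    os.replace(tmp_path, CHECKPOINT_FILE)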


def load_checkpoint() -> int:
    """Load the checkpoint; return 0 when there is nothing to resume."""
    if os.path.exists(CHECKPOINT_FILE):
        # Fix for a type-annotation issue: the TextIO declaration was removed
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
            checkpoint_data = json.load(f)
        logger.info(f"Checkpoint loaded: last_processed_id={checkpoint_data['last_processed_id']}")
        return checkpoint_data['last_processed_id']
    return 0
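

# load_checkpoint() never verifies the table_name field that save_checkpoint
# stores. A minimal guard (a sketch; the key names match the ones written
# above) would be:
#
#   if checkpoint_data.get("table_name") != "whatnot_shop_record":
#       logger.warning("Checkpoint belongs to a different table, ignoring it")
#       return 0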


def clear_checkpoint() -> None:
    """Remove the checkpoint file after a successful run."""
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
        logger.info("Checkpoint cleared")


def migrate_with_resume() -> None:
    current_id: int = 0  # initialize up front so the except block can always reference it
    try:
        # Create the database connections (PostgreSQL)
        source_engine = create_engine('postgresql://username:password@source_host:5432/source_db')
        target_engine = create_engine('postgresql://username:password@target_host:5432/target_db')
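        # The hard-coded DSNs above are placeholders. A common alternative
        # (a sketch; the variable names SOURCE_DSN / TARGET_DSN are
        # assumptions) is to read them from the environment instead:
        #   source_engine = create_engine(os.environ["SOURCE_DSN"])
        #   target_engine = create_engine(os.environ["TARGET_DSN"])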
        # Load the checkpoint
        start_id = load_checkpoint()

        # Batch size
        batch_size = 10000
        current_id = start_id
        total_migrated = 0
        logger.info(f"Resuming migration from id {start_id}...")

        while True:
            # Read in batches, using the id as a keyset cursor instead of
            # OFFSET for better performance.
            # The PostgreSQL driver uses %s as its parameter placeholder.
            query = """
                SELECT * FROM whatnot_shop_record
                WHERE id > %s
                ORDER BY id
                LIMIT %s
            """
            logger.info(f"Reading rows with id > {current_id}, batch size: {batch_size}")
            df = pd.read_sql(query, source_engine, params=(current_id, batch_size))
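            # If the installed pandas/SQLAlchemy combination rejects raw %s
            # placeholders, a named-parameter variant via sqlalchemy.text()
            # is a common workaround (a sketch; needs
            # `from sqlalchemy import text`):
            #   df = pd.read_sql(
            #       text("SELECT * FROM whatnot_shop_record "
            #            "WHERE id > :last_id ORDER BY id LIMIT :batch"),
            #       source_engine,
            #       params={"last_id": current_id, "batch": batch_size})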
            # No more rows: the migration is complete
            if df.empty:
                logger.info("All rows have been read")
                break
            logger.info(f"Fetched {len(df)} rows")

            # Remember the largest id in this batch; it becomes the cursor
            # for the next query
            max_id_in_batch = int(df['id'].max())

            # # Drop the id column
            # if 'id' in df.columns:
            #     df = df.drop('id', axis=1)
            #     logger.info("Dropped the id column")

            # Write to the target database
            logger.info("Writing to the target database...")
            df.to_sql('whatnot_shop_record', target_engine, if_exists='append', index=False)
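            # With some drivers to_sql's default row-by-row inserts can be
            # slow for large batches; pandas' chunksize and method='multi'
            # options (the latter builds multi-row INSERT statements) can
            # help. A sketch:
            #   df.to_sql('whatnot_shop_record', target_engine,
            #             if_exists='append', index=False,
            #             method='multi', chunksize=1000)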
            # Update the counters
            total_migrated += len(df)
            logger.success(f"Migrated {len(df)} rows")

            # Advance the cursor to this batch's maximum id
            current_id = max_id_in_batch

            # Persist the checkpoint
            save_checkpoint(current_id)
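            # Note: each batch is written before its checkpoint is saved, so
            # a crash between the two steps re-migrates that batch on resume
            # and can duplicate rows. If that matters, dedupe on a unique key
            # in the target table or wrap the write and the checkpoint update
            # in a single transaction.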
        logger.success(f"Migration finished, {total_migrated} rows migrated in total")
        # Remove the checkpoint file
        clear_checkpoint()
    except Exception as e:
        logger.error(f"Error during migration: {e}")
        logger.info(f"Migration interrupted; the next run will resume from id {current_id}")
        raise


# Run the migration
if __name__ == "__main__":
    migrate_with_resume()