lelands_spider.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.12.10
  4. # Date : 2026/5/13 15:54
  5. """
  6. Lelands 增量爬虫(日调度)
  7. 逻辑:
  8. 1. GET 首页解析当前网站全部 auction id
  9. 2. 查库 select distinct auction_id from lelands_record,得到已爬过的 auction
  10. 3. 差集 = 新增 auction
  11. 4. 没有新增 → 本轮无数据可抓,结束
  12. 5. 对每个新增 auction:postback 切换 → 翻页 → 写库
  13. 6. 补抓 state != 1 的详情页
  14. """
  15. import time
  16. import random
  17. import inspect
  18. import schedule
  19. from curl_cffi import requests
  20. from loguru import logger
  21. from tenacity import retry, stop_after_attempt, wait_fixed
  22. from mysql_pool import MySQLConnectionPool
  23. from lelands_core import (
  24. client_identifier_list,
  25. crawl_one_auction,
  26. get_auction_list,
  27. update_details_for_pending,
  28. after_log,
  29. )
  30. logger.remove()
  31. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  32. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  33. level="DEBUG", retention="7 day")
  34. def get_existing_auction_ids(log, sql_pool):
  35. """查库返回已爬过的 auction_id 集合"""
  36. rows = sql_pool.select_all(
  37. "select distinct auction_id from lelands_record where auction_id is not null"
  38. )
  39. ids = {str(r[0]) for r in rows} if rows else set()
  40. log.info(f"库中已存在 {len(ids)} 个 auction_id: {sorted(ids)}")
  41. return ids
  42. def diff_new_auctions(log, all_auctions, existing_ids):
  43. """从首页解析的全部 auctions 中筛出库里没有的"""
  44. new_list = [a for a in all_auctions if a["id"] not in existing_ids]
  45. log.info(f"新增待抓取 auction 数: {len(new_list)} -> {[(a['id'], a['name']) for a in new_list]}")
  46. return new_list
  47. def run_incremental(log, sql_pool):
  48. """增量抓取主流程"""
  49. impersonate = random.choice(client_identifier_list)
  50. with requests.Session() as session:
  51. try:
  52. all_auctions = get_auction_list(log, session, impersonate)
  53. except Exception as e:
  54. log.error(f"获取拍卖会列表失败: {e}")
  55. return
  56. existing_ids = get_existing_auction_ids(log, sql_pool)
  57. new_auctions = diff_new_auctions(log, all_auctions, existing_ids)
  58. if not new_auctions:
  59. log.info("本轮无新增 auction,跳过 list 抓取")
  60. return
  61. for idx, auc in enumerate(new_auctions, 1):
  62. aid, name = auc["id"], auc["name"]
  63. log.info(f"========== [{idx}/{len(new_auctions)}] 开始抓 auction={aid} ({name}) ==========")
  64. try:
  65. crawl_one_auction(log, sql_pool, session, impersonate,
  66. auction_id=aid, auction_name=name)
  67. except Exception as e:
  68. log.error(f"auction={aid} 抓取异常: {e}")
  69. continue
  70. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  71. def lds_main(log):
  72. """日调度主函数:增量 list + 补详情"""
  73. log.info(f'开始运行 {inspect.currentframe().f_code.co_name} 增量爬虫任务 ...')
  74. sql_pool = MySQLConnectionPool(log=log)
  75. if not sql_pool:
  76. log.error("MySQL数据库连接失败")
  77. raise Exception("MySQL数据库连接失败")
  78. try:
  79. try:
  80. run_incremental(log, sql_pool)
  81. except Exception as e:
  82. log.error(f'增量抓取失败: {e}')
  83. try:
  84. update_details_for_pending(log, sql_pool)
  85. except Exception as e:
  86. log.error(f'详情补抓失败: {e}')
  87. except Exception as e:
  88. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  89. finally:
  90. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮采集 ...')
  91. def schedule_task():
  92. """每半个月 跑一次增量"""
  93. lds_main(log=logger)
  94. def run_semimonthly():
  95. # 每月 1 号和 15 号执行(半月一次)
  96. from datetime import date
  97. if date.today().day in (1, 15):
  98. lds_main(log=logger)
  99. schedule.every().day.at("05:00").do(run_semimonthly)
  100. while True:
  101. schedule.run_pending()
  102. time.sleep(1)
  103. if __name__ == '__main__':
  104. # lds_main(log=logger)
  105. schedule_task()