wheatland_spider.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.12.10
  4. # Date : 2026/5/29 13:52
  5. """
  6. Wheatland 增量爬虫(日调度)
  7. 逻辑(两阶段,与 lelands 一致):
  8. 1. GET 首页解析当前全部 auction id
  9. 2. 查库 select distinct auction_id from wheatland_record,得到已爬过的 auction
  10. 3. 差集 = 新增 auction
  11. 4. 没有新增 → 本轮无数据可抓,结束
  12. 5. 阶段一 列表:对每个新增 auction postback 切换 → 解析整张列表入库
  13. 6. 阶段二 详情:扫库 state != 1 的记录 → 逐条进 LotDetail 抓多图写回
  14. 目标网站: https://wheatlandauctionservices.com/auctionresults.aspx
  15. """
  16. import time
  17. import random
  18. import inspect
  19. import schedule
  20. from curl_cffi import requests
  21. from loguru import logger
  22. from tenacity import retry, stop_after_attempt, wait_fixed
  23. from wheatland_core import (
  24. client_identifier_list,
  25. crawl_one_auction,
  26. get_auction_list,
  27. update_details_for_pending,
  28. after_log,
  29. )
  30. logger.remove()
  31. logger.add("./logs/{time:YYYYMMDD}.log", encoding="utf-8", rotation="00:00",
  32. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  33. level="DEBUG", retention="7 day")
  34. def get_existing_auction_ids(log, sql_pool):
  35. """查库返回已爬过的 auction_id 集合。
  36. Args:
  37. log: logger 对象。
  38. sql_pool: MySQL 连接池;为 None 时返回空集合(首次跑)。
  39. Returns:
  40. set[str]: 已存在的 auction_id 字符串集合。
  41. """
  42. if sql_pool is None:
  43. log.warning("sql_pool 为 None,视为库内无任何 auction(将全量重抓)")
  44. return set()
  45. rows = sql_pool.select_all(
  46. "select distinct auction_id from wheatland_record where auction_id is not null"
  47. )
  48. ids = {str(r[0]) for r in rows} if rows else set()
  49. log.info(f"库中已存在 {len(ids)} 个 auction_id: {sorted(ids)}")
  50. return ids
  51. def diff_new_auctions(log, all_auctions, existing_ids):
  52. """从首页解析的全部 auctions 中筛出库里没有的。
  53. Args:
  54. log: logger 对象。
  55. all_auctions (list[dict]): get_auction_list 返回的全部 auction。
  56. existing_ids (set[str]): 已存在的 auction_id 集合。
  57. Returns:
  58. list[dict]: 待抓取的新 auction 列表。
  59. """
  60. new_list = [a for a in all_auctions if a["id"] not in existing_ids]
  61. log.info(f"新增待抓取 auction 数: {len(new_list)} -> {[(a['id'], a['name']) for a in new_list]}")
  62. return new_list
  63. def run_incremental(log, sql_pool):
  64. """增量抓取主流程。
  65. Args:
  66. log: logger 对象。
  67. sql_pool: MySQL 连接池;为 None 时不入库,仅在内存收集 print 样本。
  68. """
  69. impersonate = random.choice(client_identifier_list)
  70. with requests.Session() as session:
  71. try:
  72. all_auctions = get_auction_list(log, session, impersonate)
  73. except Exception as e:
  74. log.error(f"获取 auction 列表失败: {e}")
  75. return
  76. existing_ids = get_existing_auction_ids(log, sql_pool)
  77. new_auctions = diff_new_auctions(log, all_auctions, existing_ids)
  78. if not new_auctions:
  79. log.info("本轮无新增 auction,跳过 list 抓取")
  80. return
  81. collected = []
  82. for idx, auc in enumerate(new_auctions, 1):
  83. aid, name = auc["id"], auc["name"]
  84. log.info(f"========== [{idx}/{len(new_auctions)}] 开始抓 auction={aid} ({name}) ==========")
  85. try:
  86. lots = crawl_one_auction(log, sql_pool, session, impersonate,
  87. auction_id=aid, auction_name=name)
  88. if sql_pool is None:
  89. collected.extend(lots)
  90. except Exception as e:
  91. log.error(f"auction={aid} 抓取异常: {e}")
  92. continue
  93. if sql_pool is None:
  94. log.info(f"增量抓取结束,共 {len(collected)} 条 lot(未入库)")
  95. for row in collected[:3]:
  96. print(row)
  97. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  98. def wld_main(log):
  99. """日调度主函数:增量 list + 补详情。
  100. Args:
  101. log: logger 对象。
  102. """
  103. log.info(f"开始运行 {inspect.currentframe().f_code.co_name} 增量爬虫任务 ...")
  104. # 主公自己接 MySQL;暂时传 None,crawl_one_auction 会跳过入库
  105. sql_pool = None
  106. try:
  107. # 阶段一:抓新增 auction 的列表入库
  108. try:
  109. run_incremental(log, sql_pool)
  110. except Exception as e:
  111. log.error(f"增量抓取失败: {e}")
  112. # 阶段二:扫库 state != 1 的记录补抓详情多图(未接库时跳过)
  113. if sql_pool is not None:
  114. try:
  115. update_details_for_pending(log, sql_pool)
  116. except Exception as e:
  117. log.error(f"详情补抓失败: {e}")
  118. except Exception as e:
  119. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  120. finally:
  121. log.info(f"爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮采集 ...")
  122. def schedule_task():
  123. """启动调度器:先即时跑一次,之后每月 1 号和 15 号 05:00 跑一次(半月一次)。"""
  124. wld_main(log=logger)
  125. def run_semimonthly():
  126. from datetime import date
  127. if date.today().day in (1, 15):
  128. wld_main(log=logger)
  129. schedule.every().day.at("05:00").do(run_semimonthly)
  130. while True:
  131. schedule.run_pending()
  132. time.sleep(1)
  133. if __name__ == "__main__":
  134. # 测试时直接跑一次
  135. # wld_main(log=logger)
  136. # 上生产再切回 schedule
  137. schedule_task()