# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.12.10
# Date   : 2026/5/29 13:52
"""
Wheatland incremental crawler (scheduled).

Two-phase logic, mirroring the lelands crawler:
    1. GET the index page and parse every auction id currently listed.
    2. Query ``select distinct auction_id from wheatland_record`` to get the
       auctions that have already been crawled.
    3. Set difference = new auctions.
    4. No new auctions -> phase one has nothing to fetch and is skipped
       (phase two still runs whenever a DB pool is configured).
    5. Phase one (list): for each new auction, switch to it via postback,
       parse the whole result list and store it.
    6. Phase two (detail): scan the DB for records with ``state != 1`` and
       visit each LotDetail page to fetch the image set and write it back.

Target site: https://wheatlandauctionservices.com/auctionresults.aspx
"""
import time
import random
import inspect

import schedule
from curl_cffi import requests
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from wheatland_core import (
    client_identifier_list,
    crawl_one_auction,
    get_auction_list,
    update_details_for_pending,
    after_log,
)

# Replace loguru's default sink with a file sink that rotates at midnight
# and keeps 7 days of history.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding="utf-8", rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")


def get_existing_auction_ids(log, sql_pool):
    """Return the set of auction ids already stored in the DB.

    Args:
        log: logger object.
        sql_pool: MySQL connection pool. When None (first run / DB not wired
            up yet) an empty set is returned, so every auction counts as new.

    Returns:
        set[str]: already-crawled auction ids, normalised to ``str``.
    """
    if sql_pool is None:
        log.warning("sql_pool 为 None,视为库内无任何 auction(将全量重抓)")
        return set()
    rows = sql_pool.select_all(
        "select distinct auction_id from wheatland_record where auction_id is not null"
    )
    # NOTE(review): assumes select_all returns tuple-like rows where r[0] is
    # the auction_id column — confirm against the pool implementation.
    ids = {str(r[0]) for r in rows} if rows else set()
    log.info(f"库中已存在 {len(ids)} 个 auction_id: {sorted(ids)}")
    return ids


def diff_new_auctions(log, all_auctions, existing_ids):
    """Select the auctions from the index page that are not in the DB yet.

    Ids are compared as strings on both sides because
    ``get_existing_auction_ids`` normalises DB ids with ``str()``. Comparing a
    raw non-``str`` id against that set would never match and would silently
    re-crawl every auction. An id listed more than once on the page is
    returned only once (first occurrence wins; page order is kept).

    Args:
        log: logger object.
        all_auctions (list[dict] | None): every auction returned by
            get_auction_list; each dict must have ``"id"`` and ``"name"`` keys.
            None is treated as an empty list.
        existing_ids (set[str]): auction ids already stored in the DB.

    Returns:
        list[dict]: the new auctions to crawl, in page order.
    """
    seen = {str(i) for i in existing_ids or ()}
    new_list = []
    for auc in all_auctions or []:
        key = str(auc["id"])
        if key in seen:
            continue
        seen.add(key)
        new_list.append(auc)
    log.info(f"新增待抓取 auction 数: {len(new_list)} -> {[(a['id'], a['name']) for a in new_list]}")
    return new_list


def run_incremental(log, sql_pool):
    """Run phase one: crawl the list pages of every new auction.

    Args:
        log: logger object.
        sql_pool: MySQL connection pool. When None, nothing is persisted.
            Lots are collected in memory instead, and the first 3 are printed
            as a sample.
    """
    # Pick one impersonation profile per run and reuse it for every request
    # in the session.
    impersonate = random.choice(client_identifier_list)
    with requests.Session() as session:
        try:
            all_auctions = get_auction_list(log, session, impersonate)
        except Exception as e:
            log.error(f"获取 auction 列表失败: {e}")
            return

        existing_ids = get_existing_auction_ids(log, sql_pool)
        new_auctions = diff_new_auctions(log, all_auctions, existing_ids)
        if not new_auctions:
            log.info("本轮无新增 auction,跳过 list 抓取")
            return

        collected = []
        for idx, auc in enumerate(new_auctions, 1):
            aid, name = auc["id"], auc["name"]
            log.info(f"========== [{idx}/{len(new_auctions)}] 开始抓 auction={aid} ({name}) ==========")
            try:
                lots = crawl_one_auction(log, sql_pool, session, impersonate,
                                         auction_id=aid, auction_name=name)
                if sql_pool is None:
                    # Tolerate crawl_one_auction returning None (no lots).
                    collected.extend(lots or [])
            except Exception as e:
                # A single failing auction must not abort the remaining ones.
                log.error(f"auction={aid} 抓取异常: {e}")

        if sql_pool is None:
            log.info(f"增量抓取结束,共 {len(collected)} 条 lot(未入库)")
            for row in collected[:3]:
                print(row)


# NOTE(review): wld_main catches every Exception itself, so tenacity never
# observes a failure. This retry (up to 100 attempts, 1 h apart) therefore
# never actually retries; it is only a safety net.
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def wld_main(log):
    """Run one scheduled crawl: phase one (new auction lists), then phase two (details).

    Args:
        log: logger object.
    """
    log.info(f"开始运行 {inspect.currentframe().f_code.co_name} 增量爬虫任务 ...")

    # MySQL is not wired up yet; the owner will connect it. With
    # sql_pool=None, phase one does not persist anything (crawl_one_auction is
    # expected to skip DB writes) and phase two is skipped entirely.
    sql_pool = None
    try:
        # Phase one: crawl the list pages of new auctions and store them.
        try:
            run_incremental(log, sql_pool)
        except Exception as e:
            log.error(f"增量抓取失败: {e}")

        # Phase two: back-fill detail images for records with state != 1.
        # Only possible when a DB is connected.
        if sql_pool is not None:
            try:
                update_details_for_pending(log, sql_pool)
            except Exception as e:
                log.error(f"详情补抓失败: {e}")
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
    finally:
        log.info(f"爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮采集 ...")


def schedule_task():
    """Start the scheduler; this call blocks forever.

    Runs one crawl immediately. After that, a job fires every day at 05:00
    and runs the crawl only on the 1st and 15th of the month (twice a month).
    """
    wld_main(log=logger)

    def run_semimonthly():
        # Fire daily and filter by day of month to get a semi-monthly cadence.
        from datetime import date
        if date.today().day in (1, 15):
            wld_main(log=logger)

    schedule.every().day.at("05:00").do(run_semimonthly)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    # For a one-off test run, call the crawler directly instead:
    # wld_main(log=logger)
    # In production, run the scheduler:
    schedule_task()