# -*- coding: utf-8 -*- # Author : Charley # Python : 3.12.10 # Date : 2026/5/18 15:17 """ Memory Lane 增量爬虫(日调度) 逻辑: 1. GET 首页解析当前网站全部 auction id 2. 查库 select distinct auction_id from memory_lane_record,得到已爬过的 auction 3. 差集 = 新增 auction 4. 没有新增 → 本轮无数据可抓,结束 5. 对每个新增 auction:postback 切换 → 翻页 → 写库 6. 补抓 state != 1 的详情页 """ import time import random import inspect import schedule from curl_cffi import requests from loguru import logger from tenacity import retry, stop_after_attempt, wait_fixed from mysql_pool import MySQLConnectionPool from meml_core import ( client_identifier_list, crawl_one_auction, get_auction_list, update_details_for_pending, after_log, ) logger.remove() logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day") def get_existing_auction_ids(log, sql_pool): """查库返回已爬过的 auction_id 集合""" rows = sql_pool.select_all( "select distinct auction_id from memory_lane_record where auction_id is not null" ) ids = {str(r[0]) for r in rows} if rows else set() log.info(f"库中已存在 {len(ids)} 个 auction_id: {sorted(ids)}") return ids def diff_new_auctions(log, all_auctions, existing_ids): """从首页解析的全部 auctions 中筛出库里没有的""" new_list = [a for a in all_auctions if a["id"] not in existing_ids] log.info(f"新增待抓取 auction 数: {len(new_list)} -> {[(a['id'], a['name']) for a in new_list]}") return new_list def run_incremental(log, sql_pool): """增量抓取主流程""" impersonate = random.choice(client_identifier_list) with requests.Session() as session: try: all_auctions = get_auction_list(log, session, impersonate) except Exception as e: log.error(f"获取拍卖会列表失败: {e}") return existing_ids = get_existing_auction_ids(log, sql_pool) new_auctions = diff_new_auctions(log, all_auctions, existing_ids) if not new_auctions: log.info("本轮无新增 auction,跳过 list 抓取") return for idx, auc in enumerate(new_auctions, 1): aid, name = auc["id"], auc["name"] log.info(f"========== [{idx}/{len(new_auctions)}] 开始抓 auction={aid} ({name}) ==========") try: crawl_one_auction(log, sql_pool, session, impersonate, auction_id=aid, auction_name=name) except Exception as e: log.error(f"auction={aid} 抓取异常: {e}") continue @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def meml_main(log): """日调度主函数:增量 list + 补详情""" log.info(f'开始运行 {inspect.currentframe().f_code.co_name} 增量爬虫任务 ...') sql_pool = MySQLConnectionPool(log=log) if not sql_pool: log.error("MySQL数据库连接失败") raise Exception("MySQL数据库连接失败") try: try: run_incremental(log, sql_pool) except Exception as e: log.error(f'增量抓取失败: {e}') try: update_details_for_pending(log, sql_pool) except Exception as e: log.error(f'详情补抓失败: {e}') except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} error: {e}') finally: log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮采集 ...') def schedule_task(): """每半个月跑一次增量""" meml_main(log=logger) def run_semimonthly(): # 每月 1 号和 15 号执行(半月一次) from datetime import date if date.today().day in (1, 15): meml_main(log=logger) schedule.every().day.at("05:00").do(run_semimonthly) while True: schedule.run_pending() time.sleep(1) if __name__ == "__main__": # meml_main(log=logger) schedule_task()