|
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.12.10
- # Date : 2026/5/28
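- """
- 90sAuctions incremental crawler (semimonthly schedule)
- Logic:
- 1. GET the home page and parse every auction id currently listed on the site
- 2. Query the database (select distinct auction_id from auctions90s_record) to get the auctions already crawled
- 3. Set difference = new auctions
- 4. No new auctions → skip the list crawl for this round
- 5. For each new auction: switch via postback → paginate → write to the database
- 6. Backfill detail pages whose state != 1 (runs every round, whether or not there were new auctions)
- """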
- import time
- import random
- import inspect
- import schedule
- from curl_cffi import requests
- from loguru import logger
- from tenacity import retry, stop_after_attempt, wait_fixed
- from mysql_pool import MySQLConnectionPool
- from auctions90s_core import (
-     TABLE_NAME,
-     client_identifier_list,
-     crawl_one_auction,
-     get_auction_list,
-     update_details_for_pending,
-     after_log,
- )
- logger.remove()
- logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
-            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
-            level="DEBUG", retention="7 day")
- def get_existing_auction_ids(log, sql_pool):
-     """
-     Query the database for the set of auction_ids that have already been crawled.
-     :param log: (loguru.Logger) logger object
-     :param sql_pool: (MySQLConnectionPool) database connection pool
-     :return: (set[str]) existing auction_ids, as strings so they line up with the values parsed from the home page
-     """
-     rows = sql_pool.select_all(
-         f"select distinct auction_id from {TABLE_NAME} where auction_id is not null"
-     )
-     ids = {str(r[0]) for r in rows} if rows else set()
-     log.info(f"{len(ids)} auction_id(s) already in the database: {sorted(ids)}")
-     return ids
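- def diff_new_auctions(log, all_auctions, existing_ids):
-     """
-     Filter the auctions parsed from the home page down to those not yet in the database.
-     :param log: (loguru.Logger) logger object
-     :param all_auctions: (list[dict]) full auction list returned by get_auction_list
-     :param existing_ids: (set[str]) auction_ids that already exist in the database
-     :return: (list[dict]) new auctions that still need to be crawled
-     """
-     # Compare as strings: existing_ids holds str values, so guard against a parsed id that is not already a str.
-     new_list = [a for a in all_auctions if str(a["id"]) not in existing_ids]
-     log.info(f"New auctions to crawl: {len(new_list)} -> {[(a['id'], a['name']) for a in new_list]}")
-     return new_list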
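The set-difference step can be tried in isolation without a database or network. The following is a minimal sketch, not the crawler's own code: `find_new_auctions` and the sample data are hypothetical and only illustrate why both sides are compared as strings.

```python
def find_new_auctions(all_auctions, existing_ids):
    """Return the auctions whose id, compared as a string, is not in existing_ids."""
    known = {str(i) for i in existing_ids}
    return [a for a in all_auctions if str(a["id"]) not in known]


# Hypothetical sample data, for illustration only.
site_auctions = [
    {"id": 101, "name": "Spring"},    # int id, already stored as "101"
    {"id": "102", "name": "Summer"},  # not stored yet
    {"id": 103, "name": "Fall"},      # int id, already stored as "103"
]
stored_ids = {"101", "103"}

new_auctions = find_new_auctions(site_auctions, stored_ids)
print([a["name"] for a in new_auctions])
```

Without the `str()` normalization, the integer ids 101 and 103 would not match their stored string forms and would be crawled again.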
- def run_incremental(log, sql_pool):
-     """
-     Main incremental flow: fetch the home page → set difference → crawl each new auction in turn.
-     :param log: (loguru.Logger) logger object
-     :param sql_pool: (MySQLConnectionPool) database connection pool
-     :return: None
-     """
-     impersonate = random.choice(client_identifier_list)
-     with requests.Session() as session:
-         try:
-             all_auctions = get_auction_list(log, session, impersonate)
-         except Exception as e:
-             log.error(f"Failed to fetch the auction list: {e}")
-             return
-         existing_ids = get_existing_auction_ids(log, sql_pool)
-         new_auctions = diff_new_auctions(log, all_auctions, existing_ids)
-         if not new_auctions:
-             log.info("No new auctions this round; skipping the list crawl")
-             return
-         for idx, auc in enumerate(new_auctions, 1):
-             aid, name = auc["id"], auc["name"]
-             log.info(f"========== [{idx}/{len(new_auctions)}] start crawling auction={aid} ({name}) ==========")
-             try:
-                 crawl_one_auction(log, sql_pool, session, impersonate,
-                                   auction_id=aid, auction_name=name)
-             except Exception as e:
-                 # One failed auction must not stop the rest of the round.
-                 log.error(f"auction={aid} crawl failed: {e}")
-                 continue
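- @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
- def nineties_main(log):
-     """
-     Scheduled entry point: incremental list crawl, then detail backfill.
-     Crawl errors are logged and swallowed below; only a MySQL connection failure propagates,
-     in which case tenacity retries the whole run once an hour (at most 100 attempts).
-     :param log: (loguru.Logger) logger object
-     :return: None
-     :raises Exception: raised when the MySQL connection fails; tenacity then retries the whole run
-     """
-     log.info(f'Starting incremental crawl task {inspect.currentframe().f_code.co_name} ...')
-     sql_pool = MySQLConnectionPool(log=log)
-     if not sql_pool:
-         log.error("MySQL connection failed")
-         raise Exception("MySQL connection failed")
-     try:
-         try:
-             run_incremental(log, sql_pool)
-         except Exception as e:
-             log.error(f'Incremental crawl failed: {e}')
-         # The detail backfill runs even if the incremental crawl failed.
-         try:
-             update_details_for_pending(log, sql_pool)
-         except Exception as e:
-             log.error(f'Detail backfill failed: {e}')
-     except Exception as e:
-         log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
-     finally:
-         log.info(f'Crawler {inspect.currentframe().f_code.co_name} finished; waiting for the next round ...')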
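The retry behaviour configured on `nineties_main` (a fixed wait between attempts, giving up after a maximum number of attempts) can be illustrated with a plain loop. This is a sketch of the idea using only the standard library, not tenacity's implementation: `retry_fixed`, `flaky_connect`, and the injectable `sleep` parameter are hypothetical names introduced here so the example runs instantly.

```python
import time


def retry_fixed(func, attempts, wait_seconds, sleep=time.sleep):
    """Call func until it succeeds or attempts (>= 1) run out, waiting a fixed interval between tries."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception as exc:
            last_error = exc
            if attempt < attempts:
                sleep(wait_seconds)  # no wait after the final attempt
    raise last_error


# Hypothetical stand-in for a database connection that fails twice, then succeeds.
calls = {"n": 0}


def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("database unavailable")
    return "connected"


waits = []  # record the requested waits instead of actually sleeping
result = retry_fixed(flaky_connect, attempts=100, wait_seconds=3600, sleep=waits.append)
print(result, calls["n"], waits)
```

Only exceptions that escape the wrapped function trigger another attempt, which is why errors caught and logged inside the function never cause a retry.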
- def schedule_task():
-     """
-     Start the semimonthly schedule: run once at startup, then at 05:00 on the 1st and 15th of every month.
-     :return: None (never returns; loops forever)
-     """
-     nineties_main(log=logger)
-     def run_semimonthly():
-         # Triggered daily, but only does work on the 1st and 15th (twice a month).
-         from datetime import date
-         if date.today().day in (1, 15):
-             nineties_main(log=logger)
-     schedule.every().day.at("05:00").do(run_semimonthly)
-     while True:
-         schedule.run_pending()
-         time.sleep(1)
- if __name__ == '__main__':
-     # nineties_main(log=logger)
-     schedule_task()
|
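The semimonthly rule in `schedule_task` is a daily trigger plus a day-of-month check. Pulling that check into a pure function makes it easy to verify without waiting for the calendar. The sketch below assumes hypothetical names (`RUN_DAYS`, `is_run_day`, `run_days_in_year`) that are not part of the script.

```python
from datetime import date, timedelta

RUN_DAYS = (1, 15)  # days of the month on which the crawl should run


def is_run_day(today, run_days=RUN_DAYS):
    """Return True if the given date falls on one of the configured days of the month."""
    return today.day in run_days


def run_days_in_year(year, run_days=RUN_DAYS):
    """List every date in the given year on which the daily trigger would do work."""
    current = date(year, 1, 1)
    matches = []
    while current.year == year:
        if is_run_day(current, run_days):
            matches.append(current)
        current += timedelta(days=1)
    return matches


print(is_run_day(date(2026, 6, 1)), is_run_day(date(2026, 5, 28)))
print(len(run_days_in_year(2026)))
```

Because both run days exist in every month, the rule yields two runs per month. A day such as the 31st would silently skip shorter months under the same check.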