Prechádzať zdrojové kódy

feat(memory_lane_spider): 实现 Memory Lane 拍卖数据增量爬虫框架

- 新增 meml_core.py,封装 HTTP 配置、session 管理、拍卖列表及详情解析
- 重构 meml_spdier.py,调整增量逻辑,支持差量拍卖会抓取及详情补抓
- 优化 mysql_pool.py,完善数据库操作断连重试及重复键处理
- 增加 YamlLoader.py 路径解析功能,支持按启动脚本目录查找配置文件
- 新增 requirements.txt,统一依赖版本管理
- 代码整体提升健壮性及日志规范,支持定时半月增量调度与异常重试机制
charley 1 týždeň pred
rodič
commit
7dacbc4d2d

+ 45 - 25
memory_lane_spider/YamlLoader.py

@@ -11,10 +11,10 @@ regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
 class YamlConfig:
     def __init__(self, config):
         self.config = config
-    
+
     def get(self, key: str):
         return YamlConfig(self.config.get(key))
-    
+
     def getValueAsString(self, key: str):
         try:
             match = regex.match(self.config[key])
@@ -25,7 +25,7 @@ class YamlConfig:
             return None
         except:
             return self.config[key]
-    
+
     def getValueAsInt(self, key: str):
         try:
             match = regex.match(self.config[key])
@@ -36,7 +36,7 @@ class YamlConfig:
             return 0
         except:
             return int(self.config[key])
-    
+
     def getValueAsBool(self, key: str):
         try:
             match = regex.match(self.config[key])
@@ -49,30 +49,50 @@ class YamlConfig:
             return bool(self.config[key])
 
 
-def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+def _resolve_path(path: str) -> str:
+    """
+    解析 yaml 文件路径,按优先级查找:
+      1) 绝对路径或 cwd 下存在 → 直接用(保留旧行为,向后兼容)
+      2) 调用方主脚本所在目录 → 兜底,方便打包后从任意 cwd 启动
+    :param path: (str) 用户传入的路径,默认 'application.yml'
+    :return: (str) 实际可读取的完整路径;找不到则返回原 path 让 open() 抛错
+    """
+    # 1) 旧行为:cwd 或绝对路径
     if os.path.exists(path):
-        with open(path) as fd:
-            conf = yaml.load(fd, Loader=yaml.FullLoader)
-    
+        return path
+
+    # 2) 主脚本目录(__main__.__file__)
+    try:
+        import __main__
+        main_file = getattr(__main__, '__file__', None)
+        if main_file:
+            candidate = os.path.join(os.path.dirname(os.path.abspath(main_file)), path)
+            if os.path.exists(candidate):
+                return candidate
+    except Exception:
+        pass
+
+    return path
+
+
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    """
+    读取 yaml 配置。
+    :param path: (str) yaml 文件路径,默认 'application.yml'。
+                       优先 cwd / 绝对路径(保留旧行为),找不到再 fallback 到主脚本所在目录。
+    :param profile: (str) 可选环境后缀,如 'dev' 会额外加载 'application-dev.yml' 并 update
+    :return: (YamlConfig) 配置访问对象
+    :raises FileNotFoundError: cwd 和主脚本目录都找不到时抛出
+    """
+    real_path = _resolve_path(path)
+    with open(real_path, encoding='utf-8') as fd:
+        conf = yaml.load(fd, Loader=yaml.FullLoader)
+
     if profile is not None:
-        result = path.split('.')
+        result = real_path.rsplit('.', 1)
         profiledYaml = f'{result[0]}-{profile}.{result[1]}'
         if os.path.exists(profiledYaml):
-            with open(profiledYaml) as fd:
+            with open(profiledYaml, encoding='utf-8') as fd:
                 conf.update(yaml.load(fd, Loader=yaml.FullLoader))
-    
-    return YamlConfig(conf)
-
-# res = readYaml()
-# mysqlConf = res.get('mysql')
-# print(mysqlConf)
 
-# print(res.getValueAsString("host"))
-# mysqlYaml = mysqlConf.getValueAsString("host")
-# print(mysqlYaml)
-# host = mysqlYaml.get("host").split(':')[-1][:-1]
-# port = mysqlYaml.get("port").split(':')[-1][:-1]
-# username = mysqlYaml.get("username").split(':')[-1][:-1]
-# password = mysqlYaml.get("password").split(':')[-1][:-1]
-# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
-# print(host,port,username,password)
+    return YamlConfig(conf)

+ 298 - 0
memory_lane_spider/meml_core.py

@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.12.10
+# Date   : 2026/5/21
+"""
+Memory Lane 公用模块:HTTP 配置、ASP.NET postback 切换 auction、单页解析、详情解析。
+被 meml_history.py / meml_spdier.py 复用。
+
+目标网站: https://bid.memorylaneinc.com/lots/gallery
+"""
+import random
+from curl_cffi import requests
+import user_agent
+from loguru import logger
+from parsel import Selector
+from curl_cffi.requests import BrowserType
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+GALLERY_URL = "https://bid.memorylaneinc.com/lots/gallery"
+
+# 直接用库内置的所有浏览器指纹
+client_identifier_list = [b.value for b in BrowserType]
+
+headers = {
+    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
+    "user-agent": user_agent.generate_user_agent()
+}
+
+
+def after_log(retry_state):
+    """tenacity retry 回调"""
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]
+    else:
+        log = logger
+
+    if retry_state.outcome.failed:
+        log.warning(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
+def get_proxys(log):
+    """
+    获取代理
+    :param log: logger 对象
+    :return: 代理字典
+    """
+    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
+    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
+    try:
+        return {"http": http_proxy, "https": https_proxy}
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+def _pick_hidden(selector, field_id):
+    """
+    从页面提取 ASP.NET 隐藏字段(__VIEWSTATE 等)
+    :param selector: parsel.Selector 对象
+    :param field_id: 隐藏字段的 id,如 __VIEWSTATE
+    :return: 隐藏字段的值,失败返回空字符串
+    """
+    return selector.xpath(f'//input[@id="{field_id}"]/@value').get() or ''
+
+
+def parse_auction_list(selector):
+    """
+    从 gallery 页面解析所有拍卖会下拉项
+    :param selector: parsel.Selector 对象
+    :return: [{"id": "-1", "name": "All Auctions"}, {"id": "162", "name": "Spring Rarities 2026 Auction"}, ...]
+    """
+    options = selector.xpath('//select[@id="Auction"]/option')
+    result = []
+    for opt in options:
+        aid = opt.xpath('./@value').get()
+        name = opt.xpath('./text()').get()
+        if aid is None:
+            continue
+        result.append({"id": aid.strip(), "name": (name or '').strip()})
+    return result
+
+
+@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), after=after_log)
+def get_auction_list(log, session, impersonate):
+    """
+    GET gallery 首页,解析出全部拍卖会列表(排除 -1 All Auctions)
+    :param log: logger 对象
+    :param session: requests.Session 对象
+    :param impersonate: 浏览器指纹标识(与 setup 时一致)
+    :return: [{"id": "162", "name": "Spring Rarities 2026 Auction"}, ...]
+    """
+    log.info("获取全部拍卖会列表")
+    resp = session.get(GALLERY_URL, headers=headers, impersonate=impersonate,
+                       proxies=get_proxys(log), timeout=15)
+    resp.raise_for_status()
+    sel = Selector(resp.text)
+    all_opts = parse_auction_list(sel)
+    # 过滤掉 All Auctions(-1),只保留具体拍卖会
+    real = [o for o in all_opts if o["id"] != "-1"]
+    log.info(f"共解析到 {len(real)} 个拍卖会:{[(o['id'], o['name']) for o in real]}")
+    return real
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
+def setup_auction_session(log, session, impersonate, auction_id):
+    """
+    通过 ASP.NET __doPostBack 将 Auction 筛选切换到指定 auction_id。
+    切换后服务端 session 记住该选择,后续 GET /lots/gallery?page=N 都返回该 auction 数据。
+    :param log: logger 对象
+    :param session: requests.Session 对象
+    :param impersonate: 浏览器指纹标识(与 setup 时一致)
+    :param auction_id: '-1'(All Auctions) 或具体 id 如 '162'
+    """
+    log.info(f"切换 Auction -> {auction_id}")
+    proxies = get_proxys(log)
+
+    # 1) 首次 GET 拿 ViewState
+    resp = session.get(GALLERY_URL, headers=headers, impersonate=impersonate,
+                       proxies=proxies, timeout=15)
+    resp.raise_for_status()
+    sel = Selector(resp.text)
+
+    form_data = {
+        '__EVENTTARGET': 'ctl00$Auction',
+        '__EVENTARGUMENT': '',
+        '__LASTFOCUS': '',
+        '__VIEWSTATE': _pick_hidden(sel, '__VIEWSTATE'),
+        '__VIEWSTATEGENERATOR': _pick_hidden(sel, '__VIEWSTATEGENERATOR'),
+        '__EVENTVALIDATION': _pick_hidden(sel, '__EVENTVALIDATION'),
+        'ctl00$SearchIn': 'title',
+        'ctl00$SearchText': '',
+        'ctl00$BrowseBy': 'gallery',
+        'ctl00$Auction': str(auction_id),
+    }
+
+    post_headers = {
+        **headers,
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Referer': GALLERY_URL,
+        'Origin': 'https://bid.memorylaneinc.com',
+    }
+
+    resp = session.post(GALLERY_URL, headers=post_headers, data=form_data,
+                        impersonate=impersonate, proxies=proxies, timeout=20)
+    resp.raise_for_status()
+
+    # 验证切换是否成功
+    sel2 = Selector(resp.text)
+    selected_val = sel2.xpath('//select[@id="Auction"]/option[@selected]/@value').get()
+    log.info(f"切换后 Auction 选中值: {selected_val}")
+    if selected_val != str(auction_id):
+        raise RuntimeError(f"切换 Auction 失败,预期 {auction_id} 实际 {selected_val}")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
+def get_single_page(log, page, sql_pool, session, impersonate,
+                    auction_id=None, auction_name=None):
+    """
+    获取单页数据
+    :param log: logger 对象
+    :param page: 页码
+    :param sql_pool: mysql 连接池
+    :param session: requests.Session 对象
+    :param impersonate: 浏览器指纹标识(与 setup 时一致)
+    :param auction_id: 当前 session 切换到的 auction id,会写入 memory_lane_record.auction_id
+    :param auction_name: 同上,写入 memory_lane_record.auction_name
+    :return: 该页解析到的条数
+    """
+    log.info(f">>>>>>>>>>>>>> 正在爬取 auction={auction_id}({auction_name}) 第 {page} 页 <<<<<<<<<<<<<<")
+
+    response = session.get(GALLERY_URL, impersonate=impersonate, headers=headers,
+                           params={"page": f"{page}"},
+                           proxies=get_proxys(log), timeout=10, allow_redirects=False)
+    response.raise_for_status()
+
+    selector = Selector(response.text)
+    tag_div_list = selector.xpath(
+        '//div[@class="items"]/div/div[@class="row"]//div[@class="col-lg-3 col-md-4 col-sm-6"]')
+
+    if not tag_div_list or len(tag_div_list) == 0:
+        log.warning(f"--------------- 第 {page} 页无数据 ---------------")
+        return 0
+
+    info_list = []
+    for tag_div in tag_div_list:
+        title = tag_div.xpath('.//p/a/text()').get()
+        detail_url = tag_div.xpath('.//p/a/@href').get()
+
+        tag_div_p = tag_div.xpath('.//div/p[2]/strong/text()').getall()
+        bids = tag_div_p[0] if tag_div_p else None
+        opening_bid = tag_div_p[1] if len(tag_div_p) > 1 else None
+        opening_bid = opening_bid.replace('$', '').replace(',', '').strip() if opening_bid else None
+
+        status = tag_div_p[2] if len(tag_div_p) > 2 else None
+        current_bid = tag_div.xpath('.//div[@class="item-price"]/a/text()').get()
+        current_bid = current_bid.replace('CURRENT BID $', '').replace(',', '').strip() if current_bid else None
+
+        data_dict = {
+            "title": title,
+            "detail_url": detail_url,
+            "bids": bids,
+            "opening_bid": opening_bid,
+            "status": status,
+            "current_bid": current_bid,
+            "auction_id": int(auction_id) if auction_id is not None else None,
+            "auction_name": auction_name,
+        }
+        info_list.append(data_dict)
+
+    if info_list and sql_pool is not None:
+        sql_pool.insert_many(table="memory_lane_record", data_list=info_list, ignore=True)
+    return len(info_list)
+
+
+def crawl_one_auction(log, sql_pool, session, impersonate,
+                      auction_id, auction_name, max_page=460):
+    """
+    抓取单个拍卖会的全部页(switch 到该 auction → 翻页直到无数据)
+    :param log: logger 对象
+    :param sql_pool: mysql 连接池
+    :param session: requests.Session 对象
+    :param impersonate: 浏览器指纹标识(与 setup 时一致)
+    :param auction_id: 当前 session 切换到的 auction id,会写入 memory_lane_record.auction_id
+    :param auction_name: 同上,写入 memory_lane_record.auction_name
+    :param max_page: 最大页码
+    :return: 该 auction 抓到的总条数
+    """
+    setup_auction_session(log, session, impersonate, auction_id)
+
+    page = 1
+    total = 0
+    while page <= max_page:
+        try:
+            n = get_single_page(log, page, sql_pool, session, impersonate,
+                                auction_id=auction_id, auction_name=auction_name)
+        except Exception as e:
+            log.error(f"auction={auction_id} page={page} 抓取失败: {e}")
+            break
+        if n == 0:
+            log.info(f"auction={auction_id} 翻到第 {page} 页无数据,结束")
+            break
+        total += n
+        page += 1
+    log.info(f"auction={auction_id}({auction_name}) 共抓取 {total} 条")
+    return total
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
+def get_details(log, url, sql_pool, sql_id):
+    """
+    获取详情页:分类、描述、图片列表,写回数据库
+    :param log: logger 对象
+    :param url: 详情页 URL
+    :param sql_pool: mysql 连接池
+    :param sql_id: 数据库记录 ID
+    """
+    log.info(f">>>>>>>>>>>>>> 正在爬取详情数据URL: {url} <<<<<<<<<<<<<<")
+    response = requests.get(url, headers=headers,
+                            impersonate=random.choice(client_identifier_list),
+                            timeout=10, proxies=get_proxys(log))
+    response.raise_for_status()
+    selector = Selector(response.text)
+    category = selector.xpath('//a[@id="MainContent_hCategory"]/text()').get()
+    # description = selector.xpath('//*[@id="MainContent_lblOldAuction"]/text()').getall()
+    # description = ' '.join(description).strip() if description else None
+    imgs = selector.xpath('//div[@class="col-md-5 col-sm-5"]//a[not(@id="Zoomer")]/@href').getall()
+    imgs = ','.join(imgs) if imgs else None
+
+    sql_pool.update_one_or_dict(
+        table="memory_lane_record",
+        data={"category": category, "imgs": imgs, "state": 1},
+        condition={"id": sql_id}
+    )
+
+
+def update_details_for_pending(log, sql_pool):
+    """
+    扫描库里 state != 1 的记录,逐条抓详情
+    :param log: logger 对象
+    :param sql_pool: mysql 连接池
+    """
+    log.debug('Updating detail pages ...........................')
+    sql_result = sql_pool.select_all(
+        'select id, detail_url from memory_lane_record where state != 1 order by id')
+    for row in sql_result:
+        sql_id, detail_url = row[0], row[1]
+        try:
+            get_details(log, detail_url, sql_pool, sql_id)
+        except Exception as e:
+            log.error(f'Error getting details for {detail_url}: {e}')
+            sql_pool.update_one_or_dict(
+                table="memory_lane_record",
+                data={"state": 2},
+                condition={"id": sql_id}
+            )

+ 75 - 186
memory_lane_spider/meml_spdier.py

@@ -2,191 +2,89 @@
 # Author : Charley
 # Python : 3.12.10
 # Date   : 2026/5/18 15:17
-import random
+"""
+Memory Lane 增量爬虫(日调度)
+逻辑:
+  1. GET 首页解析当前网站全部 auction id
+  2. 查库 select distinct auction_id from memory_lane_record,得到已爬过的 auction
+  3. 差集 = 新增 auction
+  4. 没有新增 → 本轮无数据可抓,结束
+  5. 对每个新增 auction:postback 切换 → 翻页 → 写库
+  6. 补抓 state != 1 的详情页
+"""
 import time
+import random
 import inspect
 import schedule
-import user_agent
-from loguru import logger
-from parsel import Selector
 from curl_cffi import requests
-from curl_cffi.requests import BrowserType
-from mysql_pool import MySQLConnectionPool
+from loguru import logger
 from tenacity import retry, stop_after_attempt, wait_fixed
 
-"""
-目标网站:https://bid.memorylaneinc.com/lots/gallery?page=2
-"""
+from mysql_pool import MySQLConnectionPool
+from meml_core import (
+    client_identifier_list,
+    crawl_one_auction,
+    get_auction_list,
+    update_details_for_pending,
+    after_log,
+)
 
 logger.remove()
 logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
            level="DEBUG", retention="7 day")
 
-# 直接用库内置的所有浏览器类型,不用手动维护列表
-client_identifier_list = [b.value for b in BrowserType]
-# print(client_identifier_list)
-
-headers = {
-    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
-    "user-agent": user_agent.generate_user_agent()
-}
-
-
-def after_log(retry_state):
-    """
-    retry 回调
-    :param retry_state: RetryCallState 对象
-    """
-    # 检查 args 是否存在且不为空
-    if retry_state.args and len(retry_state.args) > 0:
-        log = retry_state.args[0]  # 获取传入的 logger
-    else:
-        log = logger  # 使用全局 logger
-
-    if retry_state.outcome.failed:
-        log.warning(
-            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
-    else:
-        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
 
+def get_existing_auction_ids(log, sql_pool):
+    """查库返回已爬过的 auction_id 集合"""
+    rows = sql_pool.select_all(
+        "select distinct auction_id from memory_lane_record where auction_id is not null"
+    )
+    ids = {str(r[0]) for r in rows} if rows else set()
+    log.info(f"库中已存在 {len(ids)} 个 auction_id: {sorted(ids)}")
+    return ids
 
-@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
-def get_proxys(log):
-    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
-    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
 
-    try:
-        proxySettings = {
-            "http": http_proxy,
-            "https": https_proxy,
-        }
-        return proxySettings
-    except Exception as e:
-        log.error(f"Error getting proxy: {e}")
-        raise e
-
-
-@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
-def get_details(log, url, sql_pool, sql_id):
-    """
-    获取详情数据
-    :param log: logger对象
-    :param url: 详情页URL
-    :param sql_pool: MySQL连接池
-    :param sql_id: 数据ID
-    :return: 标题和描述
-    """
-    log.info(f">>>>>>>>>>>>>> 正在爬取详情数据URL: {url} <<<<<<<<<<<<<<")
-    response = requests.get(url, headers=headers, impersonate=random.choice(client_identifier_list), timeout=10,
-                            proxies=get_proxys(log))
-    response.raise_for_status()
-    selector = Selector(response.text)
-    category = selector.xpath('//a[@id="MainContent_hCategory"]/text()').get()
-    description = selector.xpath('//*[@id="MainContent_lblOldAuction"]/text()').getall()
-    description = ' '.join(description).strip() if description else None
-    # imgs = selector.xpath('//div[@class="col-md-5 col-sm-5"]//a[@class="MagicThumb-swap"]/@href').getall()
-    imgs = selector.xpath('//div[@class="col-md-5 col-sm-5"]//a[not(@id="Zoomer")]/@href').getall()
-    imgs = ','.join(imgs) if imgs else None
-    # print(category, description, imgs)
-
-    # 更新数据和状态
-    sql_pool.update_one_or_dict(
-        table="memory_lane_record",
-        data={"category": category, "description": description, "imgs": imgs, "state": 1},
-        condition={"id": sql_id}
-    )
+def diff_new_auctions(log, all_auctions, existing_ids):
+    """从首页解析的全部 auctions 中筛出库里没有的"""
+    new_list = [a for a in all_auctions if a["id"] not in existing_ids]
+    log.info(f"新增待抓取 auction 数: {len(new_list)} -> {[(a['id'], a['name']) for a in new_list]}")
+    return new_list
 
 
-def get_single_page(log, page, sql_pool):
-    """
-    获取单页数据
-    :param log: logger对象
-    :param page: 页码
-    :param sql_pool: MySQL连接池
-    :return: 该页数据条数
-    """
-    log.info(f"Getting page -> {page} data....................................................")
-    url = "https://bid.memorylaneinc.com/lots/gallery"
-    params = {"page": page}
-    # response = requests.get(url, headers=headers, impersonate="chrome124", params=params, timeout=10)
-    response = requests.get(url, headers=headers, impersonate=random.choice(client_identifier_list), params=params,
-                            proxies=get_proxys(log), timeout=10)
-    # print(response.text)
-    response.raise_for_status()
-
-    selector = Selector(response.text)
-    tag_div_list = selector.xpath(
-        '//div[@class="items"]/div/div[@class="row"]//div[@class="col-lg-3 col-md-4 col-sm-6"]')
-    # print('tag_div_list:', tag_div_list)
-
-    info_list = []
-    for tag_div in tag_div_list:
-        title = tag_div.xpath('.//p/a/text()').get()
-        detail_url = tag_div.xpath('.//p/a/@href').get()
-        # img = tag_div.xpath('.//div[@class="item-image"]/a/img/@src').get()
-
-        tag_div_p = tag_div.xpath('.//div/p[2]/strong/text()').getall()
-        bids = tag_div_p[0] if tag_div_p else None
-        opening_bid = tag_div_p[1] if len(tag_div_p) > 1 else None
-        opening_bid = opening_bid.replace('$', '').replace(',', '').strip() if opening_bid else None
-
-        status = tag_div_p[2] if len(tag_div_p) > 2 else None
-        current_bid = tag_div.xpath('.//div[@class="item-price"]/a/text()').get()
-        current_bid = current_bid.replace('CURRENT BID $', '').replace(',', '').strip() if current_bid else None
-
-        data_dict = {
-            "title": title,
-            "detail_url": detail_url,
-            # "img": img,
-            "bids": bids,
-            "opening_bid": opening_bid,
-            "status": status,
-            "current_bid": current_bid
-        }
-        # print('data_dict:', data_dict)
-        info_list.append(data_dict)
-
-    # 保存数据到数据库
-    if info_list:
-        sql_pool.insert_many(table="memory_lane_record", data_list=info_list, ignore=True)
-    return len(info_list)
-
-
-def get_sold_list(log, sql_pool):
-    """
-    获取已售列表
-    :param log: logger对象
-    :param sql_pool: MySQL连接池
-    :return: 无
-    """
-    page = 1
-    max_page = 10
-
-    while page <= max_page:
+def run_incremental(log, sql_pool):
+    """增量抓取主流程"""
+    impersonate = random.choice(client_identifier_list)
+    with requests.Session() as session:
         try:
-            len_list = get_single_page(log, page, sql_pool)
+            all_auctions = get_auction_list(log, session, impersonate)
         except Exception as e:
-            log.error(f"Error getting page {page}: {e}")
-            continue
+            log.error(f"获取拍卖会列表失败: {e}")
+            return
 
-        if len_list == 0:
-            log.warning(f"No data on page {page}, stopping further requests")
-            break
+        existing_ids = get_existing_auction_ids(log, sql_pool)
+        new_auctions = diff_new_auctions(log, all_auctions, existing_ids)
 
-        page += 1
+        if not new_auctions:
+            log.info("本轮无新增 auction,跳过 list 抓取")
+            return
+
+        for idx, auc in enumerate(new_auctions, 1):
+            aid, name = auc["id"], auc["name"]
+            log.info(f"========== [{idx}/{len(new_auctions)}] 开始抓 auction={aid} ({name}) ==========")
+            try:
+                crawl_one_auction(log, sql_pool, session, impersonate,
+                                  auction_id=aid, auction_name=name)
+            except Exception as e:
+                log.error(f"auction={aid} 抓取异常: {e}")
+                continue
 
 
 @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
 def meml_main(log):
-    """
-    主函数
-    :param log: logger对象
-    """
-    log.info(
-        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
-
-    # 配置 MySQL 连接池
+    """日调度主函数:增量 list + 补详情"""
+    log.info(f'开始运行 {inspect.currentframe().f_code.co_name} 增量爬虫任务 ...')
+
     sql_pool = MySQLConnectionPool(log=log)
     if not sql_pool:
         log.error("MySQL数据库连接失败")
@@ -194,46 +92,37 @@ def meml_main(log):
 
     try:
         try:
-            get_sold_list(log, sql_pool)
+            run_incremental(log, sql_pool)
         except Exception as e:
-            log.error(f'Error getting sold list: {e}')
-
-        # 更新详情页
-        log.debug('Updating detail pages........................... started')
-        # sql_result = sql_pool.select_all('select id, detail_url from memory_lane_record where state = 0')
-        sql_result = sql_pool.select_all('select id, detail_url from memory_lane_record where state != 1 order by id')
-        for row in sql_result:
-            sql_id = row[0]
-            detail_url = row[1]
-            try:
-                get_details(log, detail_url, sql_pool, sql_id)
-            except Exception as e:
-                log.error(f'Error getting details for {detail_url}: {e}')
-                # 更新数据和状态
-                sql_pool.update_one_or_dict(
-                    table="memory_lane_record",
-                    data={"state": 2},
-                    condition={"id": sql_id}
-                )
+            log.error(f'增量抓取失败: {e}')
+
+        try:
+            update_details_for_pending(log, sql_pool)
+        except Exception as e:
+            log.error(f'详情补抓失败: {e}')
 
     except Exception as e:
         log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
     finally:
-        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮采集 ...')
 
 
 def schedule_task():
-    """
-    设置定时任务
-    """
+    """每半个月跑一次增量"""
     meml_main(log=logger)
 
-    schedule.every().day.at("05:00").do(meml_main, log=logger)
+    def run_semimonthly():
+        # 每月 1 号和 15 号执行(半月一次)
+        from datetime import date
+        if date.today().day in (1, 15):
+            meml_main(log=logger)
+
+    schedule.every().day.at("05:00").do(run_semimonthly)
     while True:
         schedule.run_pending()
         time.sleep(1)
 
 
 if __name__ == "__main__":
-    # get_single_page(log=logger, page=1, sql_pool=None)
+    # meml_main(log=logger)
     schedule_task()

+ 69 - 48
memory_lane_spider/mysql_pool.py

@@ -44,36 +44,75 @@ class MySQLConnectionPool:
             user=sql_user,
             password=sql_password,
             database=sql_db,
-            charset="utf8mb4",
-            use_unicode=True,
-            init_command="SET NAMES utf8mb4",
-            ping=1,  # 0:完全关闭(更快), 1:仅在取连接时检查, 2:每次执行前检查连接有效性,防止使用已断开的连接
+            ping=2,  # 每次执行前检查连接有效性,防止使用已断开的连接
             connect_timeout=5,  # 连接超时时间(秒)
             # read_timeout=30,  # 读取超时时间(秒)
             write_timeout=30  # 写入超时时间(秒)
         )
 
+    # def _execute(self, query, args=None, commit=False):
+    #     """
+    #     执行SQL
+    #     :param query: SQL语句
+    #     :param args: SQL参数
+    #     :param commit: 是否提交事务
+    #     :return: 查询结果
+    #     """
+    #     try:
+    #         with self.pool.connection() as conn:
+    #             with conn.cursor() as cursor:
+    #                 cursor.execute(query, args)
+    #                 if commit:
+    #                     conn.commit()
+    #                 self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+    #                 return cursor
+    #     except Exception as e:
+    #         if commit and conn:
+    #             conn.rollback()
+    #         self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+    #         raise e
+
     def _execute(self, query, args=None, commit=False):
         """
-        执行SQL
+        执行SQL(带断连重试)
         :param query: SQL语句
         :param args: SQL参数
         :param commit: 是否提交事务
         :return: 查询结果
         """
-        try:
-            with self.pool.connection() as conn:
-                with conn.cursor() as cursor:
-                    cursor.execute(query, args)
-                    if commit:
-                        conn.commit()
-                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
-                    return cursor
-        except Exception as e:
-            if commit and conn:
-                conn.rollback()
-            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
-            raise e
+        conn = None
+        for attempt in range(2):  # 最多重试1次
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.execute(query, args)
+                        if commit:
+                            conn.commit()
+                        self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                        return cursor
+            except pymysql.err.InterfaceError as e:
+                # 连接已断开,重试一次
+                if attempt == 0:
+                    self.log.warning(f"数据库连接断开,正在重试... Error: {e}")
+                    continue
+                self.log.error(f"重试后仍失败: {e}, Query: {query}")
+                raise e
+            except pymysql.err.IntegrityError:
+                # 完整性错误(如重复条目)交由上层处理,避免在此打印完整堆栈污染日志
+                if commit and conn:
+                    try:
+                        conn.rollback()
+                    except Exception:
+                        pass
+                raise
+            except Exception as e:
+                if commit and conn:
+                    try:
+                        conn.rollback()
+                    except Exception:
+                        pass
+                self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+                raise e
 
     def select_one(self, query, args=None):
         """
@@ -183,13 +222,14 @@ class MySQLConnectionPool:
             return cursor.lastrowid
         except pymysql.err.IntegrityError as e:
             if "Duplicate entry" in str(e):
-                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                # 重复条目用 warning 简短输出,不打印堆栈
+                self.log.warning(f"插入跳过-重复条目 Table: {table}, {e.args[1] if len(e.args) > 1 else e}")
                 return -1  # 返回 -1 表示重复条目被跳过
             else:
-                self.log.exception(f"数据库完整性错误: {e}")
+                self.log.error(f"数据库完整性错误 Table: {table}, Error: {e}")
                 raise
         except Exception as e:
-            self.log.exception(f"未知错误: {e}")
+            self.log.error(f"insert_one_or_dict 失败 Table: {table}, Error: {e}")
             raise
 
     def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
@@ -621,30 +661,11 @@ class MySQLConnectionPool:
 
 if __name__ == '__main__':
     sql_pool = MySQLConnectionPool()
-    # data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
-    #             'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
-    #             'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
-    #             'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
-    #             'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
-    #             'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
-    #             'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
-    # sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)
-
-    sql_pool.insert_many(
-        table="jhs_product_record",
-        data_list=[
-            {
-                "product_id": 99999991,
-                "seller_username": "浣熊小助理(裸卡版)",
-                "auction_product_name": "2000 日文 无编号 #175 U 波克比 有瑕疵",
-            },
-            {
-                "product_id": 99999992,
-                "seller_username": "测试商家二号",
-                "auction_product_name": "中文批量插入测试",
-            },
-        ],
-        ignore=False
-    )
-
-
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)

+ 10 - 0
memory_lane_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+curl_cffi==0.15.1b1
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.11.0
+PyMySQL==1.1.2
+PyYAML==6.0.3
+schedule==1.2.2
+tenacity==9.1.4
+user_agent==0.1.14