
update req & cxx spider 1.30.1

charley 5 days ago
parent
commit
697a26e10a

+ 10 - 0
breakninja_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+pandas==2.3.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2

+ 10 - 0
cgc_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2
+user_agent==0.1.14

+ 12 - 0
courtyard_spider/requirements.txt

@@ -0,0 +1,12 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+curl_cffi==0.14.0
+DBUtils==3.1.2
+DrissionPage==4.1.0.18
+loguru==0.7.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2
+user_agent==0.1.14

+ 78 - 0
cxx_spider/YamlLoader.py

@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/12/22 10:44
+import os
+import re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
+
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+    
+    def get(self, key: str):
+        return YamlConfig(self.config.get(key))
+    
+    def getValueAsString(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]  # drop the trailing ':'
+                return os.getenv(env, group['VAL'])
+            return group['VAL']  # plain ${value} placeholder without an env name
+        except (TypeError, AttributeError):
+            return self.config[key]  # value is not a ${...} placeholder
+    
+    def getValueAsInt(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return int(os.getenv(env, group['VAL']))
+            return int(group['VAL'])
+        except (TypeError, AttributeError):
+            return int(self.config[key])
+    
+    def getValueAsBool(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                value = os.getenv(env, group['VAL'])
+            else:
+                value = group['VAL']
+            return str(value).lower() in ('1', 'true', 'yes')  # bool('false') would be True
+        except (TypeError, AttributeError):
+            return bool(self.config[key])
+
+
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    conf = {}  # avoid a NameError when the file does not exist
+    if os.path.exists(path):
+        with open(path) as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+    
+    if profile is not None:
+        result = path.split('.')
+        profiledYaml = f'{result[0]}-{profile}.{result[1]}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml) as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+    
+    return YamlConfig(conf)
+
+# Usage example:
+# conf = readYaml()
+# mysqlConf = conf.get('mysql')
+# print(mysqlConf.getValueAsString("host"))
+# print(mysqlConf.getValueAsInt("port"))

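The `${ENV:default}` placeholders that YamlLoader handles resolve through os.getenv at lookup time, so exported variables override the default after the colon even though the YAML was already loaded. A minimal sketch, assuming the application.yml added later in this commit sits in the working directory (the 10.0.0.9 value is illustrative):

    import os
    import YamlLoader

    conf = YamlLoader.readYaml('application.yml')
    mysql = conf.get('mysql')
    # No env var set: the default after the ':' is returned
    print(mysql.getValueAsString('host'))   # -> 100.64.0.25
    # Env var set: it wins, since os.getenv is consulted on every lookup
    os.environ['MYSQL_HOST'] = '10.0.0.9'   # illustrative value
    print(mysql.getValueAsString('host'))   # -> 10.0.0.9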
+ 163 - 0
cxx_spider/add_report.py

@@ -0,0 +1,163 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/29 17:00
+"""
+1768
+"""
+import requests
+
+from mysql_pool import MySQLConnectionPool
+from super_vault_daily_spider import HEADERS
+
+
+def get_report_single_page(log, page_num, detail_id, token):
+    """
+    Fetch a single page of data
+    :param log: logger object
+    :param page_num: page number
+    :param detail_id: product id
+    :param token: token
+    :return: page data
+    """
+    log.debug(f"Fetching page {page_num} of <card-break report> data.................")
+    # token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJDSEFPWElOWElORyNBUFAiLCJhdWQiOiJDSEFPWElOWElORyIsIm5iZiI6MTc2OTU4MTI5NiwiZGF0YSI6Ijk1MjMiLCJpc3MiOiI3ViNweHlQZSIsImV4cCI6MTc3MDc4MTI5NiwiaWF0IjoxNzY5NTgxMjk2LCJqdGkiOiIyYjkwNzZhMS0wYjU1LTQ0ZjItOGZlZC0yMWZiZmI0ZjUyYWIifQ.iDzTZLDslCP0y2nc2Jp4TGEsNbQiCRKcUeRsIyG3iOg"
+
+    url = "https://cxx.cardsvault.net/app/teamup/report/list"
+    data = {
+        "pageSize": 20,
+        "my": 0,
+        "pageNum": page_num,
+        # "tid": 1780
+        "tid": detail_id
+    }
+    HEADERS["Authorization"] = token
+    response = requests.post(url, headers=HEADERS, json=data, timeout=22)
+    # print(response.text)
+    response.raise_for_status()
+
+    result = response.json()
+    if result.get("status") == 200:
+        data = result.get("data", {})
+        total = data.get("total", 0)
+        current_page = data.get("pageNum", 1)
+        items = data.get("data", [])
+
+        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
+        log.debug(f"当前页数据数量: {len(items)}")
+
+        return {
+            "total": total,
+            "current_page": current_page,
+            "items": items
+        }
+    else:
+        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
+        return None
+
+
+def parse_report_items(log, detail_id, items):
+    """
+    Parse the list items
+    :param log: logger object
+    :param detail_id: product id
+    :param items: list items
+    :return: parsed items
+    """
+    parsed_items = []
+    log.debug("Parsing <card-break report> list items.................")
+    for item in items:
+        userName = item.get("userName")
+        level = item.get("level")
+        teamNameCn = item.get("teamNameCn")
+        teamNameEn = item.get("teamNameEn")
+        count = item.get("count")
+
+        picture_url = item.get("picture", {}).get("url")
+        alias = item.get("alias")  # 别名
+        createTime = item.get("createTime")
+
+        data_dict = {
+            "pid": detail_id,
+            "user_name": userName,
+            "level": level,
+            "team_name_cn": teamNameCn,
+            "team_name_en": teamNameEn,
+            "count": count,
+            "picture_url": picture_url,
+            "alias": alias,
+            "create_time": createTime
+        }
+        # print(data_dict)
+        parsed_items.append(data_dict)
+
+    return parsed_items
+
+
+def get_report_list(log, detail_id, token, sql_pool):
+    """
+    Fetch the report list data
+    :param log: logger object
+    :param detail_id: product id
+    :param token: token
+    :param sql_pool: database connection pool
+    """
+    page_num = 1
+    total_pages = 99
+    items_per_page = 20  # pageSize
+
+    all_items = []
+    while page_num <= total_pages:
+        log.debug(f"正在获取第 {page_num} 页的数据.................")
+        page_result = get_report_single_page(log, page_num, detail_id, token)
+
+        if not page_result:
+            log.error(f"获取第 {page_num} 页失败 !!!")
+            break
+
+        # On the first request, update the real total page count
+        if page_num == 1:
+            total_count = page_result["total"]
+            if total_count == 0:
+                log.info("No new records found.")
+                # set report_state to 2
+                # sql_pool.update_one_or_dict(
+                #     table="super_vault_product_record",
+                #     data={"report_state": 2},
+                #     condition={"pid": detail_id}
+                # )
+                break
+
+            total_pages = (total_count + items_per_page - 1) // items_per_page
+            log.info(f"总共 {total_pages} 页")
+
+        items = parse_report_items(log, detail_id, page_result["items"])
+        all_items.extend(items)
+
+        # sql_pool.update_one_or_dict(
+        #     table="super_vault_product_record",
+        #     data={"report_state": 1},
+        #     condition={"pid": detail_id}
+        # )
+
+        page_num += 1
+
+    log.debug(f"所有数据获取完成,共 {len(all_items)} 条记录")
+    # sql_pool.insert_many(table="super_vault_report_record", data_list=all_items, ignore=True)
+
+    # Dedupe on user_name to count the number of participants
+    unique_user_names = len(set(item["user_name"] for item in all_items))
+    log.info(f"---------------------------------------- 参与人数: {unique_user_names} ----------------------------------------")
+
+
+if __name__ == '__main__':
+    from loguru import logger
+
+    sql_pool = MySQLConnectionPool(log=logger)
+    token = sql_pool.select_one("SELECT token FROM super_vault_token")
+    # sql_detail_id_list = sql_pool.select_all("SELECT pid FROM super_vault_product_record WHERE report_state != 1")
+
+    # for detail_id in sql_detail_id_list:
+    #     get_report_list(logger, detail_id[0], token[0], None)
+    get_report_list(logger, 1774, token[0], sql_pool)
+

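The total_pages arithmetic in get_report_list above is ceiling division: (total + items_per_page - 1) // items_per_page. A quick sanity check of the formula with the pageSize of 20 used here:

    page_size = 20
    for total in (0, 1, 20, 21, 45):
        pages = (total + page_size - 1) // page_size
        print(total, pages)  # 0 -> 0, 1 -> 1, 20 -> 1, 21 -> 2, 45 -> 3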
+ 6 - 0
cxx_spider/application.yml

@@ -0,0 +1,6 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.25}
+  port: ${MYSQL_PORT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}

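Because mysql_pool.py resolves this config when it is imported, per-environment overrides have to be in place before that first import; the values after the ':' act as fallbacks. A minimal sketch with illustrative values:

    import os

    # Set before importing mysql_pool, which reads the config at import time
    os.environ['MYSQL_HOST'] = '192.168.1.50'   # illustrative
    os.environ['MYSQL_PASSWORD'] = 'Pass2022'   # illustrative

    from mysql_pool import MySQLConnectionPool  # pool now uses the overridden values
    sql_pool = MySQLConnectionPool()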
+ 56 - 0
cxx_spider/cxx_login.py

@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/30 11:43
+import requests
+import json
+
+
+headers = {
+    "User-Agent": "okhttp/4.9.0",
+    "Connection": "Keep-Alive",
+    "Accept-Encoding": "gzip",
+    "Authorization": "",
+    "CXX-APP-API-VERSION": "V2",
+    "deviceType": "2",
+    "udid": "20f902c10f6163a19bf137d801731d9f",
+    # "time": "1769751365500",
+    "Content-Type": "application/json; charset=UTF-8"
+}
+
+def send_code():
+    url = "https://cxx.cardsvault.net/app/py/login/mobile/code"
+    data = {
+        "zone": "86",
+        "mobile": "19521500850"
+    }
+    response = requests.post(url, headers=headers, json=data)
+    print(response.text)
+    print(response)
+
+
+def login(code):
+    url = "https://cxx.cardsvault.net/app/py/login/mobile"
+    data = {
+        # "code": "123123",
+        "code": code,
+        "mobile": "19521500850",
+        "region": {
+            "code": "86",
+            "code_en": "CN",
+            "country_cn": "中国",
+            "country_en": "China"
+        }
+    }
+    data = json.dumps(data, separators=(',', ':'))  # compact JSON body, no spaces after separators
+    response = requests.post(url, headers=headers, data=data)
+
+    print(response.text)
+    print(response)
+
+
+if __name__ == '__main__':
+    send_code()
+    code = input("Enter the SMS verification code: ")
+    login(code)

+ 625 - 0
cxx_spider/mysql_pool.py

@@ -0,0 +1,625 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
+# Load the YAML config
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL connection pool
+    """
+
+    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
+        """
+        初始化连接池
+        :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
+        :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
+        :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
+        :param log: 自定义日志记录器
+        """
+        # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # when the pool has no free connection: True = block and wait, False = raise an error
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db,
+            ping=0  # automatic connection validity check (0 = never, 1 = check before a query, 2 = check before every execute)
+        )
+
+    def _execute(self, query, args=None, commit=False):
+        """
+        Execute a SQL statement
+        :param query: SQL statement
+        :param args: SQL parameters
+        :param commit: whether to commit the transaction
+        :return: the cursor (pymysql's default cursor buffers results, so fetches still work after it is closed)
+        """
+        conn = None
+        try:
+            with self.pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, args)
+                    if commit:
+                        conn.commit()
+                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                    return cursor
+        except Exception as e:
+            if commit and conn is not None:
+                conn.rollback()
+            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+            raise e
+
+    def select_one(self, query, args=None):
+        """
+        Execute a query and return a single row
+        :param query: query statement
+        :param args: query parameters
+        :return: query result
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        Execute a query and return all rows
+        :param query: query statement
+        :param args: query parameters
+        :return: query results
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        Execute a single INSERT statement
+        :param query: insert statement
+        :param args: insert parameters
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one inserting>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        cursor = self._execute(query, args, commit=True)
+        return cursor.lastrowid  # return the inserted row id
+
+    def insert_all(self, query, args_list):
+        """
+        Execute a batch INSERT; falls back to row-by-row inserts on failure
+        :param query: insert statement
+        :param args_list: list of insert parameter tuples
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all inserting>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                conn.rollback()
+                self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
+                rowcount = 0
+                for args in args_list:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except pymysql.err.IntegrityError as e2:
+                        if "Duplicate entry" in str(e2):
+                            self.log.debug(f"跳过重复条目: {e2}")
+                        else:
+                            self.log.error(f"插入失败: {e2}")
+                    except Exception as e2:
+                        self.log.error(f"插入失败: {e2}")
+                self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
+            else:
+                conn.rollback()
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise e
+        except Exception as e:
+            conn.rollback()
+            self.log.exception(f"批量插入失败: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
+        """
+        Single-row insert (accepts a dict or raw SQL)
+        :param table: table name (required for dict inserts)
+        :param data: dict of {column: value}
+        :param query: raw SQL statement (mutually exclusive with data)
+        :param args: SQL parameters (required when query is used)
+        :param commit: whether to auto-commit
+        :param ignore: whether to use INSERT IGNORE
+        :return: last inserted id
+        """
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
+            values = ', '.join(['%s'] * len(data))
+
+            # Build the INSERT IGNORE statement
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args = tuple(data.values())
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        try:
+            cursor = self._execute(query, args, commit)
+            self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict inserting>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+            return cursor.lastrowid
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                return -1  # 返回 -1 表示重复条目被跳过
+            else:
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise
+        except Exception as e:
+            self.log.exception(f"未知错误: {e}")
+            raise
+
+    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                    ignore=False):
+        """
+        Batch insert (accepts a list of dicts or raw SQL)
+        :param table: table name (required for dict inserts)
+        :param data_list: list of dicts [{column: value}]
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: list of SQL parameter tuples (required when query is used)
+        :param batch_size: batch size
+        :param commit: whether to auto-commit
+        :param ignore: whether to use INSERT IGNORE
+        :return: number of affected rows
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+
+            # Build the INSERT IGNORE statement
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                # Handle unique-index conflicts
+                if "Duplicate entry" in str(e):
+                    if ignore:
+                        # With INSERT IGNORE this branch should be unreachable, but handle it just in case
+                        self.log.warning(f"Batch insert hit duplicate entries (ignore mode): {e}")
+                    else:
+                        # IGNORE not used: fall back to row-by-row inserts
+                        self.log.warning(f"Batch insert hit duplicate entries, falling back to row-by-row inserts. Error: {e}")
+                        if commit:
+                            conn.rollback()
+                        
+                        rowcount = 0
+                        for j, args in enumerate(batch):
+                            try:
+                                if data_list:
+                                    # dict mode
+                                    self.insert_one_or_dict(
+                                        table=table,
+                                        data=dict(zip(data_list[0].keys(), args)),
+                                        commit=commit,
+                                        ignore=False  # catch duplicates manually for single inserts
+                                    )
+                                else:
+                                    # raw SQL mode
+                                    self.insert_one(query, args)
+                                rowcount += 1
+                            except pymysql.err.IntegrityError as e2:
+                                if "Duplicate entry" in str(e2):
+                                    self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
+                                else:
+                                    self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                            except Exception as e2:
+                                self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                        total += rowcount
+                        self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
+                else:
+                    # Other integrity errors
+                    self.log.exception(f"Database integrity error: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                # Other database errors
+                self.log.exception(f"Batch insert failed: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        if table:
+            self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        else:
+            self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
+        return total
+
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                        ignore=False):
+        """
+        Batch insert (accepts a list of dicts or raw SQL) - alternate implementation
+        :param table: table name (required for dict inserts)
+        :param data_list: list of dicts [{column: value}]
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: list of SQL parameter tuples (required when query is used)
+        :param batch_size: batch size
+        :param commit: whether to auto-commit
+        :param ignore: whether to use INSERT IGNORE
+        :return: number of affected rows
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+    
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                if "Duplicate entry" in str(e) and not ignore:
+                    self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
+                    if commit:
+                        conn.rollback()
+                    rowcount = 0
+                    for args in batch:
+                        try:
+                            self.insert_one(query, args)
+                            rowcount += 1
+                        except pymysql.err.IntegrityError as e2:
+                            if "Duplicate entry" in str(e2):
+                                self.log.debug(f"跳过重复条目: {e2}")
+                            else:
+                                self.log.error(f"插入失败: {e2}")
+                        except Exception as e2:
+                            self.log.error(f"插入失败: {e2}")
+                    total += rowcount
+                else:
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
+        return total
+
+    def insert_too_many(self, query, args_list, batch_size=1000):
+        """
+        Batch insert with chunked commits, useful for 100k+ rows per call; failed chunks fall back to row-by-row inserts
+        :param query: insert statement
+        :param args_list: list of insert parameter tuples
+        :param batch_size: rows per chunk
+        """
+        self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        conn.commit()
+                        self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
+            except Exception as e:
+                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
+                # Fall back to single inserts for the current batch
+                for args in batch:
+                    self.insert_one(query, args)
+
+    def update_one(self, query, args):
+        """
+        Execute a single UPDATE statement
+        :param query: update statement
+        :param args: update parameters
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        Execute a batch UPDATE; falls back to row-by-row updates on failure
+        :param query: update statement
+        :param args_list: list of update parameter tuples
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # If the batch update failed, update row by row
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
+        """
+        Single-row update (accepts a dict or raw SQL)
+        :param table: table name (required in dict mode)
+        :param data: dict of {column: value} (mutually exclusive with query)
+        :param condition: update condition; supported formats:
+            - dict: {"id": 1} → "WHERE id = %s"
+            - string: "id = 1" → "WHERE id = 1" (caller must ensure it is safe)
+            - tuple: ("id = %s", [1]) → "WHERE id = %s" (parameterized query)
+        :param query: raw SQL statement (mutually exclusive with data)
+        :param args: SQL parameters (required in query mode)
+        :param commit: whether to auto-commit
+        :return: number of affected rows
+        :raises: ValueError when parameter validation fails
+        """
+        # Parameter validation
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+            if table is None:
+                raise ValueError("Table name is required for dictionary update")
+            if condition is None:
+                raise ValueError("Condition is required for dictionary update")
+
+            # Build the SET clause
+            set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
+            set_values = list(data.values())
+
+            # Parse the condition
+            condition_clause, condition_args = self._parse_condition(condition)
+            query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            args = set_values + condition_args
+
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        # Execute the update
+        cursor = self._execute(query, args, commit)
+        # self.log.debug(
+        #     f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
+        #     extra={"table": table, "rows": cursor.rowcount}
+        # )
+        return cursor.rowcount
+
+    def _parse_condition(self, condition):
+        """
+        Parse a condition into (clause, args) form
+        :param condition: dict / string / tuple
+        :return: (str, list) SQL clause and parameter list
+        """
+        if isinstance(condition, dict):
+            clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
+            args = list(condition.values())
+        elif isinstance(condition, str):
+            clause = condition  # note: the caller must ensure this string is safe
+            args = []
+        elif isinstance(condition, (tuple, list)) and len(condition) == 2:
+            clause, args = condition[0], condition[1]
+            if not isinstance(args, (list, tuple)):
+                args = [args]
+        else:
+            raise ValueError("Condition must be dict/str/(clause, args)")
+        return clause, args
+
+    def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
+                    commit=True):
+        """
+        Batch update (accepts a list of dicts or raw SQL)
+        :param table: table name (required in dict mode)
+        :param data_list: list of dicts [{column: value}]
+        :param condition_list: list of conditions (must be dicts, same length as data_list)
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: list of SQL parameter tuples (required when query is used)
+        :param batch_size: batch size
+        :param commit: whether to auto-commit
+        :return: number of affected rows
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            if condition_list is None or len(data_list) != len(condition_list):
+                raise ValueError("Condition_list must be provided and match the length of data_list")
+            if not all(isinstance(cond, dict) for cond in condition_list):
+                raise ValueError("All elements in condition_list must be dictionaries")
+
+            # Keys of the first data item and the first condition item
+            first_data_keys = set(data_list[0].keys())
+            first_cond_keys = set(condition_list[0].keys())
+
+            # Build the base SQL
+            set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
+            condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
+            base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            total = 0
+
+            # Process in batches
+            for i in range(0, len(data_list), batch_size):
+                batch_data = data_list[i:i + batch_size]
+                batch_conds = condition_list[i:i + batch_size]
+                batch_args = []
+
+                # Check that every row in this batch has the same structure
+                can_batch = True
+                for data, cond in zip(batch_data, batch_conds):
+                    data_keys = set(data.keys())
+                    cond_keys = set(cond.keys())
+                    if data_keys != first_data_keys or cond_keys != first_cond_keys:
+                        can_batch = False
+                        break
+                    batch_args.append(tuple(data.values()) + tuple(cond.values()))
+
+                if not can_batch:
+                    # Structures differ: fall back to single-row updates
+                    for data, cond in zip(batch_data, batch_conds):
+                        self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
+                        total += 1
+                    continue
+
+                # Execute the batch update
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(base_query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # Fall back to single-row updates
+                    for args, data, cond in zip(batch_args, batch_data, batch_conds):
+                        try:
+                            self._execute(base_query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        elif query is not None:
+            # Handle raw SQL with a parameter list
+            if args_list is None:
+                raise ValueError("args_list must be provided when using query")
+
+            total = 0
+            for i in range(0, len(args_list), batch_size):
+                batch_args = args_list[i:i + batch_size]
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # Fall back to single-row updates
+                    for args in batch_args:
+                        try:
+                            self._execute(query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        else:
+            raise ValueError("Either data_list or query must be provided")
+
+    def check_pool_health(self):
+        """
+        Check that the pool can hand out a working connection
+
+        # Usage example
+        # Configure the MySQL connection pool
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool.check_pool_health():
+            log.error("Database connection pool is unhealthy")
+            raise RuntimeError("Database connection pool is unhealthy")
+        """
+        try:
+            with self.pool.connection() as conn:
+                conn.ping(reconnect=True)
+                return True
+        except Exception as e:
+            self.log.error(f"Connection pool health check failed: {e}")
+            return False
+
+    def close(self):
+        """
+        Close the pool and release all connections
+        """
+        try:
+            if hasattr(self, 'pool') and self.pool:
+                self.pool.close()
+                self.log.info("数据库连接池已关闭")
+        except Exception as e:
+            self.log.error(f"关闭连接池失败: {e}")
+
+    @staticmethod
+    def _safe_identifier(name):
+        """SQL标识符安全校验"""
+        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+            raise ValueError(f"Invalid SQL identifier: {name}")
+        return name
+
+
+if __name__ == '__main__':
+    sql_pool = MySQLConnectionPool()
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)

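For reference, update_one_or_dict accepts the three condition formats that _parse_condition documents. A hedged usage sketch against the table and columns this commit already writes to:

    from mysql_pool import MySQLConnectionPool

    sql_pool = MySQLConnectionPool()
    # 1) dict condition -> parameterized "WHERE pid = %s"
    sql_pool.update_one_or_dict(table="super_vault_product_record",
                                data={"report_state": 1}, condition={"pid": 1774})
    # 2) raw string condition -> used verbatim (caller must ensure it is safe)
    sql_pool.update_one_or_dict(table="super_vault_product_record",
                                data={"report_state": 1}, condition="pid = 1774")
    # 3) (clause, args) tuple -> parameterized with explicit placeholders
    sql_pool.update_one_or_dict(table="super_vault_product_record",
                                data={"report_state": 1}, condition=("pid = %s", [1774]))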
+ 8 - 0
cxx_spider/requirements.txt

@@ -0,0 +1,8 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2

+ 50 - 0
cxx_spider/start_cxx_spider.py

@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/29 16:31
+import time
+import schedule
+import threading
+from loguru import logger
+from super_vault_daily_spider import cxx_daily_main
+from super_vault_on_sale_spider import cxx_sale_main
+
+logger.remove()
+logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+def run_threaded(job_func, *args, **kwargs):
+    """
+    Run the given function in a new thread, forwarding its arguments.
+
+    :param job_func: target function to run
+    :param args: positional arguments
+    :param kwargs: keyword arguments
+    """
+    job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs)
+    job_thread.start()
+
+
+def schedule_task():
+    """
+    Entry point for the spider module
+    """
+    # Run each task once immediately
+    run_threaded(cxx_daily_main, log=logger)
+    # time.sleep(5)
+
+    run_threaded(cxx_sale_main, log=logger)
+
+    # Register the scheduled tasks
+    schedule.every().day.at("07:01").do(run_threaded, cxx_daily_main, log=logger)
+
+    schedule.every().hour.do(run_threaded, cxx_sale_main, log=logger)
+
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+if __name__ == '__main__':
+    schedule_task()

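schedule executes jobs synchronously inside run_pending(), so without run_threaded a long scrape would hold up every other job until it returned. A self-contained sketch of the same pattern (slow_job stands in for a scraper):

    import time
    import threading
    import schedule

    def run_threaded(job_func, *args, **kwargs):
        threading.Thread(target=job_func, args=args, kwargs=kwargs).start()

    def slow_job(name):
        time.sleep(5)  # stands in for a long-running scrape
        print(f"{name} done")

    # Both jobs keep firing on schedule even though each takes 5 seconds
    schedule.every(2).seconds.do(run_threaded, slow_job, name="a")
    schedule.every(2).seconds.do(run_threaded, slow_job, name="b")
    while True:
        schedule.run_pending()
        time.sleep(1)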
+ 424 - 0
cxx_spider/super_vault_daily_spider.py

@@ -0,0 +1,424 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/28 11:12
+import inspect
+import time
+import requests
+from loguru import logger
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+"""
+SuperVault
+"""
+# logger.remove()
+# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+#            level="DEBUG", retention="7 day")
+
+
+HEADERS = {
+    "User-Agent": "okhttp/4.9.0",
+    # "Connection": "Keep-Alive",
+    # "Accept-Encoding": "gzip",
+    "Authorization": "",
+    "CXX-APP-API-VERSION": "V2",  # 必须添加
+    # "deviceType": "2",
+    # "udid": "20f902c10f6163a19bf137d801731d9f",
+    # "time": str(int(time.time() * 1000)),
+    "Content-Type": "application/json; charset=UTF-8"
+}
+
+
+def after_log(retry_state):
+    """
+    tenacity retry callback
+    :param retry_state: RetryCallState object
+    """
+    # Check that args exists and is non-empty
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # use the logger passed as the first argument
+    else:
+        log = logger  # fall back to the global logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    Get proxy settings
+    :return: proxies dict
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_video_url(log, pid):
+    """
+    Get the video (VOD) URL
+    :param log: logger object
+    :param pid: product id
+    :return: (live id, video URL)
+    """
+    log.debug(f"Fetching video URL: {pid}")
+    url = "https://cxx.cardsvault.net/app/teamup/detail"
+    params = {
+        # "id": "1730"
+        "id": str(pid)
+    }
+    response = requests.get(url, headers=HEADERS, params=params, timeout=22)
+    response.raise_for_status()
+    result = response.json()
+    liveInfo = result.get("data", {}).get("liveInfo", {})
+    live_id = liveInfo.get("id") if liveInfo else None
+    vodUrl = liveInfo.get("vod_info", {}).get("vodUrl")
+    return live_id, vodUrl
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_vod_single_page(log, page_num=1, token=""):
+    """
+    Fetch a single page of data
+    :param log: logger object
+    :param page_num: page number
+    :param token: token
+    :return: page data
+    """
+    url = "https://cxx.cardsvault.net/app/teamup/vod/list"
+    data = {
+        "pageSize": 20,
+        "pageNum": page_num
+    }
+    HEADERS["Authorization"] = token
+    response = requests.post(url, headers=HEADERS, json=data, timeout=22)
+    response.raise_for_status()
+
+    result = response.json()
+    # print(result)
+    if result.get("status") == 200:
+        data = result.get("data", {})
+        total = data.get("total", 0)
+        current_page = data.get("pageNum", 1)
+        items = data.get("data", [])
+
+        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
+        log.debug(f"当前页数据数量: {len(items)}")
+
+        return {
+            "total": total,
+            "current_page": current_page,
+            "items": items,
+        }
+    else:
+        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
+        return None
+
+
+def parse_list_items(log, items):
+    """
+    Parse the list items
+    :param log: logger object
+    :param items: list items
+    :return: parsed items
+    """
+    parsed_items = []
+    log.debug(f"正在解析列表项.................")
+    for item in items:
+        pid = item.get("id")
+        serial = item.get("serial")  # 编号
+        title = item.get("title")
+        type_name = item.get("typeName")  # 随机卡种
+        isPre = item.get("isPre")
+        count = item.get("count")
+
+        totalPrice = item.get("totalPrice")
+        totalPrice = totalPrice / 100 if totalPrice else 0
+        signPrice = item.get("signPrice")
+        signPrice = signPrice / 100 if signPrice else 0
+
+        sellTime = item.get("sellTime")
+        sellDays = item.get("sellDays")
+        status = item.get("status")  # 9:完成 8:待发货
+        groupNum = item.get("groupNum")
+        description = item.get("description")
+        createTime = item.get("createTime")
+        completionTime = item.get("completionTime")  # completion time
+        cover_url = item.get("cover", {}).get("url")  # cover image
+
+        anchor_id = item.get("anchor", {}).get("id")
+        anchor_userName = item.get("anchor", {}).get("userName")
+        soldCount = item.get("soldCount")
+        detailUrl = item.get("detailUrl")
+        goodsUrl = item.get("goodsUrl")
+        standardName = item.get("standardName")  # spec
+        liveTaskTime = item.get("liveTaskTime")  # live-stream time
+
+        try:
+            live_id, vodUrl = get_video_url(log, pid)
+        except Exception as e:
+            log.error(f"Error getting video URL: {e}")
+            live_id, vodUrl = None, None
+
+        parsed_item = {
+            "pid": pid,
+            "title": title,
+            "serial": serial,
+            "type_name": type_name,
+            "is_pre": isPre,
+            "count": count,
+            "total_price": totalPrice,
+            "sign_price": signPrice,
+            "sell_time": sellTime,
+            "sell_days": sellDays,
+            "status": status,
+            "group_num": groupNum,
+            "description": description,
+            "create_time": createTime,
+            "completion_time": completionTime,
+            "cover_url": cover_url,
+            "anchor_id": anchor_id,
+            "anchor_username": anchor_userName,
+            "sold_count": soldCount,
+            "detail_url": detailUrl,
+            "goods_url": goodsUrl,
+            "standard_name": standardName,
+            "live_task_time": liveTaskTime,
+            "live_id": live_id,
+            "vod_url": vodUrl
+        }
+        # print(parsed_item)
+        parsed_items.append(parsed_item)
+    return parsed_items
+
+
+def get_vod_list(log, sql_pool, token):
+    """
+    Fetch the list data
+    :param log: logger object
+    :param sql_pool: database connection pool
+    :param token: token
+    """
+    page_num = 1
+    max_pages = 2
+
+    while page_num <= max_pages:
+        log.debug(f"正在获取第 {page_num} 页的数据.................")
+        page_result = get_vod_single_page(log, page_num, token)
+
+        if not page_result:
+            log.error(f"获取第 {page_num} 页失败 !!!")
+            break
+
+        # Parse each page as soon as it is fetched
+        items = parse_list_items(log, page_result["items"])
+        sql_pool.insert_many(table="super_vault_product_record", data_list=items, ignore=True)
+
+        page_num += 1
+
+
+# ----------------------------------------------------------------------------------------------------------------------
+
+def get_report_single_page(log, page_num, detail_id, token):
+    """
+    Fetch a single page of data
+    :param log: logger object
+    :param page_num: page number
+    :param detail_id: product id
+    :param token: token
+    :return: page data
+    """
+    log.debug(f"Fetching page {page_num} of <card-break report> data.................")
+    # token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJDSEFPWElOWElORyNBUFAiLCJhdWQiOiJDSEFPWElOWElORyIsIm5iZiI6MTc2OTU4MTI5NiwiZGF0YSI6Ijk1MjMiLCJpc3MiOiI3ViNweHlQZSIsImV4cCI6MTc3MDc4MTI5NiwiaWF0IjoxNzY5NTgxMjk2LCJqdGkiOiIyYjkwNzZhMS0wYjU1LTQ0ZjItOGZlZC0yMWZiZmI0ZjUyYWIifQ.iDzTZLDslCP0y2nc2Jp4TGEsNbQiCRKcUeRsIyG3iOg"
+
+    url = "https://cxx.cardsvault.net/app/teamup/report/list"
+    data = {
+        "pageSize": 20,
+        "my": 0,
+        "pageNum": page_num,
+        # "tid": 1780
+        "tid": detail_id
+    }
+    HEADERS["Authorization"] = token
+    response = requests.post(url, headers=HEADERS, json=data, timeout=22)
+    # print(response.text)
+    response.raise_for_status()
+
+    result = response.json()
+    if result.get("status") == 200:
+        data = result.get("data", {})
+        total = data.get("total", 0)
+        current_page = data.get("pageNum", 1)
+        items = data.get("data", [])
+
+        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
+        log.debug(f"当前页数据数量: {len(items)}")
+
+        return {
+            "total": total,
+            "current_page": current_page,
+            "items": items
+        }
+    else:
+        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
+        return None
+
+
+def parse_report_items(log, detail_id, items):
+    """
+    Parse the list items
+    :param log: logger object
+    :param detail_id: product id
+    :param items: list items
+    :return: parsed items
+    """
+    parsed_items = []
+    log.debug("Parsing <card-break report> list items.................")
+    for item in items:
+        userName = item.get("userName")
+        level = item.get("level")
+        teamNameCn = item.get("teamNameCn")
+        teamNameEn = item.get("teamNameEn")
+        count = item.get("count")
+
+        picture_url = item.get("picture", {}).get("url")
+        alias = item.get("alias")  # 别名
+        createTime = item.get("createTime")
+
+        data_dict = {
+            "pid": detail_id,
+            "user_name": userName,
+            "level": level,
+            "team_name_cn": teamNameCn,
+            "team_name_en": teamNameEn,
+            "count": count,
+            "picture_url": picture_url,
+            "alias": alias,
+            "create_time": createTime
+        }
+        parsed_items.append(data_dict)
+
+    return parsed_items
+
+
+def get_report_list(log, detail_id, token, sql_pool):
+    """
+    Fetch the report list data
+    :param log: logger object
+    :param detail_id: product id
+    :param token: token
+    :param sql_pool: database connection pool
+    """
+    page_num = 1
+    total_pages = 99
+    items_per_page = 20  # pageSize
+
+    while page_num <= total_pages:
+        log.debug(f"正在获取第 {page_num} 页的数据.................")
+        page_result = get_report_single_page(log, page_num, detail_id, token)
+
+        if not page_result:
+            log.error(f"获取第 {page_num} 页失败 !!!")
+            break
+
+        # On the first request, update the real total page count
+        if page_num == 1:
+            total_count = page_result["total"]
+            if total_count == 0:
+                log.info("No new records found.")
+                # set report_state to 2
+                sql_pool.update_one_or_dict(
+                    table="super_vault_product_record",
+                    data={"report_state": 2},
+                    condition={"pid": detail_id}
+                )
+                break
+
+            total_pages = (total_count + items_per_page - 1) // items_per_page
+            log.info(f"总共 {total_pages} 页")
+
+        items = parse_report_items(log, detail_id, page_result["items"])
+        sql_pool.insert_many(table="super_vault_report_record", data_list=items, ignore=True)
+        sql_pool.update_one_or_dict(
+            table="super_vault_product_record",
+            data={"report_state": 1},
+            condition={"pid": detail_id}
+        )
+
+        page_num += 1
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def cxx_daily_main(log):
+    """
+    Main entry point
+    :param log: logger object
+    """
+    log.info(
+        f'Starting spider task {inspect.currentframe().f_code.co_name}....................................................')
+
+    # Configure the MySQL connection pool
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("数据库连接池异常")
+        raise RuntimeError("数据库连接池异常")
+
+    try:
+        token = sql_pool.select_one("SELECT token FROM super_vault_token")
+
+        # Fetch all pids
+        # try:
+        #     get_vod_list(log, sql_pool, token[0])
+        # except Exception as e:
+        #     log.error(f"Error fetching last_product_id: {e}")
+        #
+        # time.sleep(5)
+
+        # Fetch all pids whose report_state != 1
+        sql_detail_id_list = sql_pool.select_all("SELECT pid FROM super_vault_product_record WHERE report_state != 1")
+        if sql_detail_id_list:
+            sql_detail_id_list = [item[0] for item in sql_detail_id_list]
+            for detail_id in sql_detail_id_list:
+                try:
+                    get_report_list(log, detail_id, token[0], sql_pool)
+                except Exception as e:
+                    log.error(f"Error fetching last_product_id: {e}")
+                    # set report_state to 3
+                    sql_pool.update_one_or_dict(
+                        table="super_vault_product_record",
+                        data={"report_state": 3},
+                        condition={"pid": detail_id}
+                    )
+        else:
+            log.info("No new records found.")
+
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next collection round............')
+
+
+
+if __name__ == '__main__':
+    # get_vod_list(logger, None, '')
+    # get_vod_single_page(logger, 1)
+    # get_report_single_page(logger,1, '','')
+
+    cxx_daily_main(logger)
+    # schedule_task()

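With @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600)), any exception that escapes cxx_daily_main effectively reschedules the whole run an hour later, up to 100 attempts, with after_log reporting each attempt. A compact standalone sketch of the callback mechanics (tenacity invokes `after` following a failed attempt, before the wait):

    from tenacity import retry, stop_after_attempt, wait_fixed

    attempts = {"n": 0}

    def after_log(retry_state):
        # retry_state.fn is the wrapped function; attempt_number counts from 1
        print(f"{retry_state.fn.__name__} attempt {retry_state.attempt_number} failed")

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(0.1), after=after_log)
    def flaky():
        attempts["n"] += 1
        if attempts["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    print(flaky())  # after_log fires for attempts 1 and 2; attempt 3 succeeds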
+ 427 - 0
cxx_spider/super_vault_history_spider.py

@@ -0,0 +1,427 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/28 11:12
+import inspect
+import requests
+from loguru import logger
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+"""
+SuperVault
+"""
+# logger.remove()
+# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+#            level="DEBUG", retention="7 day")
+
+
+HEADERS = {
+    "User-Agent": "okhttp/4.9.0",
+    # "Connection": "Keep-Alive",
+    # "Accept-Encoding": "gzip",
+    "Authorization": "",
+    "CXX-APP-API-VERSION": "V2",  # 必须添加
+    # "deviceType": "2",
+    # "udid": "20f902c10f6163a19bf137d801731d9f",
+    # "time": str(int(time.time() * 1000)),
+    "Content-Type": "application/json; charset=UTF-8"
+}
+
+
+def after_log(retry_state):
+    """
+    tenacity retry callback
+    :param retry_state: RetryCallState object
+    """
+    # Check that args exists and is non-empty
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # use the logger passed as the first argument
+    else:
+        log = logger  # fall back to the global logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    Get proxy settings
+    :return: proxies dict
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_video_url(log, pid):
+    """
+    Get the video (VOD) URL
+    :param log: logger object
+    :param pid: product id
+    :return: (live id, video URL)
+    """
+    log.debug(f"Fetching video URL: {pid}")
+    url = "https://cxx.cardsvault.net/app/teamup/detail"
+    params = {
+        # "id": "1730"
+        "id": str(pid)
+    }
+    response = requests.get(url, headers=HEADERS, params=params, timeout=22)
+    response.raise_for_status()
+    result = response.json()
+    liveInfo = result.get("data", {}).get("liveInfo", {})
+    live_id = liveInfo.get("id") if liveInfo else None
+    vodUrl = liveInfo.get("vod_info", {}).get("vodUrl")
+    return live_id, vodUrl
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_vod_single_page(log, page_num=1, token=""):
+    """
+    Fetch a single page of data
+    :param log: logger object
+    :param page_num: page number
+    :param token: token
+    :return: page data
+    """
+    url = "https://cxx.cardsvault.net/app/teamup/vod/list"
+    data = {
+        "pageSize": 20,
+        "pageNum": page_num
+    }
+    HEADERS["Authorization"] = token
+    response = requests.post(url, headers=HEADERS, json=data, timeout=22)
+    response.raise_for_status()
+
+    result = response.json()
+    # print(result)
+    if result.get("status") == 200:
+        data = result.get("data", {})
+        total = data.get("total", 0)
+        current_page = data.get("pageNum", 1)
+        items = data.get("data", [])
+
+        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
+        log.debug(f"当前页数据数量: {len(items)}")
+
+        return {
+            "total": total,
+            "current_page": current_page,
+            "items": items,
+        }
+    else:
+        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
+        return None
+
+
+def parse_list_items(log, items):
+    """
+    Parse the list items
+    :param log: logger object
+    :param items: list items
+    :return: parsed items
+    """
+    parsed_items = []
+    log.debug(f"正在解析列表项.................")
+    for item in items:
+        pid = item.get("id")
+        serial = item.get("serial")  # serial number
+        title = item.get("title")
+        type_name = item.get("typeName")  # random card type
+        isPre = item.get("isPre")
+        count = item.get("count")
+
+        totalPrice = item.get("totalPrice")
+        totalPrice = totalPrice / 100 if totalPrice else 0
+        signPrice = item.get("signPrice")
+        signPrice = signPrice / 100 if signPrice else 0
+
+        sellTime = item.get("sellTime")
+        sellDays = item.get("sellDays")
+        status = item.get("status")  # 9: completed, 8: awaiting shipment
+        groupNum = item.get("groupNum")
+        description = item.get("description")
+        createTime = item.get("createTime")
+        completionTime = item.get("completionTime")  # completion time
+        cover_url = item.get("cover", {}).get("url")  # cover image
+
+        anchor_id = item.get("anchor", {}).get("id")
+        anchor_userName = item.get("anchor", {}).get("userName")
+        soldCount = item.get("soldCount")
+        detailUrl = item.get("detailUrl")
+        goodsUrl = item.get("goodsUrl")
+        standardName = item.get("standardName")  # spec / variant
+        liveTaskTime = item.get("liveTaskTime")  # live-stream time
+
+        try:
+            live_id, vodUrl = get_video_url(log, pid)
+        except Exception as e:
+            log.error(f"Error getting video URL: {e}")
+            live_id, vodUrl = None, None
+
+        parsed_item = {
+            "pid": pid,
+            "title": title,
+            "serial": serial,
+            "type_name": type_name,
+            "is_pre": isPre,
+            "count": count,
+            "total_price": totalPrice,
+            "sign_price": signPrice,
+            "sell_time": sellTime,
+            "sell_days": sellDays,
+            "status": status,
+            "group_num": groupNum,
+            "description": description,
+            "create_time": createTime,
+            "completion_time": completionTime,
+            "cover_url": cover_url,
+            "anchor_id": anchor_id,
+            "anchor_username": anchor_userName,
+            "sold_count": soldCount,
+            "detail_url": detailUrl,
+            "goods_url": goodsUrl,
+            "standard_name": standardName,
+            "live_task_time": liveTaskTime,
+            "live_id": live_id,
+            "vod_url": vodUrl
+        }
+        # print(parsed_item)
+        parsed_items.append(parsed_item)
+    return parsed_items
+
+
+def get_vod_list(log, sql_pool, token):
+    """
+    获取列表数据
+    :param log: logger对象
+    :param sql_pool: 数据库连接池
+    :param token: token
+    """
+    page_num = 1
+    total_pages = 999
+    items_per_page = 20  # pageSize
+
+    while page_num <= total_pages:
+        log.debug(f"正在获取第 {page_num} 页的数据.................")
+        page_result = get_vod_single_page(log, page_num, token)
+
+        if not page_result:
+            log.error(f"获取第 {page_num} 页失败 !!!")
+            break
+
+        # Update the real page count after the first request
+        if page_num == 1:
+            total_count = page_result["total"]
+            total_pages = (total_count + items_per_page - 1) // items_per_page
+            log.info(f"总共 {total_pages} 页")
+
+        # Parse each page immediately after fetching it
+        items = parse_list_items(log, page_result["items"])
+        sql_pool.insert_many(table="super_vault_product_record", data_list=items, ignore=True)
+
+        page_num += 1
+
+
+# ----------------------------------------------------------------------------------------------------------------------
+
+def get_report_single_page(log, page_num, detail_id, token):
+    """
+    获取单页数据
+    :param log: logger对象
+    :param page_num: 页码
+    :param detail_id: 商品id
+    :param token: token
+    :return: 数据
+    """
+    log.debug(f"正在获取第 {page_num} 页的 <拆卡报告> 数据.................")
+    # token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJDSEFPWElOWElORyNBUFAiLCJhdWQiOiJDSEFPWElOWElORyIsIm5iZiI6MTc2OTU4MTI5NiwiZGF0YSI6Ijk1MjMiLCJpc3MiOiI3ViNweHlQZSIsImV4cCI6MTc3MDc4MTI5NiwiaWF0IjoxNzY5NTgxMjk2LCJqdGkiOiIyYjkwNzZhMS0wYjU1LTQ0ZjItOGZlZC0yMWZiZmI0ZjUyYWIifQ.iDzTZLDslCP0y2nc2Jp4TGEsNbQiCRKcUeRsIyG3iOg"
+
+    url = "https://cxx.cardsvault.net/app/teamup/report/list"
+    data = {
+        "pageSize": 20,
+        "my": 0,
+        "pageNum": page_num,
+        # "tid": 1780
+        "tid": detail_id
+    }
+    HEADERS["Authorization"] = token
+    response = requests.post(url, headers=HEADERS, json=data, timeout=22)
+    # print(response.text)
+    response.raise_for_status()
+
+    result = response.json()
+    if result.get("status") == 200:
+        data = result.get("data", {})
+        total = data.get("total", 0)
+        current_page = data.get("pageNum", 1)
+        items = data.get("data", [])
+
+        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
+        log.debug(f"当前页数据数量: {len(items)}")
+
+        return {
+            "total": total,
+            "current_page": current_page,
+            "items": items
+        }
+    else:
+        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
+        return None
+
+
+def parse_report_items(log, detail_id, items):
+    """
+    解析列表项
+    :param log: logger对象
+    :param detail_id: 商品id
+    :param items: 列表项
+    :return: 解析后的列表项
+    """
+    parsed_items = []
+    log.debug(f"正在解析 <拆卡报告> 列表项.................")
+    for item in items:
+        userName = item.get("userName")
+        level = item.get("level")
+        teamNameCn = item.get("teamNameCn")
+        teamNameEn = item.get("teamNameEn")
+        count = item.get("count")
+
+        picture_url = item.get("picture", {}).get("url")
+        alias = item.get("alias")  # 别名
+        createTime = item.get("createTime")
+
+        data_dict = {
+            "pid": detail_id,
+            "user_name": userName,
+            "level": level,
+            "team_name_cn": teamNameCn,
+            "team_name_en": teamNameEn,
+            "count": count,
+            "picture_url": picture_url,
+            "alias": alias,
+            "create_time": createTime
+        }
+        parsed_items.append(data_dict)
+
+    return parsed_items
+
+
+def get_report_list(log, detail_id, token, sql_pool):
+    """
+    获取列表数据
+    :param log: logger对象
+    :param detail_id: 商品id
+    :param token: token
+    :param sql_pool: 数据库连接池
+    """
+    page_num = 1
+    total_pages = 9
+    items_per_page = 20  # pageSize
+
+    while page_num <= total_pages:
+        log.debug(f"正在获取第 {page_num} 页的数据.................")
+        page_result = get_report_single_page(log, page_num, detail_id, token)
+
+        if not page_result:
+            log.error(f"获取第 {page_num} 页失败 !!!")
+            break
+
+        # Update the real page count after the first request
+        if page_num == 1:
+            total_count = page_result["total"]
+            if total_count == 0:
+                log.info("No new records found.")
+                # Set report_state to 2 (no reports available)
+                sql_pool.update_one_or_dict(
+                    table="super_vault_product_record",
+                    data={"report_state": 2},
+                    condition={"pid": detail_id}
+                )
+                break
+
+            total_pages = (total_count + items_per_page - 1) // items_per_page
+            log.info(f"总共 {total_pages} 页")
+
+        items = parse_report_items(log, detail_id, page_result["items"])
+        sql_pool.insert_many(table="super_vault_report_record", data_list=items, ignore=True)
+
+        page_num += 1
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def cxx_his_main(log):
+    """
+    主函数
+    :param log: logger对象
+    """
+    log.info(
+        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
+
+    # Set up the MySQL connection pool
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("Database connection pool unhealthy")
+        raise RuntimeError("Database connection pool unhealthy")
+
+    try:
+        token = sql_pool.select_one("SELECT token FROM super_vault_token")
+        if not token:
+            log.error("No token found in super_vault_token")
+            return
+
+        # Fetch all pids
+        try:
+            get_vod_list(log, sql_pool, token[0])
+        except Exception as e:
+            log.error(f"Error fetching vod list: {e}")
+
+        # time.sleep(5)
+
+        # Fetch all pids with report_state = 0
+        sql_detail_id_list = sql_pool.select_all("SELECT pid FROM super_vault_product_record WHERE report_state = 0")
+        if sql_detail_id_list:
+            sql_detail_id_list = [item[0] for item in sql_detail_id_list]
+            for detail_id in sql_detail_id_list:
+                try:
+                    get_report_list(log, detail_id, token[0], sql_pool)
+                    sql_pool.update_one_or_dict(
+                        table="super_vault_product_record",
+                        data={"report_state": 1},
+                        condition={"pid": detail_id}
+                    )
+                except Exception as e:
+                    log.error(f"Error fetching last_product_id: {e}")
+                    # 更改状态为3
+                    sql_pool.update_one_or_dict(
+                        table="super_vault_product_record",
+                        data={"report_state": 3},
+                        condition={"pid": detail_id}
+                    )
+        else:
+            log.info("No new records found.")
+
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next round............')
+
+
+if __name__ == '__main__':
+    # get_vod_list(logger, None, '')
+    # get_vod_single_page(logger, 1)
+    cxx_his_main(logger)
+    # get_report_single_page(logger,1, '','')
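+    # NOTE: the tenacity decorator on cxx_his_main (up to 100 attempts, 3600 s
+    # apart) only re-runs the task when it raises, e.g. on a connection-pool
+    # failure; a normal completion ends the process.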

+ 220 - 0
cxx_spider/super_vault_on_sale_spider.py

@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/28 11:12
+import inspect
+import requests
+from loguru import logger
+from datetime import datetime
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+"""
+SuperVault
+"""
+# logger.remove()
+# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+#            level="DEBUG", retention="7 day")
+
+
+HEADERS = {
+    "User-Agent": "okhttp/4.9.0",
+    # "Connection": "Keep-Alive",
+    # "Accept-Encoding": "gzip",
+    "Authorization": "",
+    "CXX-APP-API-VERSION": "V2",  # 必须添加
+    # "deviceType": "2",
+    # "udid": "20f902c10f6163a19bf137d801731d9f",
+    # "time": str(int(time.time() * 1000)),
+    "Content-Type": "application/json; charset=UTF-8"
+}
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_vod_single_page(log, page_num=1):
+    """
+    获取单页数据
+    :param log: logger对象
+    :param page_num: 页码
+    :return: 数据
+    """
+    url = "https://cxx.cardsvault.net/app/teamup/list"
+    data = {
+        "pageSize": 20,
+        "pageNum": page_num
+    }
+    response = requests.post(url, headers=HEADERS, json=data, timeout=22)
+    response.raise_for_status()
+
+    result = response.json()
+    # print(result)
+    if result.get("status") == 200:
+        data = result.get("data", {})
+        total = data.get("total", 0)
+        current_page = data.get("pageNum", 1)
+        items = data.get("data", [])
+
+        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
+        log.debug(f"当前页数据数量: {len(items)}")
+
+        return {
+            "total": total,
+            "current_page": current_page,
+            "items": items,
+        }
+    else:
+        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
+        return None
+
+
+def parse_list_items(log, items):
+    """
+    解析列表项
+    :param log: logger对象
+    :param items: 列表项
+    :return: 解析后的列表项
+    """
+    parsed_items = []
+    log.debug(f"正在解析列表项.................")
+    for item in items:
+        pid = item.get("id")
+        serial = item.get("serial")  # 编号
+        title = item.get("title")
+        type_name = item.get("typeName")  # random card type
+        isPre = item.get("isPre")
+        count = item.get("count")
+
+        totalPrice = item.get("totalPrice")
+        totalPrice = totalPrice / 100 if totalPrice else 0
+        signPrice = item.get("signPrice")
+        signPrice = signPrice / 100 if signPrice else 0
+
+        sellTime = item.get("sellTime")
+        sellDays = item.get("sellDays")
+        status = item.get("status")  # 9:完成 8:待发货
+        statusName = item.get("statusName")
+        description = item.get("description")
+        createTime = item.get("createTime")
+        cover_url = item.get("cover", {}).get("url")  # cover image
+
+        anchor_id = item.get("anchor", {}).get("id")
+        anchor_userName = item.get("anchor", {}).get("userName")
+        soldCount = item.get("soldCount")
+        detailUrl = item.get("detailUrl")
+        goodsUrl = item.get("goodsUrl")
+        standardName = item.get("standardName")  # spec / variant
+
+        crawl_date = datetime.now().strftime("%Y-%m-%d")
+
+        parsed_item = {
+            "pid": pid,
+            "title": title,
+            "serial": serial,
+            "type_name": type_name,
+            "is_pre": isPre,
+            "count": count,
+            "total_price": totalPrice,
+            "sign_price": signPrice,
+            "sell_time": sellTime,
+            "sell_days": sellDays,
+            "status": status,
+            "status_name": statusName,
+            "description": description,
+            "create_time": createTime,
+            "cover_url": cover_url,
+            "anchor_id": anchor_id,
+            "anchor_username": anchor_userName,
+            "sold_count": soldCount,
+            "detail_url": detailUrl,
+            "goods_url": goodsUrl,
+            "standard_name": standardName,
+            "crawl_date": crawl_date
+        }
+        # print(parsed_item)
+        parsed_items.append(parsed_item)
+    return parsed_items
+
+
+def get_vod_list(log, sql_pool):
+    """
+    获取列表数据
+    :param log: logger对象
+    :param sql_pool: 数据库连接池
+    """
+    page_num = 1
+    total_pages = 9
+    items_per_page = 20  # pageSize
+
+    while page_num <= total_pages:
+        log.debug(f"正在获取第 {page_num} 页的数据.................")
+        page_result = get_vod_single_page(log, page_num)
+
+        if not page_result:
+            log.error(f"获取第 {page_num} 页失败 !!!")
+            break
+
+        # Update the real page count after the first request
+        if page_num == 1:
+            total_count = page_result["total"]
+            total_pages = (total_count + items_per_page - 1) // items_per_page
+            log.info(f"总共 {total_pages} 页")
+
+        # Parse each page immediately after fetching it
+        items = parse_list_items(log, page_result["items"])
+        sql_pool.insert_many(table="super_vault_on_sale_record", data_list=items, ignore=True)
+
+        page_num += 1
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def cxx_sale_main(log):
+    """
+    主函数
+    :param log: logger对象
+    """
+    log.info(
+        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
+
+    # Set up the MySQL connection pool
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("Database connection pool unhealthy")
+        raise RuntimeError("Database connection pool unhealthy")
+
+    try:
+        # Fetch all pids
+        try:
+            get_vod_list(log, sql_pool)
+        except Exception as e:
+            log.error(f"Error fetching vod list: {e}")
+
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next round............')
+
+
+if __name__ == '__main__':
+    # get_vod_list(logger, None)
+    # get_vod_single_page(logger, 1)
+    cxx_sale_main(logger)
+    # schedule_task()

+ 7 - 0
feishezhang_spider/requirements.txt

@@ -0,0 +1,7 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+tenacity==9.1.2

+ 11 - 0
hoopi_spider/requirements.txt

@@ -0,0 +1,11 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+numpy==1.26.4
+opencv-contrib-python==4.6.0.66
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+selenium==4.39.0
+tenacity==9.1.2

+ 9 - 0
magic_card_spider/requirements.txt

@@ -0,0 +1,9 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+tenacity==9.1.2
+user_agent==0.1.14

+ 9 - 0
one_piece_spider/requirements.txt

@@ -0,0 +1,9 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+tenacity==9.1.2
+user_agent==0.1.14

+ 8 - 0
pcg_spider/requirements.txt

@@ -0,0 +1,8 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+tenacity==9.1.2
+user_agent==0.1.14

+ 12 - 0
pokemon_tcg_spider/requirements.txt

@@ -0,0 +1,12 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.10.0
+pycryptodome==3.23.0
+PyExecJS==1.5.1
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2
+user_agent==0.1.14

+ 106 - 0
popmart_spider/CloudflareBypasser.py

@@ -0,0 +1,106 @@
+import time
+from DrissionPage import ChromiumPage
+from loguru import logger
+
+
+class CloudflareBypasser:
+    """
+    Helper for clicking through a Cloudflare Turnstile challenge page.
+    :param driver: ChromiumPage instance
+    :param max_retries: maximum retry count; -1 means unlimited
+    :param log: logger; defaults to the module-level loguru logger
+    """
+    def __init__(self, driver: ChromiumPage, max_retries=-1, log=None):
+        self.driver = driver
+        self.max_retries = max_retries
+        # self.log = log
+        self.log = log or logger
+
+    def search_recursively_shadow_root_with_iframe(self, ele):
+        if ele.shadow_root:
+            if ele.shadow_root.child().tag == "iframe":
+                return ele.shadow_root.child()
+        else:
+            for child in ele.children():
+                result = self.search_recursively_shadow_root_with_iframe(child)
+                if result:
+                    return result
+        return None
+
+    def search_recursively_shadow_root_with_cf_input(self, ele):
+        if ele.shadow_root:
+            if ele.shadow_root.ele("tag:input"):
+                return ele.shadow_root.ele("tag:input")
+        else:
+            for child in ele.children():
+                result = self.search_recursively_shadow_root_with_cf_input(child)
+                if result:
+                    return result
+        return None
+    
+    def locate_cf_button(self):
+        button = None
+        eles = self.driver.eles("tag:input")
+        for ele in eles:
+            if "name" in ele.attrs.keys() and "type" in ele.attrs.keys():
+                if "turnstile" in ele.attrs["name"] and ele.attrs["type"] == "hidden":
+                    button = ele.parent().shadow_root.child()("tag:body").shadow_root("tag:input")
+                    break
+            
+        if button:
+            return button
+        else:
+            # If the button is not found, search it recursively
+            self.log_message("Basic search failed. Searching for button recursively.")
+            ele = self.driver.ele("tag:body")
+            iframe = self.search_recursively_shadow_root_with_iframe(ele)
+            if iframe:
+                button = self.search_recursively_shadow_root_with_cf_input(iframe("tag:body"))
+            else:
+                self.log_message("Iframe not found. Button search failed.")
+            return button
+
+    def log_message(self, message):
+        if self.log:
+            # print(message)
+            self.log.info(message)
+
+    def click_verification_button(self):
+        try:
+            button = self.locate_cf_button()
+            if button:
+                self.log_message("Verification button found. Attempting to click.")
+                button.click()
+            else:
+                self.log_message("Verification button not found.")
+
+        except Exception as e:
+            self.log_message(f"Error clicking verification button: {e}")
+
+    def is_bypassed(self):
+        try:
+            title = self.driver.title.lower()
+            return "just a moment" not in title
+        except Exception as e:
+            self.log_message(f"Error checking page title: {e}")
+            return False
+
+    def bypass(self):
+        
+        try_count = 0
+
+        while not self.is_bypassed():
+            if 0 < self.max_retries + 1 <= try_count:
+                self.log_message("Exceeded maximum retries. Bypass failed.")
+                break
+
+            self.log_message(f"Attempt {try_count + 1}: Verification page detected. Trying to bypass...")
+            self.click_verification_button()
+
+            try_count += 1
+            time.sleep(2)
+
+        if self.is_bypassed():
+            self.log_message("Bypass successful.")
+        else:
+            self.log_message("Bypass failed.")

+ 74 - 0
popmart_spider/YamlLoader.py

@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+#
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+
+    def get(self, key: str):
+        return YamlConfig(self.config.get(key))
+    
+    def getValueAsString(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return os.getenv(env, group['VAL'])
+            return None
+        except:
+            return self.config[key]
+    
+    def getValueAsInt(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return int(os.getenv(env, group['VAL']))
+            return 0
+        except:
+            return int(self.config[key])
+        
+    def getValueAsBool(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return bool(os.getenv(env, group['VAL']))
+            return False
+        except:
+            return bool(self.config[key])
+    
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    conf = {}  # avoid a NameError when the yaml file is missing
+    if os.path.exists(path):
+        with open(path) as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+
+    if profile is not None:
+        result = path.split('.')
+        profiledYaml = f'{result[0]}-{profile}.{result[1]}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml) as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+
+    return YamlConfig(conf)
+
+# Usage example:
+# conf = readYaml()
+# mysqlConf = conf.get('mysql')
+# host = mysqlConf.getValueAsString("host")
+# port = mysqlConf.getValueAsInt("port")
+# username = mysqlConf.getValueAsString("username")
+# password = mysqlConf.getValueAsString("password")
+# db = mysqlConf.getValueAsString("db")
+# print(host, port, username, password, db)

+ 6 - 0
popmart_spider/application.yml

@@ -0,0 +1,6 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.21}
+  port: ${MYSQL_PORT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
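+# Values use the ${ENV_VAR:default} placeholder form resolved by YamlLoader;
+# e.g. `export MYSQL_HOST=10.0.0.5` overrides the default host above.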

+ 531 - 0
popmart_spider/mysql_pool.py

@@ -0,0 +1,531 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
+# Load the yaml config
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL connection pool.
+    """
+
+    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
+        """
+        初始化连接池
+        :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
+        :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
+        :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
+        :param log: 自定义日志记录器
+        """
+        # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # block and wait when the pool is exhausted instead of raising
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db,
+            ping=0  # connection validity check (0 = none, 1 = before query, 2 = before every execute)
+        )
+
+    def _execute(self, query, args=None, commit=False):
+        """
+        执行SQL
+        :param query: SQL语句
+        :param args: SQL参数
+        :param commit: 是否提交事务
+        :return: 查询结果
+        """
+        try:
+            with self.pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, args)
+                    if commit:
+                        conn.commit()
+                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                    return cursor
+        except Exception as e:
+            if commit:
+                conn.rollback()
+            self.log.error(f"Error executing query: {e}, Query: {query}, Args: {args}")
+            raise e
+
+    def select_one(self, query, args=None):
+        """
+        Run a query and return a single row.
+        :param query: SQL query
+        :param args: query parameters
+        :return: one row, or None
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        Run a query and return all rows.
+        :param query: SQL query
+        :param args: query parameters
+        :return: all rows
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
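+    # Usage sketch (table/column names are illustrative):
+    #   row = pool.select_one("SELECT token FROM super_vault_token")
+    #   rows = pool.select_all("SELECT pid FROM some_table WHERE state = %s", (0,))
+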
+    def insert_one(self, query, args):
+        """
+        Execute a single INSERT statement.
+        :param query: INSERT statement
+        :param args: statement parameters
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one writing to DB>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        cursor = self._execute(query, args, commit=True)
+        return cursor.lastrowid  # id of the inserted row
+
+    def insert_all(self, query, args_list):
+        """
+        Execute a batch INSERT; fall back to row-by-row inserts on failure.
+        :param query: INSERT statement
+        :param args_list: list of parameter tuples
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all writing to DB>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            if conn:
+                conn.rollback()
+            self.log.error(f"Batch insertion failed. Falling back to single inserts. Error: {e}")
+            # fall back to row-by-row inserts
+            rowcount = 0
+            for args in args_list:
+                self.insert_one(query, args)
+                rowcount += 1
+            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True):
+        """
+        Single-row insert (dict mode or raw SQL).
+        :param table: table name (required in dict mode)
+        :param data: dict of {column: value}
+        :param query: raw SQL statement (mutually exclusive with data)
+        :param args: SQL parameters (required with query)
+        :param commit: auto-commit
+        :return: last inserted id
+        """
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
+            values = ', '.join(['%s'] * len(data))
+            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args = tuple(data.values())
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        cursor = self._execute(query, args, commit)
+        self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict writing to DB>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return cursor.lastrowid
+
+    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=500, commit=True):
+        """
+        Batch insert (dict list or raw SQL).
+        :param table: table name (required in dict mode)
+        :param data_list: list of {column: value} dicts
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: list of parameter tuples (required with query)
+        :param batch_size: batch size
+        :param commit: auto-commit
+        :return: affected row count
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.Error as e:
+                if "Duplicate entry" in str(e):
+                    # self.log.warning(f"检测到重复条目,开始逐条插入。错误详情: {e}")
+                    raise  e
+                    # rowcount = 0
+                    # for args in batch:
+                    #     try:
+                    #         self.insert_one_or_dict(table=table, data=dict(zip(data_list[0].keys(), args)),
+                    #                                 commit=commit)
+                    #         rowcount += 1
+                    #     except pymysql.err.IntegrityError as e2:
+                    #         if "Duplicate entry" in str(e2):
+                    #             self.log.warning(f"跳过重复条目: {args}")
+                    #         else:
+                    #             self.log.error(f"插入失败: {e2}, 参数: {args}")
+                    # total += rowcount
+                else:
+                    self.log.error(f"Database error: {e}")
+                    # the pooled connection was already closed by the context
+                    # manager, so there is nothing left to roll back here
+                    raise e
+                # re-raise so the caller can handle it
+                # fallback to single inserts
+                # for args in batch:
+                #     try:
+                #         self.insert_one_or_dict(table=None, query=query, args=args, commit=commit)
+                #         total += 1
+                #     except Exception as e2:
+                #         self.log.error(f"Single insert failed: {e2}")
+                        # continue
+        self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        return total
+
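+    # Usage sketch: dicts sharing the same keys are grouped into executemany
+    # batches of `batch_size` (table/values illustrative):
+    #   pool.insert_many(table="demo_table",
+    #                    data_list=[{"a": 1, "b": 2}, {"a": 3, "b": 4}])
+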
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=500, commit=True):
+        """
+        Batch insert (dict list or raw SQL).
+        :param table: table name (required in dict mode)
+        :param data_list: list of {column: value} dicts
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: list of parameter tuples (required with query)
+        :param batch_size: batch size
+        :param commit: auto-commit
+        :return: affected row count
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        # debug logging: dump the SQL and a sample of the args
+                        # self.log.debug(f"Batch insert SQL: {query}")
+                        # self.log.debug(f"Sample args: {batch[0] if batch else 'None'}")
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+                        # self.log.debug(f"Batch insert succeeded. Rows: {cursor.rowcount}")
+            except Exception as e:  # catch database errors explicitly
+                self.log.exception(f"Batch insert failed: {e}")  # log with the stack trace
+                self.log.error(f"Failed SQL: {query}, Args count: {len(batch)}")
+                # the pooled connection was already closed by the context manager;
+                # fall back to single inserts and log each failure
+                rowcount = 0
+                for args in batch:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except Exception as e2:
+                        self.log.error(f"Single insert failed: {e2}, Args: {args}")
+                total += rowcount
+                self.log.debug(f"Inserted {rowcount}/{len(batch)} rows individually.")
+        self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        return total
+
+    def insert_too_many(self, query, args_list, batch_size=1000):
+        """
+        Batch insert committed in chunks; useful for inserts of 100k+ rows.
+        Falls back to row-by-row inserts when a chunk fails.
+        :param query: INSERT statement
+        :param args_list: list of parameter tuples
+        :param batch_size: rows per chunk
+        """
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        conn.commit()
+            except Exception as e:
+                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
+                # 当前批次降级为单条插入
+                for args in batch:
+                    self.insert_one(query, args)
+
+    def update_one(self, query, args):
+        """
+        Execute a single UPDATE statement.
+        :param query: UPDATE statement
+        :param args: statement parameters
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        Execute a batch UPDATE; fall back to row-by-row updates on failure.
+        :param query: UPDATE statement
+        :param args_list: list of parameter tuples
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            if conn:
+                conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # fall back to row-by-row updates
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
+        """
+        Single-row update (dict mode or raw SQL).
+        :param table: table name (required in dict mode)
+        :param data: dict of {column: value} (mutually exclusive with query)
+        :param condition: update condition; accepted forms:
+            - dict: {"id": 1} -> "WHERE id = %s"
+            - str: "id = 1" -> "WHERE id = 1" (caller must ensure safety)
+            - tuple: ("id = %s", [1]) -> parameterised "WHERE id = %s"
+        :param query: raw SQL statement (mutually exclusive with data)
+        :param args: SQL parameters (required with query)
+        :param commit: auto-commit
+        :return: affected row count
+        :raises: ValueError when argument validation fails
+        """
+        # argument validation
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+            if table is None:
+                raise ValueError("Table name is required for dictionary update")
+            if condition is None:
+                raise ValueError("Condition is required for dictionary update")
+
+            # build the SET clause
+            set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
+            set_values = list(data.values())
+
+            # parse the condition
+            condition_clause, condition_args = self._parse_condition(condition)
+            query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            args = set_values + condition_args
+
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        # run the update
+        cursor = self._execute(query, args, commit)
+        # self.log.debug(
+        #     f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
+        #     extra={"table": table, "rows": cursor.rowcount}
+        # )
+        return cursor.rowcount
+
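+    # Usage sketch, mirroring how the spiders call it (values illustrative):
+    #   pool.update_one_or_dict(table="super_vault_product_record",
+    #                           data={"report_state": 1},
+    #                           condition={"pid": 123})
+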
+    def _parse_condition(self, condition):
+        """
+        Normalise a condition into (clause, args).
+        :param condition: dict / str / (clause, args) tuple
+        :return: (str, list) SQL clause and parameter list
+        """
+        if isinstance(condition, dict):
+            clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
+            args = list(condition.values())
+        elif isinstance(condition, str):
+            clause = condition  # NOTE: the caller must ensure this is safe SQL
+            args = []
+        elif isinstance(condition, (tuple, list)) and len(condition) == 2:
+            clause, args = condition[0], condition[1]
+            if not isinstance(args, (list, tuple)):
+                args = [args]
+        else:
+            raise ValueError("Condition must be dict/str/(clause, args)")
+        return clause, args
+
+    def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
+                    commit=True):
+        """
+        Batch update (dict list or raw SQL).
+        :param table: table name (required in dict mode)
+        :param data_list: list of {column: value} dicts
+        :param condition_list: list of condition dicts, same length as data_list
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: list of parameter tuples (required with query)
+        :param batch_size: batch size
+        :param commit: auto-commit
+        :return: affected row count
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            if condition_list is None or len(data_list) != len(condition_list):
+                raise ValueError("Condition_list must be provided and match the length of data_list")
+            if not all(isinstance(cond, dict) for cond in condition_list):
+                raise ValueError("All elements in condition_list must be dictionaries")
+
+            # keys of the first data item and condition item
+            first_data_keys = set(data_list[0].keys())
+            first_cond_keys = set(condition_list[0].keys())
+
+            # build the base SQL
+            set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
+            condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
+            base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            total = 0
+
+            # process in batches
+            for i in range(0, len(data_list), batch_size):
+                batch_data = data_list[i:i + batch_size]
+                batch_conds = condition_list[i:i + batch_size]
+                batch_args = []
+
+                # check that this batch has a consistent key structure
+                can_batch = True
+                for data, cond in zip(batch_data, batch_conds):
+                    data_keys = set(data.keys())
+                    cond_keys = set(cond.keys())
+                    if data_keys != first_data_keys or cond_keys != first_cond_keys:
+                        can_batch = False
+                        break
+                    batch_args.append(tuple(data.values()) + tuple(cond.values()))
+
+                if not can_batch:
+                    # inconsistent structure: fall back to single updates
+                    for data, cond in zip(batch_data, batch_conds):
+                        self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
+                        total += 1
+                    continue
+
+                # run the batch update
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(base_query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    self.log.error(f"Batch update failed: {e}")
+                    # the pooled connection was already closed by the context
+                    # manager, so skip the rollback and fall back to single updates
+                    for args, data, cond in zip(batch_args, batch_data, batch_conds):
+                        try:
+                            self._execute(base_query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        elif query is not None:
+            # raw SQL with an explicit args list
+            if args_list is None:
+                raise ValueError("args_list must be provided when using query")
+
+            total = 0
+            for i in range(0, len(args_list), batch_size):
+                batch_args = args_list[i:i + batch_size]
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    self.log.error(f"Batch update failed: {e}")
+                    # the pooled connection was already closed by the context
+                    # manager, so skip the rollback and fall back to single updates
+                    for args in batch_args:
+                        try:
+                            self._execute(query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        else:
+            raise ValueError("Either data_list or query must be provided")
+
+    def check_pool_health(self):
+        """
+        检查连接池中有效连接数
+
+        # 使用示例
+        # 配置 MySQL 连接池
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool.check_pool_health():
+            log.error("数据库连接池异常")
+            raise RuntimeError("数据库连接池异常")
+        """
+        try:
+            with self.pool.connection() as conn:
+                conn.ping(reconnect=True)
+                return True
+        except Exception as e:
+            self.log.error(f"Connection pool health check failed: {e}")
+            return False
+
+    @staticmethod
+    def _safe_identifier(name):
+        """SQL标识符安全校验"""
+        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+            raise ValueError(f"Invalid SQL identifier: {name}")
+        return name
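+
+
+# Usage sketch (table/column names are illustrative, not part of this module):
+# pool = MySQLConnectionPool(log=logger)
+# new_id = pool.insert_one_or_dict(table="popmart_lazada_record",
+#                                  data={"title": "demo", "item_id": 1})
+# pool.update_one_or_dict(table="popmart_lazada_record",
+#                         data={"in_stock": 0}, condition={"item_id": 1})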

+ 242 - 0
popmart_spider/popmart_lazada_spider.py

@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/6/16 13:48
+import random
+import time
+import inspect
+import requests
+import schedule
+from loguru import logger
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+logger.remove()
+logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    Build the proxy dict for the KDL tunnel proxy.
+    :param log: logger instance
+    :return: proxies mapping for requests
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop=stop_after_attempt(10), wait=wait_fixed(2), after=after_log)
+def get_resp_one_page(log, page):
+    log.debug(f"Getting page {page}..............................")
+    headers = {
+        "accept": "application/json, text/plain, */*",
+        "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+        "bx-v": "2.5.31",
+        "priority": "u=1, i",
+        "referer": "https://www.lazada.com.my/",
+        "sec-ch-ua": "\"Google Chrome\";v=\"137\", \"Chromium\";v=\"137\", \"Not/A)Brand\";v=\"24\"",
+        "sec-ch-ua-mobile": "?0",
+        "sec-ch-ua-platform": "\"Windows\"",
+        "sec-fetch-dest": "empty",
+        "sec-fetch-mode": "cors",
+        "sec-fetch-site": "same-origin",
+        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
+        # "x-csrf-token": "e4b1ee587f333"
+    }
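+    # NOTE: the cookies below were captured from a live browser session; the
+    # session-bound values (_m_h5_tk, lwrtk, tfstk, ...) expire and will need
+    # refreshing once requests start returning login or captcha pages.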
+    cookies = {
+        "__wpkreporterwid_": "9a807029-4faf-408f-8741-5bd7a5c9bb49",
+        "lazada_share_info": "1943013429_100_100_0_1943015429_null",
+        "t_fv": "1750044788744",
+        "t_uid": "IGNxirTJDPLVkegWMu0OjthsufRjWNgo",
+        "cna": "dn7WIL8YbU0CAd+mqILgq5w2",
+        "hng": "MY|en-MY|MYR|458",
+        "hng.sig": "cLZ14_ZixioeDzk3FYgOHWtqfULyySre3d96Ouq-H6k",
+        "EGG_SESS": "S_Gs1wHo9OvRHCMp98md7JBvEEnBpIGRATFScrAN0TSKROM9JS5ucnMrutmhJ6e0wTv_5ShoQDavnBd8GRhoufb8Sr_sUum68vg6woIDhiBw82Q8q_zFDAesYtMyDMb_pOyNXueCNJYEWgZDx1h2vQdEJsqeKN4AWpCQgGp-y5g=",
+        "lwrid": "AgGXdtASjDNi5VqqCd6sX39uIxoL",
+        "lzd_cid": "ab175d2a-6612-4a32-b759-475063ce8e55",
+        "lzd_sid": "1e76f843e0504fb8ee3940a1b599e03d",
+        "_tb_token_": "e4b1ee587f333",
+        "xlly_s": "1",
+        "lwrtk": "AAIEaFAB7osFX5Jf9a73Sg0oQN02mKk6aQXx1EKkYzrYCsXMq5K8D/I=",
+        "_bl_uid": "g3m1Xbthy3wj3Uo1tqv3bhjhjdp9",
+        "_gcl_au": "1.1.208579588.1750045133",
+        "_ga": "GA1.3.1640854487.1750045147",
+        "_gid": "GA1.3.661757065.1750045147",
+        "AMCVS_126E248D54200F960A4C98C6%40AdobeOrg": "1",
+        "AMCV_126E248D54200F960A4C98C6%40AdobeOrg": "-1124106680%7CMCIDTS%7C20256%7CMCMID%7C67086440579640317143819662540753251056%7CMCAAMLH-1750649953%7C11%7CMCAAMB-1750649953%7CRKhpRz8krg2tLO6pguXWp5olkAcUniQYPHaMWWgdJ3xzPWQmdj0y%7CMCOPTOUT-1750052353s%7CNONE%7CvVersion%7C5.2.0",
+        "_m_h5_tk": "a76b9e5019c389f94d6cdde2ad6ed1fd_1750060971812",
+        "_m_h5_tk_enc": "3cf92060e0c57a1f6a970599d58b33e5",
+        "t_sid": "Wc3j5Z64n0pv3RftLZRHnRvF9SWuMFH0",
+        "utm_channel": "NA",
+        "_uetsid": "7514f0104a6311f0be7d6b89a4c1e673",
+        "_uetvid": "75150b504a6311f0974d59c1d64da194",
+        "isg": "BGRk0ZPK0JhEdSRUv65oxzGYNWJW_Yhn-5NlwX6BsC9nKQfzpgyY9rub64kxysC_",
+        "_ga_6VT623WX3F": "GS2.3.s1750056508$o2$g1$t1750056548$j20$l0$h0",
+        "cto_bundle": "6xFME19zVHhMJTJGSGFlQ2lqZXRVNEZuZFhzd2VZMkNCRU1rTHRSeEJyYXQ5UUglMkYyTHRUQkFWeiUyQlV1OEI0aXV2JTJGa0FUNHQ1UlBVVTdBWnJxbzFuTGFaUDY3SEZ0TjRqdmElMkZxR05yT2dNOXNjUWNRTFVnOVZ3dmVqenpyZzZ4WUUxRmlYcDZxSXgyN2NWJTJGNnlERm1JUHlYZjBKbVRjZnNSMXRUTUxDMXIlMkZ6NjlMJTJCczhJJTNE",
+        "epssw": "9*mmCGJmjkWHDn9AvOutXURpqiQILO7tvzut2mZuTmu1HBO9JHdSZ3huBwdSa4dImm3tDm6umZPzy3AHysIiHmugXr0_mwvDom4MTI51KKu7zxCst0hjhSKSv92v2S2MqVWY3I23XQR26pVAufeJtQkKSyIfznmemTbDmm9LmOQiG3oIImuVuuaKmqtZKcKuWzBqSbyPfDarldaYPMun0GyDdPGXM_Zj5HpwuuxA0adYARJsm4miLR3AeYwqCVfjpFmmTMUAz71BGX80R14ITmrOPUGszCOak1FsVdYcXI6WY9HYJw3W3aURkH3fEXu3gbPS0PEmvPooIHqriXMYcCS_HAbI..",
+        "tfstk": "fxSraJ4cruEPPNVS5Q-EgbZ5SG-JqhF6KMOBK9XHFQAoV0GhusCEd8FLyZRFis8HP4tlK6WpgUwJRBsVT151Fg_7yJSVKOAWd8KoeZC1I0s7yHie2HK315Z_YT6JvHxbgP3owEvwpkikE0Dz7sn415Z_czDDY7P1O3HMJ49piLmkxp24mdJ9xLvkx-RDLd3nZ6xHPxSm_5MWyoBwiNdHdyegZxRxxDju2UAZAIpv3irWyC6ldDiQpL8ysaA0Zf3V0EXp8MHKYhWlW6pGZbVyxtJN_d5zRvdPoOs5-Tr4s3sRENYhjW3h_35eSgYqKDWJYQx2nG2IWIs2N6jyoRiwRnj6S3bbk7TB4d5h2_kEx1jPcXp0O2sdzX0erKp21-y2Ah5F7W_O3o0KJEYM3CwJ-23prKp21-yqJ2L-jKR_eef.."
+    }
+    url = "https://www.lazada.com.my/pop-mart-official-store/"
+    params = {
+        "ajax": "true",
+        "from": "wangpu",
+        "page": str(page),
+        "q": "All-Products"
+    }
+    response = requests.get(url, headers=headers, cookies=cookies, params=params, proxies=get_proxys(log), timeout=10)
+    # print(response.json())
+    # print(response)
+    response.raise_for_status()
+    resp_json = response.json()
+    return resp_json
+
+
+def parse_data(log, resp_json, sql_pool):
+    try:
+        listItems = resp_json.get("mods", {}).get("listItems", [])
+        if listItems:
+            info_list = []
+            for item in listItems:
+                title = item.get("name")
+                item_id = item.get("itemId")
+
+                images_list = item.get("thumbs", [{}])
+                images_list = [image.get("image") for image in images_list]
+                images = "|".join(images_list) if images_list else ""
+                # print(images)
+
+                if not images:
+                    images = item.get("image")
+
+                original_price_show = item.get("originalPriceShow")
+                price_show = item.get("priceShow")
+                review = item.get("review")
+                location = item.get("location")
+
+                # description = item.get("description", [])
+                # description = "|".join(description)
+
+                seller_name = item.get("sellerName")
+                seller_id = item.get("sellerId")
+                brand_name = item.get("brandName")
+                brand_id = item.get("brandId")
+                cheapest_sku = item.get("cheapest_sku")
+                # categories = item.get("categories", [])
+                # categories = "|".join(categories)
+
+                item_sold_show = item.get("itemSoldCntShow")
+                item_url = item.get("itemUrl")
+                if item_url:
+                    item_url = "https:" + item_url
+                in_stock = item.get("inStock")
+
+                info_dict = {
+                    "title": title,
+                    "item_id": item_id,
+                    "images": images,
+                    "original_price_show": original_price_show,
+                    "price_show": price_show,
+                    "review": review,
+                    "location": location,
+                    "seller_name": seller_name,
+                    "seller_id": seller_id,
+                    "brand_name": brand_name,
+                    "brand_id": brand_id,
+                    "cheapest_sku": cheapest_sku,
+                    "item_sold_show": item_sold_show,
+                    "item_url": item_url,
+                    "in_stock": in_stock  # 是否有货
+                }
+                # print(info_dict)
+                info_list.append(info_dict)
+            sql_pool.insert_many(table="popmart_lazada_record", data_list=info_list)
+
+        else:
+            log.info(f"No data found")
+    except Exception as e:
+        log.error(f"parse_data error: {e}")
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def pop_lazada_main(log):
+    """
+    Main entry point.
+    :param log: logger instance
+    """
+    log.info(
+        f'Starting spider task {inspect.currentframe().f_code.co_name}....................................................')
+
+    # Set up the MySQL connection pool
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("Database connection pool unhealthy")
+        raise RuntimeError("Database connection pool unhealthy")
+
+    try:
+        # Initial crawl: 9 pages in total
+        for p in range(1, 10):
+            try:
+                resp_json = get_resp_one_page(log, p)
+                parse_data(log, resp_json, sql_pool)
+            except Exception as e:
+                log.error(f"Request get_resp_one_page page: {p}, error: {e}")
+
+            time.sleep(random.uniform(1, 2))
+
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next round............')
+
+
+def schedule_task():
+    """
+    Scheduler entry point for this spider module.
+    """
+    # Run the task once immediately
+    # pop_lazada_main(log=logger)
+
+    # Set up the recurring schedule
+    # schedule.every().day.at("00:01").do(pop_lazada_main, log=logger)
+    # schedule.every(30).minutes.do(pop_lazada_main, log=logger)
+    schedule.every(3).hours.do(pop_lazada_main, log=logger)
+
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+if __name__ == '__main__':
+    schedule_task()
+    # json_str = test_dict.test_dict
+    # parse_data(logger, json_str,None)

+ 168 - 0
popmart_spider/popmart_shopee_spider.py

@@ -0,0 +1,168 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/6/17 15:56
+import datetime
+import inspect
+import time
+import schedule
+from loguru import logger
+
+from mysql_pool import MySQLConnectionPool
+from DrissionPage import ChromiumPage, ChromiumOptions
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+logger.remove()
+logger.add("./logs/shopee_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_response(log, page_url) -> dict | None:
+    """
+    Capture the shop item-list API response from the page
+    :param log: logger instance
+    :param page_url: page URL
+    :return: parsed body of the /api/v4/shop/rcmd_items response, or None if no matching packet is captured
+    """
+    options = ChromiumOptions().set_paths(local_port=9130, user_data_path=r'D:\Drissionpage_temp\local_port_9130')
+    # options = ChromiumOptions()
+    options.set_argument("--disable-gpu")
+    options.set_argument("--accept-lang=en-US")
+    options.no_imgs(True)
+    page = ChromiumPage(options)
+
+    try:
+        # start listening before navigating, otherwise the target request is missed
+        page.listen.start('/api/v4/shop/rcmd_items')
+        page.get(page_url)
+        # page.wait.load_start()  # wait for the page to start loading
+
+        # wait for a matching request, at most 10 attempts
+        for _ in range(10):
+            try:
+                res = page.listen.wait(timeout=10)  # wait for the next captured packet
+
+                if res and res.method == 'POST':
+                    # read the request body
+                    post_data = res.request.postData
+                    if not post_data:
+                        log.debug("Empty request body, waiting for the next packet..........")
+                        continue
+
+                    # check the request parameters
+                    if post_data.get('sort_type') == 2:
+                        log.debug("Request parameters match, reading the response..........")
+                        return res.response.body
+                    else:
+                        log.debug("Request parameters do not match, waiting for the next packet..........")
+                        continue
+
+            except Exception as e:
+                log.error(f"Timed out or failed while waiting for the request: {e}")
+                continue
+
+        return None
+    finally:
+        page.listen.stop()
+        page.close()
+
+
+def parse_data(log, resp_json, sql_pool):
+    res_error = resp_json.get('error')
+    if res_error == 0:
+        item_cards = resp_json.get('data', {}).get('centralize_item_card', {}).get('item_cards', [])
+        for item_card in item_cards:
+            # the detail URL is built from item_id and shop_id, e.g.:
+            # title: POP MART Twinkle Twinkle Be a Little Star Series - Plush Pendant Blind Box
+            # shop_id: 458606128
+            # item_id: 28883839628
+            # https://my.xiapibuy.com/POP-MART-Twinkle-Twinkle-Be-a-Little-Star-Series-Plush-Pendant-Blind-Box-i.458606128.28883839628
+            item_id = item_card.get('itemid')
+            shop_id = item_card.get('shopid')
+
+            cat_id = item_card.get('catid')
+            item_card_displayed_asset = item_card.get('item_card_displayed_asset', {})
+            title = item_card_displayed_asset.get('name')
+            images = item_card_displayed_asset.get('images', [])  # prepend https://down-my.img.susercontent.com/file/ to get full URLs
+            images = "|".join(images) if images else ''
+
+            liked_count = item_card.get('liked_count')
+            status = item_card.get('status')
+            ctime = item_card.get('ctime')
+            if ctime:
+                ctime_format = datetime.datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S')
+            else:
+                ctime_format = None
+
+            item_status = item_card.get('item_status')
+
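+            # the price field appears to be scaled by 100000; convert it to a 2-decimal string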
+            price = item_card.get('item_card_display_price', {}).get('price')
+            if price:
+                price = float(price / 100000)
+                price = f"{price:.2f}"
+            historical_sold_count = item_card.get('item_card_display_sold_count', {}).get('historical_sold_count')  # total sold
+            monthly_sold_count = item_card.get('item_card_display_sold_count', {}).get('monthly_sold_count')  # monthly sales
+            rating_text = item_card_displayed_asset.get('rating', {}).get('rating_text')  # rating
+            if rating_text:
+                rating_text = rating_text.replace(' 商店评价', '')  # strip the "shop rating" suffix from the raw text
+
+            info_dict = {
+                "item_id": item_id,
+                "shop_id": shop_id,
+                "cat_id": cat_id,
+                "title": title,
+                "images": images,
+                "liked_count": liked_count,
+                "status": status,
+                "ctime": ctime_format,
+                "item_status": item_status,
+                "price": price,
+                "historical_sold_count": historical_sold_count,
+                "monthly_sold_count": monthly_sold_count,
+                "rating_text": rating_text
+            }
+            print(info_dict)
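+        # NOTE: persistence is not wired up here yet; the sql_pool argument is unused.
+        # The Lazada module's sql_pool.insert_many(...) call would slot in after this loop.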
+    else:
+        log.error(f"接口返回错误: {res_error}, error_msg:{resp_json.get('error_msg')}")
+
+
+def shopee_main(log):
+    # for i in range(1, 12):
+    i = 1
+    resp_dict = get_response(log, f'https://my.xiapibuy.com/popmartofficial.my?page={i}&sortBy=ctime&tab=0')
+    print(resp_dict)
+    parse_data(log, resp_dict, None)
+
+
+if __name__ == '__main__':
+    shopee_main(logger)
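
For reference, here is the network-capture pattern from get_response reduced to a minimal sketch (same endpoint path; the local-port/user-data options and error handling are omitted):

    from DrissionPage import ChromiumPage, ChromiumOptions

    options = ChromiumOptions()
    options.no_imgs(True)  # skip images to speed up loading
    page = ChromiumPage(options)
    try:
        page.listen.start('/api/v4/shop/rcmd_items')  # listen before navigating
        page.get('https://my.xiapibuy.com/popmartofficial.my?page=1&sortBy=ctime&tab=0')
        packet = page.listen.wait(timeout=10)  # first captured packet; falsy on timeout
        if packet and packet.method == 'POST':
            print(packet.response.body)
    finally:
        page.close()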

+ 9 - 0
popmart_spider/requirements.txt

@@ -0,0 +1,9 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+DrissionPage==4.1.0.18
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2
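
Each module pins its own dependencies; the -i line at the top of each file selects the Aliyun PyPI mirror, which pip honors when installing from the file:

    pip install -r popmart_spider/requirements.txt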

+ 9 - 0
rhyf_spider/requirements.txt

@@ -0,0 +1,9 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+pycryptodome==3.23.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+tenacity==9.1.2
+user_agent==0.1.14

+ 6 - 0
tcgplayer_spider/requirements.txt

@@ -0,0 +1,6 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5

+ 10 - 0
urbox_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+pandas==2.3.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2

+ 9 - 0
veriswap_spider/requirements.txt

@@ -0,0 +1,9 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2
+user_agent==0.1.14

+ 10 - 0
yueka_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+bs4==0.0.2
+curl_cffi==0.14.0
+DBUtils==3.1.2
+loguru==0.7.3
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+schedule==1.2.2
+tenacity==9.1.2