Bladeren bron

add gea/zc spider 3.5.1

charley 2 weken geleden
bovenliggende
commit
d412ae78e4

+ 78 - 0
gea_spider/YamlLoader.py

@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/12/22 10:44
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
+
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+    
+    def get(self, key: str):
+        return YamlConfig(self.config.get(key))
+    
+    def getValueAsString(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return os.getenv(env, group['VAL'])
+            return None
+        except:
+            return self.config[key]
+    
+    def getValueAsInt(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return int(os.getenv(env, group['VAL']))
+            return 0
+        except:
+            return int(self.config[key])
+    
+    def getValueAsBool(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return bool(os.getenv(env, group['VAL']))
+            return False
+        except:
+            return bool(self.config[key])
+
+
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    if os.path.exists(path):
+        with open(path) as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+    
+    if profile is not None:
+        result = path.split('.')
+        profiledYaml = f'{result[0]}-{profile}.{result[1]}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml) as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+    
+    return YamlConfig(conf)
+
+# res = readYaml()
+# mysqlConf = res.get('mysql')
+# print(mysqlConf)
+
+# print(res.getValueAsString("host"))
+# mysqlYaml = mysqlConf.getValueAsString("host")
+# print(mysqlYaml)
+# host = mysqlYaml.get("host").split(':')[-1][:-1]
+# port = mysqlYaml.get("port").split(':')[-1][:-1]
+# username = mysqlYaml.get("username").split(':')[-1][:-1]
+# password = mysqlYaml.get("password").split(':')[-1][:-1]
+# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
+# print(host,port,username,password)

+ 11 - 0
gea_spider/application.yml

@@ -0,0 +1,11 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.21}
+  port: ${MYSQL_PROT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
+
+proxy:
+  tunnel: ${PROXY_TUNNEL:x371.kdltps.com:15818}
+  username: ${PROXY_USERNAME:t13753103189895}
+  password: ${PROXY_PASSWORD:o0yefv6z}

+ 219 - 0
gea_spider/gea_his_spider.py

@@ -0,0 +1,219 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/2/3 14:45
+import inspect
+import requests
+import user_agent
+from loguru import logger
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+"""
+https://grading11.com/cert
+2511000001-2511000020
+2512000001-2512000945
+2601000001-2601000779
+"""
+
+logger.remove()
+logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    获取代理配置
+
+    :param log: 日志对象
+    :return: 代理字典
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_cert_data(log, cert_id, sql_pool):
+    """
+    获取证书数据并保存到数据库
+
+    :param log: 日志对象
+    :param cert_id: 证书ID
+    :param sql_pool: 数据库连接池
+    """
+    log.debug(f'{inspect.currentframe().f_code.co_name} -> 开始获取证书数据:{cert_id}')
+    headers = {
+        "content-type": "application/json;charset=UTF-8",
+        "user-agent": user_agent.generate_user_agent()
+    }
+
+    url = "https://grading11.com/api/vendure/getData"
+    data = [{
+        "customQuery": {
+            "query": "ordercard-query",
+            "metadata": {
+                # "certNumber": "2511000001"
+                "certNumber": str(cert_id)
+            }
+        }
+    }]
+
+    try:
+        response = requests.post(url, headers=headers, json=data, timeout=22, proxies=get_proxys(log))
+        # response = requests.post(url, headers=headers, json=data, timeout=22)
+        # print(response.text)
+        response.raise_for_status()
+
+        resp_json = response.json()
+        orderCard = resp_json.get("orderCard", {})
+        if not orderCard:
+            log.error(f"Error getting orderCard: {resp_json}")
+            sql_pool.update_one("UPDATE gea_task SET state = 3 WHERE cert_id = %s", (cert_id,))
+            return
+
+        # 提取评分数据
+        totalGrade = orderCard.get("totalGrade")
+        centeringTotalSubGrade = orderCard.get("centeringTotalSubGrade")
+        surfacesTotalSubGrade = orderCard.get("surfacesTotalSubGrade")
+        edgesTotalSubGrade = orderCard.get("edgesTotalSubGrade")
+        cornersTotalSubGrade = orderCard.get("cornersTotalSubGrade")
+        createdAt = orderCard.get("createdAt")
+        date_graded = createdAt[:10] if createdAt else None
+        remark = orderCard.get("remark")
+
+        # 提取图片链接
+        try:
+            front_image = orderCard.get("frontSlabbedImage", {}).get("source")
+        except AttributeError:
+            front_image = None
+
+        try:
+            back_image = orderCard.get("backSlabbedImage", {}).get("source")
+        except AttributeError:
+            back_image = None
+
+        # 提取卡片信息
+        """
+        2023 Pokémon Trading Card Game English
+        【SVP】POKÉMON X VAN GOGH MUSEUM 085
+        Pikachu with Grey Felt Hat
+        PROMO
+        """
+        tag_card = orderCard.get("card", {})
+        cardName = tag_card.get("cardName")  # Pikachu with Grey Felt Hat
+        cardNumber = tag_card.get("cardNumber")  # 085
+        brand = tag_card.get("brand", {}).get("name")  # Pokémon Trading Card Game
+        releaseYear = tag_card.get("releaseYear")  # 2023
+        language = tag_card.get("language")  # en
+
+        try:
+            cardSet = tag_card.get("cardSet", {}).get("name")  # 【SVP】POKÉMON X VAN GOGH MUSEUM
+        except AttributeError:
+            cardSet = None
+
+        try:
+            rarity = tag_card.get("rarity", {}).get("name")  # PROMO
+        except AttributeError:
+            rarity = None
+        data_dict = {
+            "cert_id": cert_id,
+            "total_grade": totalGrade,
+            "centering_grade": centeringTotalSubGrade,
+            "surfaces_grade": surfacesTotalSubGrade,
+            "edges_grade": edgesTotalSubGrade,
+            "corners_grade": cornersTotalSubGrade,
+            "date_graded": date_graded,
+            "remark": remark,
+            "front_image": front_image,
+            "back_image": back_image,
+            "card_name": cardName,
+            "card_number": cardNumber,
+            "brand": brand,
+            "release_year": releaseYear,
+            "language": language,
+            "card_set": cardSet,
+            "rarity": rarity
+        }
+        # print(data_dict)
+        sql_pool.insert_one_or_dict(table="gea_record", data=data_dict)
+        sql_pool.update_one("UPDATE gea_task SET state = 1 WHERE cert_id = %s", (cert_id,))
+
+    except requests.RequestException as e:
+        log.error(f"Request failed for cert_id {cert_id}: {e}")
+        sql_pool.update_one(
+            "UPDATE gea_task SET state = 2 WHERE cert_id = %s",
+            (cert_id,)
+        )
+        raise
+    except Exception as e:
+        log.error(f"Error processing cert_id {cert_id}: {e}")
+        sql_pool.update_one(
+            "UPDATE gea_task SET state = 2 WHERE cert_id = %s",
+            (cert_id,)
+        )
+        raise
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def gea_his_main(log):
+    """
+    主函数
+    :param log: logger对象
+    """
+    log.info(
+        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
+
+    # 配置 MySQL 连接池
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("数据库连接池异常")
+        raise RuntimeError("数据库连接池异常")
+
+    try:
+        sql_id_list = sql_pool.select_all("SELECT cert_id FROM gea_task WHERE state != 1")
+        sql_id_list = [item[0] for item in sql_id_list]
+        log.info(f'共 {len(sql_id_list)} 个证书待处理')
+
+        for cert_id in sql_id_list:
+            try:
+                get_cert_data(log, cert_id, sql_pool)
+            except Exception as e:
+                log.error(f'loop -> get_cert_data, error: {e}')
+                sql_pool.update_one("UPDATE gea_task SET state = 2 WHERE cert_id = %s", (cert_id,))
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
+if __name__ == '__main__':
+    gea_his_main(logger)

+ 625 - 0
gea_spider/mysql_pool.py

@@ -0,0 +1,625 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
+# 获取yaml配置
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL连接池
+    """
+
+    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
+        """
+        初始化连接池
+        :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
+        :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
+        :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
+        :param log: 自定义日志记录器
+        """
+        # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db,
+            ping=0  # 每次连接使用时自动检查有效性(0=不检查,1=执行query前检查,2=每次执行前检查)
+        )
+
+    def _execute(self, query, args=None, commit=False):
+        """
+        执行SQL
+        :param query: SQL语句
+        :param args: SQL参数
+        :param commit: 是否提交事务
+        :return: 查询结果
+        """
+        try:
+            with self.pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, args)
+                    if commit:
+                        conn.commit()
+                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                    return cursor
+        except Exception as e:
+            if commit:
+                conn.rollback()
+            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+            raise e
+
+    def select_one(self, query, args=None):
+        """
+        执行查询,返回单个结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        执行查询,返回所有结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        执行单条插入语句
+        :param query: 插入语句
+        :param args: 插入参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        cursor = self._execute(query, args, commit=True)
+        return cursor.lastrowid  # 返回插入的ID
+
+    def insert_all(self, query, args_list):
+        """
+        执行批量插入语句,如果失败则逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                conn.rollback()
+                self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
+                rowcount = 0
+                for args in args_list:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except pymysql.err.IntegrityError as e2:
+                        if "Duplicate entry" in str(e2):
+                            self.log.debug(f"跳过重复条目: {e2}")
+                        else:
+                            self.log.error(f"插入失败: {e2}")
+                    except Exception as e2:
+                        self.log.error(f"插入失败: {e2}")
+                self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
+            else:
+                conn.rollback()
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise e
+        except Exception as e:
+            conn.rollback()
+            self.log.exception(f"批量插入失败: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
+        """
+        单条插入(支持字典或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data: 字典数据 {列名: 值}
+        :param query: 直接SQL语句(与data二选一)
+        :param args: SQL参数(query使用时必需)
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 最后插入ID
+        """
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
+            values = ', '.join(['%s'] * len(data))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args = tuple(data.values())
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        try:
+            cursor = self._execute(query, args, commit)
+            self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+            return cursor.lastrowid
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                return -1  # 返回 -1 表示重复条目被跳过
+            else:
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise
+        except Exception as e:
+            self.log.exception(f"未知错误: {e}")
+            raise
+
+    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                    ignore=False):
+        """
+        批量插入(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                # 处理唯一索引冲突
+                if "Duplicate entry" in str(e):
+                    if ignore:
+                        # 如果使用了 INSERT IGNORE,理论上不会进这里,但以防万一
+                        self.log.warning(f"批量插入遇到重复条目(ignore模式): {e}")
+                    else:
+                        # 没有使用 IGNORE,降级为逐条插入
+                        self.log.warning(f"批量插入遇到重复条目,开始逐条插入。错误: {e}")
+                        if commit:
+                            conn.rollback()
+                        
+                        rowcount = 0
+                        for j, args in enumerate(batch):
+                            try:
+                                if data_list:
+                                    # 字典模式
+                                    self.insert_one_or_dict(
+                                        table=table,
+                                        data=dict(zip(data_list[0].keys(), args)),
+                                        commit=commit,
+                                        ignore=False  # 单条插入时手动捕获重复
+                                    )
+                                else:
+                                    # 原始SQL模式
+                                    self.insert_one(query, args)
+                                rowcount += 1
+                            except pymysql.err.IntegrityError as e2:
+                                if "Duplicate entry" in str(e2):
+                                    self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
+                                else:
+                                    self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                            except Exception as e2:
+                                self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                        total += rowcount
+                        self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
+                else:
+                    # 其他完整性错误
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                # 其他数据库错误
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        if table:
+            self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        else:
+            self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
+        return total
+
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                        ignore=False):
+        """
+        批量插入(支持字典列表或原始SQL) - 备用方法
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :param ignore: 是否使用INSERT IGNORE
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+    
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                if "Duplicate entry" in str(e) and not ignore:
+                    self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
+                    if commit:
+                        conn.rollback()
+                    rowcount = 0
+                    for args in batch:
+                        try:
+                            self.insert_one(query, args)
+                            rowcount += 1
+                        except pymysql.err.IntegrityError as e2:
+                            if "Duplicate entry" in str(e2):
+                                self.log.debug(f"跳过重复条目: {e2}")
+                            else:
+                                self.log.error(f"插入失败: {e2}")
+                        except Exception as e2:
+                            self.log.error(f"插入失败: {e2}")
+                    total += rowcount
+                else:
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
+        return total
+
+    def insert_too_many(self, query, args_list, batch_size=1000):
+        """
+        执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        :param batch_size: 每次插入的条数
+        """
+        self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        conn.commit()
+                        self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
+            except Exception as e:
+                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
+                # 当前批次降级为单条插入
+                for args in batch:
+                    self.insert_one(query, args)
+
+    def update_one(self, query, args):
+        """
+        执行单条更新语句
+        :param query: 更新语句
+        :param args: 更新参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        执行批量更新语句,如果失败则逐条更新
+        :param query: 更新语句
+        :param args_list: 更新参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # 如果批量更新失败,则逐条更新
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
+        """
+        单条更新(支持字典或原始SQL)
+        :param table: 表名(字典模式必需)
+        :param data: 字典数据 {列名: 值}(与 query 二选一)
+        :param condition: 更新条件,支持以下格式:
+            - 字典: {"id": 1} → "WHERE id = %s"
+            - 字符串: "id = 1" → "WHERE id = 1"(需自行确保安全)
+            - 元组: ("id = %s", [1]) → "WHERE id = %s"(参数化查询)
+        :param query: 直接SQL语句(与 data 二选一)
+        :param args: SQL参数(query 模式下必需)
+        :param commit: 是否自动提交
+        :return: 影响行数
+        :raises: ValueError 参数校验失败时抛出
+        """
+        # 参数校验
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+            if table is None:
+                raise ValueError("Table name is required for dictionary update")
+            if condition is None:
+                raise ValueError("Condition is required for dictionary update")
+
+            # 构建 SET 子句
+            set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
+            set_values = list(data.values())
+
+            # 解析条件
+            condition_clause, condition_args = self._parse_condition(condition)
+            query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            args = set_values + condition_args
+
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        # 执行更新
+        cursor = self._execute(query, args, commit)
+        # self.log.debug(
+        #     f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
+        #     extra={"table": table, "rows": cursor.rowcount}
+        # )
+        return cursor.rowcount
+
+    def _parse_condition(self, condition):
+        """
+        解析条件为 (clause, args) 格式
+        :param condition: 字典/字符串/元组
+        :return: (str, list) SQL 子句和参数列表
+        """
+        if isinstance(condition, dict):
+            clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
+            args = list(condition.values())
+        elif isinstance(condition, str):
+            clause = condition  # 注意:需调用方确保安全
+            args = []
+        elif isinstance(condition, (tuple, list)) and len(condition) == 2:
+            clause, args = condition[0], condition[1]
+            if not isinstance(args, (list, tuple)):
+                args = [args]
+        else:
+            raise ValueError("Condition must be dict/str/(clause, args)")
+        return clause, args
+
+    def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
+                    commit=True):
+        """
+        批量更新(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param condition_list: 条件列表(必须为字典,与data_list等长)
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            if condition_list is None or len(data_list) != len(condition_list):
+                raise ValueError("Condition_list must be provided and match the length of data_list")
+            if not all(isinstance(cond, dict) for cond in condition_list):
+                raise ValueError("All elements in condition_list must be dictionaries")
+
+            # 获取第一个数据项和条件项的键
+            first_data_keys = set(data_list[0].keys())
+            first_cond_keys = set(condition_list[0].keys())
+
+            # 构造基础SQL
+            set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
+            condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
+            base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            total = 0
+
+            # 分批次处理
+            for i in range(0, len(data_list), batch_size):
+                batch_data = data_list[i:i + batch_size]
+                batch_conds = condition_list[i:i + batch_size]
+                batch_args = []
+
+                # 检查当前批次的结构是否一致
+                can_batch = True
+                for data, cond in zip(batch_data, batch_conds):
+                    data_keys = set(data.keys())
+                    cond_keys = set(cond.keys())
+                    if data_keys != first_data_keys or cond_keys != first_cond_keys:
+                        can_batch = False
+                        break
+                    batch_args.append(tuple(data.values()) + tuple(cond.values()))
+
+                if not can_batch:
+                    # 结构不一致,转为单条更新
+                    for data, cond in zip(batch_data, batch_conds):
+                        self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
+                        total += 1
+                    continue
+
+                # 执行批量更新
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(base_query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # 降级为单条更新
+                    for args, data, cond in zip(batch_args, batch_data, batch_conds):
+                        try:
+                            self._execute(base_query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        elif query is not None:
+            # 处理原始SQL和参数列表
+            if args_list is None:
+                raise ValueError("args_list must be provided when using query")
+
+            total = 0
+            for i in range(0, len(args_list), batch_size):
+                batch_args = args_list[i:i + batch_size]
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # 降级为单条更新
+                    for args in batch_args:
+                        try:
+                            self._execute(query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        else:
+            raise ValueError("Either data_list or query must be provided")
+
+    def check_pool_health(self):
+        """
+        检查连接池中有效连接数
+
+        # 使用示例
+        # 配置 MySQL 连接池
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool.check_pool_health():
+            log.error("数据库连接池异常")
+            raise RuntimeError("数据库连接池异常")
+        """
+        try:
+            with self.pool.connection() as conn:
+                conn.ping(reconnect=True)
+                return True
+        except Exception as e:
+            self.log.error(f"Connection pool health check failed: {e}")
+            return False
+
+    def close(self):
+        """
+        关闭连接池,释放所有连接
+        """
+        try:
+            if hasattr(self, 'pool') and self.pool:
+                self.pool.close()
+                self.log.info("数据库连接池已关闭")
+        except Exception as e:
+            self.log.error(f"关闭连接池失败: {e}")
+
+    @staticmethod
+    def _safe_identifier(name):
+        """SQL标识符安全校验"""
+        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+            raise ValueError(f"Invalid SQL identifier: {name}")
+        return name
+
+
+if __name__ == '__main__':
+    sql_pool = MySQLConnectionPool()
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)

+ 78 - 0
zc_spider/YamlLoader.py

@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/12/22 10:44
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
+
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+    
+    def get(self, key: str):
+        return YamlConfig(self.config.get(key))
+    
+    def getValueAsString(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return os.getenv(env, group['VAL'])
+            return None
+        except:
+            return self.config[key]
+    
+    def getValueAsInt(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return int(os.getenv(env, group['VAL']))
+            return 0
+        except:
+            return int(self.config[key])
+    
+    def getValueAsBool(self, key: str):
+        try:
+            match = regex.match(self.config[key])
+            group = match.groupdict()
+            if group['ENV'] is not None:
+                env = group['ENV'][:-1]
+                return bool(os.getenv(env, group['VAL']))
+            return False
+        except:
+            return bool(self.config[key])
+
+
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    if os.path.exists(path):
+        with open(path) as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+    
+    if profile is not None:
+        result = path.split('.')
+        profiledYaml = f'{result[0]}-{profile}.{result[1]}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml) as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+    
+    return YamlConfig(conf)
+
+# res = readYaml()
+# mysqlConf = res.get('mysql')
+# print(mysqlConf)
+
+# print(res.getValueAsString("host"))
+# mysqlYaml = mysqlConf.getValueAsString("host")
+# print(mysqlYaml)
+# host = mysqlYaml.get("host").split(':')[-1][:-1]
+# port = mysqlYaml.get("port").split(':')[-1][:-1]
+# username = mysqlYaml.get("username").split(':')[-1][:-1]
+# password = mysqlYaml.get("password").split(':')[-1][:-1]
+# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
+# print(host,port,username,password)

+ 6 - 0
zc_spider/application.yml

@@ -0,0 +1,6 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.25}
+  port: ${MYSQL_PROT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}

File diff suppressed because it is too large
+ 122 - 0
zc_spider/crypto_utils.py


+ 625 - 0
zc_spider/mysql_pool.py

@@ -0,0 +1,625 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
+# 获取yaml配置
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL连接池
+    """
+
+    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
+        """
+        初始化连接池
+        :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
+        :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
+        :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
+        :param log: 自定义日志记录器
+        """
+        # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db,
+            ping=0  # 每次连接使用时自动检查有效性(0=不检查,1=执行query前检查,2=每次执行前检查)
+        )
+
+    def _execute(self, query, args=None, commit=False):
+        """
+        执行SQL
+        :param query: SQL语句
+        :param args: SQL参数
+        :param commit: 是否提交事务
+        :return: 查询结果
+        """
+        try:
+            with self.pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, args)
+                    if commit:
+                        conn.commit()
+                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                    return cursor
+        except Exception as e:
+            if commit:
+                conn.rollback()
+            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+            raise e
+
+    def select_one(self, query, args=None):
+        """
+        执行查询,返回单个结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        执行查询,返回所有结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        执行单条插入语句
+        :param query: 插入语句
+        :param args: 插入参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        cursor = self._execute(query, args, commit=True)
+        return cursor.lastrowid  # 返回插入的ID
+
+    def insert_all(self, query, args_list):
+        """
+        执行批量插入语句,如果失败则逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                conn.rollback()
+                self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
+                rowcount = 0
+                for args in args_list:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except pymysql.err.IntegrityError as e2:
+                        if "Duplicate entry" in str(e2):
+                            self.log.debug(f"跳过重复条目: {e2}")
+                        else:
+                            self.log.error(f"插入失败: {e2}")
+                    except Exception as e2:
+                        self.log.error(f"插入失败: {e2}")
+                self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
+            else:
+                conn.rollback()
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise e
+        except Exception as e:
+            conn.rollback()
+            self.log.exception(f"批量插入失败: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
+        """
+        单条插入(支持字典或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data: 字典数据 {列名: 值}
+        :param query: 直接SQL语句(与data二选一)
+        :param args: SQL参数(query使用时必需)
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 最后插入ID
+        """
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
+            values = ', '.join(['%s'] * len(data))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args = tuple(data.values())
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        try:
+            cursor = self._execute(query, args, commit)
+            self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+            return cursor.lastrowid
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                return -1  # 返回 -1 表示重复条目被跳过
+            else:
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise
+        except Exception as e:
+            self.log.exception(f"未知错误: {e}")
+            raise
+
+    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                    ignore=False):
+        """
+        批量插入(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                # 处理唯一索引冲突
+                if "Duplicate entry" in str(e):
+                    if ignore:
+                        # 如果使用了 INSERT IGNORE,理论上不会进这里,但以防万一
+                        self.log.warning(f"批量插入遇到重复条目(ignore模式): {e}")
+                    else:
+                        # 没有使用 IGNORE,降级为逐条插入
+                        self.log.warning(f"批量插入遇到重复条目,开始逐条插入。错误: {e}")
+                        if commit:
+                            conn.rollback()
+                        
+                        rowcount = 0
+                        for j, args in enumerate(batch):
+                            try:
+                                if data_list:
+                                    # 字典模式
+                                    self.insert_one_or_dict(
+                                        table=table,
+                                        data=dict(zip(data_list[0].keys(), args)),
+                                        commit=commit,
+                                        ignore=False  # 单条插入时手动捕获重复
+                                    )
+                                else:
+                                    # 原始SQL模式
+                                    self.insert_one(query, args)
+                                rowcount += 1
+                            except pymysql.err.IntegrityError as e2:
+                                if "Duplicate entry" in str(e2):
+                                    self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
+                                else:
+                                    self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                            except Exception as e2:
+                                self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                        total += rowcount
+                        self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
+                else:
+                    # 其他完整性错误
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                # 其他数据库错误
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        if table:
+            self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        else:
+            self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
+        return total
+
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                        ignore=False):
+        """
+        批量插入(支持字典列表或原始SQL) - 备用方法
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :param ignore: 是否使用INSERT IGNORE
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+    
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                if "Duplicate entry" in str(e) and not ignore:
+                    self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
+                    if commit:
+                        conn.rollback()
+                    rowcount = 0
+                    for args in batch:
+                        try:
+                            self.insert_one(query, args)
+                            rowcount += 1
+                        except pymysql.err.IntegrityError as e2:
+                            if "Duplicate entry" in str(e2):
+                                self.log.debug(f"跳过重复条目: {e2}")
+                            else:
+                                self.log.error(f"插入失败: {e2}")
+                        except Exception as e2:
+                            self.log.error(f"插入失败: {e2}")
+                    total += rowcount
+                else:
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
+        return total
+
+    def insert_too_many(self, query, args_list, batch_size=1000):
+        """
+        执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        :param batch_size: 每次插入的条数
+        """
+        self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        conn.commit()
+                        self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
+            except Exception as e:
+                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
+                # 当前批次降级为单条插入
+                for args in batch:
+                    self.insert_one(query, args)
+
+    def update_one(self, query, args):
+        """
+        执行单条更新语句
+        :param query: 更新语句
+        :param args: 更新参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        执行批量更新语句,如果失败则逐条更新
+        :param query: 更新语句
+        :param args_list: 更新参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # 如果批量更新失败,则逐条更新
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
+        """
+        单条更新(支持字典或原始SQL)
+        :param table: 表名(字典模式必需)
+        :param data: 字典数据 {列名: 值}(与 query 二选一)
+        :param condition: 更新条件,支持以下格式:
+            - 字典: {"id": 1} → "WHERE id = %s"
+            - 字符串: "id = 1" → "WHERE id = 1"(需自行确保安全)
+            - 元组: ("id = %s", [1]) → "WHERE id = %s"(参数化查询)
+        :param query: 直接SQL语句(与 data 二选一)
+        :param args: SQL参数(query 模式下必需)
+        :param commit: 是否自动提交
+        :return: 影响行数
+        :raises: ValueError 参数校验失败时抛出
+        """
+        # 参数校验
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+            if table is None:
+                raise ValueError("Table name is required for dictionary update")
+            if condition is None:
+                raise ValueError("Condition is required for dictionary update")
+
+            # 构建 SET 子句
+            set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
+            set_values = list(data.values())
+
+            # 解析条件
+            condition_clause, condition_args = self._parse_condition(condition)
+            query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            args = set_values + condition_args
+
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        # 执行更新
+        cursor = self._execute(query, args, commit)
+        # self.log.debug(
+        #     f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
+        #     extra={"table": table, "rows": cursor.rowcount}
+        # )
+        return cursor.rowcount
+
+    def _parse_condition(self, condition):
+        """
+        解析条件为 (clause, args) 格式
+        :param condition: 字典/字符串/元组
+        :return: (str, list) SQL 子句和参数列表
+        """
+        if isinstance(condition, dict):
+            clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
+            args = list(condition.values())
+        elif isinstance(condition, str):
+            clause = condition  # 注意:需调用方确保安全
+            args = []
+        elif isinstance(condition, (tuple, list)) and len(condition) == 2:
+            clause, args = condition[0], condition[1]
+            if not isinstance(args, (list, tuple)):
+                args = [args]
+        else:
+            raise ValueError("Condition must be dict/str/(clause, args)")
+        return clause, args
+
+    def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
+                    commit=True):
+        """
+        批量更新(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param condition_list: 条件列表(必须为字典,与data_list等长)
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            if condition_list is None or len(data_list) != len(condition_list):
+                raise ValueError("Condition_list must be provided and match the length of data_list")
+            if not all(isinstance(cond, dict) for cond in condition_list):
+                raise ValueError("All elements in condition_list must be dictionaries")
+
+            # 获取第一个数据项和条件项的键
+            first_data_keys = set(data_list[0].keys())
+            first_cond_keys = set(condition_list[0].keys())
+
+            # 构造基础SQL
+            set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
+            condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
+            base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            total = 0
+
+            # 分批次处理
+            for i in range(0, len(data_list), batch_size):
+                batch_data = data_list[i:i + batch_size]
+                batch_conds = condition_list[i:i + batch_size]
+                batch_args = []
+
+                # 检查当前批次的结构是否一致
+                can_batch = True
+                for data, cond in zip(batch_data, batch_conds):
+                    data_keys = set(data.keys())
+                    cond_keys = set(cond.keys())
+                    if data_keys != first_data_keys or cond_keys != first_cond_keys:
+                        can_batch = False
+                        break
+                    batch_args.append(tuple(data.values()) + tuple(cond.values()))
+
+                if not can_batch:
+                    # 结构不一致,转为单条更新
+                    for data, cond in zip(batch_data, batch_conds):
+                        self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
+                        total += 1
+                    continue
+
+                # 执行批量更新
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(base_query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # 降级为单条更新
+                    for args, data, cond in zip(batch_args, batch_data, batch_conds):
+                        try:
+                            self._execute(base_query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        elif query is not None:
+            # 处理原始SQL和参数列表
+            if args_list is None:
+                raise ValueError("args_list must be provided when using query")
+
+            total = 0
+            for i in range(0, len(args_list), batch_size):
+                batch_args = args_list[i:i + batch_size]
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # 降级为单条更新
+                    for args in batch_args:
+                        try:
+                            self._execute(query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        else:
+            raise ValueError("Either data_list or query must be provided")
+
+    def check_pool_health(self):
+        """
+        检查连接池中有效连接数
+
+        # 使用示例
+        # 配置 MySQL 连接池
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool.check_pool_health():
+            log.error("数据库连接池异常")
+            raise RuntimeError("数据库连接池异常")
+        """
+        try:
+            with self.pool.connection() as conn:
+                conn.ping(reconnect=True)
+                return True
+        except Exception as e:
+            self.log.error(f"Connection pool health check failed: {e}")
+            return False
+
+    def close(self):
+        """
+        关闭连接池,释放所有连接
+        """
+        try:
+            if hasattr(self, 'pool') and self.pool:
+                self.pool.close()
+                self.log.info("数据库连接池已关闭")
+        except Exception as e:
+            self.log.error(f"关闭连接池失败: {e}")
+
+    @staticmethod
+    def _safe_identifier(name):
+        """SQL标识符安全校验"""
+        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+            raise ValueError(f"Invalid SQL identifier: {name}")
+        return name
+
+
+if __name__ == '__main__':
+    sql_pool = MySQLConnectionPool()
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)

+ 537 - 0
zc_spider/zc_history_spider.py

@@ -0,0 +1,537 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/2/27 11:22
+import time
+import inspect
+import requests
+import user_agent
+from loguru import logger
+from crypto_utils import CryptoHelper
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+logger.remove()
+logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+# 基础配置
+BASE_URL = "https://cashier.yqszpay.com"
+PAGE_SIZE = 10
+
+headers = {
+    "User-Agent": user_agent.generate_user_agent(),
+    "Connection": "Keep-Alive",
+    "Accept-Encoding": "gzip",
+    "Content-Type": "application/json",
+    "channelNo": "88888888",
+    "pageSize": str(PAGE_SIZE),
+    # "pageNum": 1,
+    "version": "1.9.9.82537"
+}
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    获取代理配置
+
+    :param log: 日志对象
+    :return: 代理字典
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def make_encrypted_post_request(log, url: str, request_data: dict, extra_headers: dict = None):
+    """
+    通用加密POST请求函数(带重试机制)
+
+    :param log: 日志对象
+    :param url: 请求URL
+    :param request_data: 请求数据字典(会被加密)
+    :param extra_headers: 额外的请求头
+    :return: 解密后的响应数据,失败返回None
+    """
+    request_headers = headers.copy()
+    if extra_headers:
+        request_headers.update(extra_headers)
+
+    log.debug(f"Request URL: {url}, Data: {request_data}")
+
+    encrypted_body = CryptoHelper.encrypt_request_data(request_data)
+    # print(request_headers)
+    response = requests.post(url, headers=request_headers, json=encrypted_body, timeout=30)
+    # response.raise_for_status()
+
+    if response.status_code == 200:
+        response_json = response.json()
+        # log.debug(f"Raw response: {response_json}")
+
+        if 'data' in response_json:
+            decrypted = CryptoHelper.decrypt_response_data(response_json)
+            # log.debug(f"Decrypted response: {decrypted}")
+            return decrypted
+        return response_json
+    else:
+        log.error(f"请求失败: {response.status_code}, Response: {response.text}")
+        return None
+
+
+def get_shop_single_page(log, page_num, page_size=PAGE_SIZE):
+    """
+    获取商户列表(支持翻页)
+
+    :param log: 日志对象
+    :param page_num: 页码
+    :param page_size: 每页条数
+    """
+    log.debug(f"Getting shop list, page: {page_num}")
+    url = f"{BASE_URL}/zc-api/merchant/getMerMyList"
+    request_data = {'pageNum': page_num, 'pageSize': page_size}
+    try:
+        resp = make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
+    except Exception as e:
+        log.error(f"Error getting shop list: {e}")
+        resp = None
+    return resp
+
+
+def get_sold_single_page(log, mer_no, page_num):
+    """
+    获取商品列表(支持翻页)
+
+    :param log: 日志对象
+    :param mer_no: 商户编号
+    :param page_num: 页码
+    """
+    log.info(f"Getting sold items for mer_no: {mer_no}, page: {page_num}")
+    url = f"{BASE_URL}/zc-api/act/actProduct/getActList"
+    request_data = {
+        'merNo': mer_no,
+        'pageNum': page_num,
+        'pageSize': PAGE_SIZE,
+        'queryType': 1
+    }
+    return make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
+
+
+def get_player_single_page(log, act_id, token, page_num, page_size=PAGE_SIZE):
+    """
+    获取玩家列表(支持翻页)
+
+    :param log: 日志对象
+    :param act_id: 活动ID
+    :param token: Authorization token
+    :param page_num: 页码
+    :param page_size: 每页条数
+    """
+    log.debug(f"Getting player list for act_id: {act_id}, page: {page_num}")
+    url = f"{BASE_URL}/zc-api/act/actOrder/getActOrderPublicDetails"
+    request_data = {'actId': act_id, 'pageNum': page_num, 'pageSize': page_size}
+    return make_encrypted_post_request(
+        log, url, request_data,
+        extra_headers={"Authorization": token, "pageNum": str(page_num)}
+    )
+
+
+def parse_shop_data(log, items, sql_pool):
+    """
+    解析商户数据
+
+    :param log: 日志对象
+    :param items: 商户列表
+    :param sql_pool: MySQL连接池
+    :return: 解析后的数据列表
+    """
+    log.debug(f"Parsing shop data...........")
+    info_list = []
+    for item in items:
+        # log.debug(f"Processing shop item: {item}")
+        shop_id = item.get('merNo')
+        shop_name = item.get('merName')
+        sold_number = item.get('spell_number')
+        # link_man = item.get('linkMan')
+        # user_id = item.get('userId')
+        fans = item.get('attentionNumber')
+        data_dict = {
+            'shop_id': shop_id,
+            'shop_name': shop_name,
+            'sold_number': sold_number,
+            'fans': fans
+        }
+        log.debug(f"Parsed shop data: {data_dict}")
+        info_list.append(data_dict)
+
+    # 保存/更新 根据shop_id判断 是否存在,存在则更新,不存在则插入
+    sql = "INSERT INTO zc_shop_record (shop_id, shop_name, sold_number, fans) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE shop_name=VALUES(shop_name), sold_number=VALUES(sold_number), fans=VALUES(fans)"
+    # 将字典列表转换为元组列表
+    args_list = [tuple(d.values()) for d in info_list]
+    sql_pool.insert_many(query=sql, args_list=args_list)
+
+
+@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), after=after_log)
+def get_video(log, token, pid):
+    """
+    获取活动视频信息
+
+    :param log: 日志对象
+    :param token: Authorization token
+    :param pid: 活动ID
+    :return: (live_id, open_time, close_time, video_url)
+    """
+    url = "https://cashier.yqszpay.com/zc-api/live/actLive/getMerLiveInfo"
+    request_data = {'actId': pid}
+    log.debug(f"获取视频信息,actId: {pid}")
+    resp_data = make_encrypted_post_request(
+        log, url, request_data,
+        extra_headers={"Authorization": token}
+    )
+    # log.debug(f"视频响应: {resp_data}")
+
+    live_id = resp_data.get('live', {}).get('liveId')
+    live_open_time = resp_data.get('live', {}).get('openTime')
+    live_close_time = resp_data.get('live', {}).get('closeTime')
+    video_url = resp_data.get('live', {}).get('videoUrl')
+    return live_id, live_open_time, live_close_time, video_url
+
+
+def parse_sold_data(log, token, items, sql_pool):
+    """
+    解析商品数据
+
+    :param log: 日志对象
+    :param token: Authorization token
+    :param items: 商品列表
+    :param sql_pool: MySQL连接池
+    :return: 解析后的数据列表
+    """
+    info_list = []
+    for item in items:
+        # log.debug(f"Processing sold item: {item}")
+        shop_id = item.get('merNo')  # 商户编号
+        pid = item.get('id')
+        act_day = item.get('actDay')  # 活动天数
+        act_logo = item.get('actLogo')
+        act_name = item.get('actName')  # 活动名称
+        act_no = item.get('actNo')  # 活动编号
+        act_status = item.get('actStatus')  # 活动状态
+        startDate = item.get('startDate')  # 开始时间
+        endDate = item.get('endDate')  # 结束时间
+        storageId = item.get('storageId')  # 存储ID
+        storageName = item.get('storageName')  # 存储名称
+        unitPrice = item.get('unitPrice')  # 单价
+        sumPrice = item.get('sumPrice')  # 总价
+        reality_price = item.get('realityPrice')  # 实际价格
+        packageNumber = item.get('packageNumber')  # 包配置
+        schedule = item.get('schedule')  # 库存
+
+        live_id, live_open_time, live_close_time, video_url = get_video(log, token, pid)
+
+        data_dict = {
+            'shop_id': shop_id,
+            'pid': pid,
+            'act_day': act_day,
+            'act_img': act_logo,
+            'act_name': act_name,
+            'act_no': act_no,
+            'act_status': act_status,
+            'start_date': startDate,
+            'end_date': endDate,
+            'storage_id': storageId,
+            'storage_name': storageName,
+            'unit_price': unitPrice,
+            'sum_price': sumPrice,
+            'reality_price': reality_price,
+            'package_number': packageNumber,
+            'schedule': schedule,
+            'live_id': live_id,
+            'live_open_time': live_open_time,
+            'live_close_time': live_close_time,
+            'video_url': video_url
+        }
+        # log.debug(f"Parsed sold data: {data_dict}")
+        # { 'live_close_time': None, 'video_url': None}
+        info_list.append(data_dict)
+
+    # 保存数据
+    sql_pool.insert_many(table='zc_product_record', data_list=info_list, ignore=True)
+
+
+def parse_player_data(log, items, sql_pool):
+    """
+    解析玩家数据
+
+    :param log: 日志对象
+    :param items: 玩家列表
+    :param sql_pool: MySQL连接池
+    :return: 解析后的数据列表
+    """
+    log.debug(f"Parsing player data...........")
+    info_list = []
+    for item in items:
+        # log.debug(f"Processing player item: {item}")
+        pid = item.get('actId')
+        player_id = item.get('id')
+        order_id = item.get('orderId')
+        secret_name = item.get('secretName')
+        add_time = item.get('addTime')
+        user_id = item.get('userId')
+        user_name = item.get('user_name')
+        data_dict = {
+            'pid': pid,
+            'player_id': player_id,
+            'order_id': order_id,
+            'secret_name': secret_name,
+            'add_time': add_time,
+            'user_id': user_id,
+            'user_name': user_name
+        }
+        # log.debug(f"Parsed player data: {data_dict}")
+        info_list.append(data_dict)
+
+    # 保存数据
+    sql_pool.insert_many(table='zc_player_record', data_list=info_list, ignore=True)
+
+
+def get_shop_list(log, sql_pool):
+    """
+    商户列表翻页生成器
+
+    :param log: 日志对象
+    :param sql_pool: MySQL连接池
+    """
+    page_num = 1
+    total = 0
+
+    while page_num <= 100:
+        result = get_shop_single_page(log, page_num, PAGE_SIZE)
+        # print(result)
+        if result is None:
+            log.error(f"第 {page_num} 页请求失败,停止翻页")
+            break
+
+        data_list = result.get('rows', [])
+        parse_shop_data(log, data_list, sql_pool)
+
+        # 获取总条数(第一页时获取)
+        if total is None and 'total' in result:
+            total = result['total']
+            log.info(f"总记录数: {total}")
+
+        # 检查是否有数据
+        if len(data_list) == 0:
+            log.info(f"第 {page_num} 页无数据,停止翻页")
+            break
+
+        # 根据total判断是否超出范围
+        if total is not None and (page_num - 1) * PAGE_SIZE >= total:
+            log.info(f"已遍历完所有数据,停止翻页")
+            break
+
+        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
+
+        page_num += 1
+
+
+def get_sold_list(log, shop_id, token, sql_pool):
+    """
+    商品列表翻页生成器
+
+    :param log: 日志对象
+    :param shop_id: shop_id
+    :param token: Authorization token
+    :param sql_pool: MySQL连接池
+    """
+    page_num = 1
+    max_pages = 1000
+
+    while page_num <= max_pages:
+        result = get_sold_single_page(log, shop_id, page_num)
+        # print(result)
+        if result is None:
+            log.error(f"第 {page_num} 页请求失败,停止翻页")
+            break
+
+        data_list = result.get('rows', [])
+        parse_sold_data(log, token, data_list, sql_pool)
+
+        # 检查是否有数据
+        if len(data_list) < 10:
+            log.info(f"第 {page_num} 页无数据,停止翻页")
+            break
+
+        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
+
+        page_num += 1
+
+
+def get_player_list(log, act_id, token, sql_pool):
+    """
+    玩家列表翻页生成器
+
+    :param log: 日志对象
+    :param act_id: 活动ID
+    :param token: Authorization token
+    :param sql_pool: MySQL连接池
+    :return: has_data (True: 有数据, False: 无数据)
+    """
+    page_num = 1
+    max_pages = 1000
+    has_data = False
+
+    while page_num <= max_pages:
+        result = get_player_single_page(log, act_id, token, page_num)
+        if result is None:
+            log.error(f"第 {page_num} 页请求失败,停止翻页")
+            break
+
+        data_list = result.get('rows', [])
+
+        # 如果有数据才解析
+        if len(data_list) > 0:
+            has_data = True
+            parse_player_data(log, data_list, sql_pool)
+
+        # 检查是否有数据
+        if len(data_list) < 10:
+            log.info(f"第 {page_num} 页无数据,停止翻页")
+            break
+
+        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
+
+        page_num += 1
+
+    return has_data
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def zc_main(log):
+    """
+    主函数
+
+    :param log: logger对象
+    """
+    log.info(
+        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
+
+    # 配置 MySQL 连接池
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("数据库连接池异常")
+        raise RuntimeError("数据库连接池异常")
+
+    try:
+        # 获取 token
+        token_row = sql_pool.select_one("SELECT token FROM zc_token WHERE id = 1")
+        if not token_row:
+            log.error("未查询到 token")
+            return
+        token = token_row[0]
+
+        # player test
+        # has_data = get_player_list(log, 1800, token, sql_pool)
+
+        # 获取shop data
+        try:
+            get_shop_list(logger, sql_pool)
+        except Exception as e:
+            log.error(f'iterate_shop_list error: {e}')
+
+        time.sleep(5)
+
+        # 获取sold data - 遍历所有商户
+        try:
+            # 从 shop 表查询所有 merNo
+            mer_no_rows = sql_pool.select_all("SELECT shop_id FROM zc_shop_record WHERE sold_number != 0")
+            mer_no_list = [row[0] for row in mer_no_rows] if mer_no_rows else []
+            log.info(f"查询到 {len(mer_no_list)} 个商户编号: {mer_no_list}")
+            for shop_id in mer_no_list:
+                log.info(f"开始爬取商户 {shop_id} 的商品数据")
+                get_sold_list(log, shop_id, token, sql_pool)
+        except Exception as e:
+            log.error(f'get_sold_list error: {e}')
+
+        time.sleep(5)
+
+        # 获取player data - 遍历所有活动
+        try:
+            # 从 sold 表查询所有 actId
+            act_id_rows = sql_pool.select_all("SELECT pid FROM zc_product_record WHERE player_state = 0")
+            act_id_list = [row[0] for row in act_id_rows] if act_id_rows else []
+            log.info(f"查询到 {len(act_id_list)} 个活动ID")
+
+            for act_id in act_id_list:
+                try:
+                    # 先将当前 pid 的状态改为 1,表示开始查询
+                    sql_pool.update_one("UPDATE zc_product_record SET player_state = 1 WHERE pid = %s", (act_id,))
+                    log.info(f"将 pid: {act_id} 的状态更新为 1(开始查询)")
+
+                    log.info(f"开始爬取pid: {act_id} 的玩家数据")
+                    has_data = get_player_list(log, act_id, token, sql_pool)
+
+                    # 根据是否有数据更新状态
+                    if has_data:
+                        log.info(f"pid: {act_id} 查询到数据,状态保持为 1")
+                    else:
+                        log.info(f"pid: {act_id} 没有数据,状态更新为 2")
+                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 2 WHERE pid = %s", (act_id,))
+
+                except Exception as pid_error:
+                    # 如果查询失败,将状态改为 3
+                    log.error(f"pid: {act_id} 查询失败,错误: {pid_error}")
+                    try:
+                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 3 WHERE pid = %s", (act_id,))
+                        log.info(f"已将 pid: {act_id} 的状态更新为 3(查询异常)")
+                    except Exception as update_error:
+                        log.error(f"更新 pid: {act_id} 状态失败: {update_error}")
+        except Exception as e:
+            log.error(f'iterate_player_list error: {e}')
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
+if __name__ == '__main__':
+    # 获取单页数据
+    # result = get_sold_single_page(logger, 'ZC10264451', page_num=1, page_size=20)
+    # result = get_shop_single_page(logger, page_num=1, page_size=10)
+    # result = get_player_single_page(logger, 1520, 'your_token', page_num=1, page_size=10)
+    # print("单页数据:", result)
+
+    zc_main(logger)
+    # get_vodeo(logger, 'your_token', 1726)
+    # sql_pool = MySQLConnectionPool(log=logger)
+    # get_shop_list(logger, sql_pool)
+    # get_shop_single_page(logger, 1, 10)

+ 45 - 0
zc_spider/zc_login.py

@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/2/27 11:23
+import requests
+
+headers = {
+    "User-Agent": "Mozilla/5.0 (Linux; Android 11; Pixel 5 Build/RQ3A.211001.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.106 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/52.727272)",
+    "Connection": "Keep-Alive",
+    "Accept-Encoding": "gzip",
+    "Content-Type": "application/json",
+    "channelNo": "88888888",
+    "version": "1.9.9.82537"
+}
+
+
+def send_sms():
+
+    url = "https://cashier.yqszpay.com/zc-api/sms/sendAuthSms"
+    data = {
+        "data": "D6CCC8AFFD6F06E7D8E389265F24691BA706F607DF1C9E6AB7F6531D910C77F32B8CC460553BA9ADB42CD888FE6769EA"
+    }
+    # {'linkPhone': '19521500850', 'sendType': 1}
+    response = requests.post(url, headers=headers, json=data)
+
+    print(response.text)
+    print(response)
+
+
+def login():
+    url = "https://cashier.yqszpay.com/zc-api/login"
+    data = {
+        "data": "CCB9574EC5AE365A9BE0501C77E7C211F75F26DF58A4F0E83C5A853D367BBA71A77B1BF52A093F9CBA3381DA9DFBA964083C95B399B697AF243FF4DB0A2DDEF1"
+    }
+    # {'phone': '19521500850', 'smsCode': '695538', 'loginType': 2}
+    response = requests.post(url, headers=headers, json=data)
+
+    print(response.text)
+    print(response)
+
+
+if __name__ == '__main__':
+    send_sms()
+    sms_code = input('请输入验证码:')
+    login()

+ 545 - 0
zc_spider/zc_new_daily_spider.py

@@ -0,0 +1,545 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/2/27 11:22
+import time
+import inspect
+import requests
+import schedule
+import user_agent
+from loguru import logger
+from crypto_utils import CryptoHelper
+from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+logger.remove()
+logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+# 基础配置
+BASE_URL = "https://cashier.yqszpay.com"
+PAGE_SIZE = 10
+
+headers = {
+    "User-Agent": user_agent.generate_user_agent(),
+    "Connection": "Keep-Alive",
+    "Accept-Encoding": "gzip",
+    "Content-Type": "application/json",
+    "channelNo": "88888888",
+    "pageSize": str(PAGE_SIZE),
+    # "pageNum": 1,
+    "version": "1.9.9.82537"
+}
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    获取代理配置
+
+    :param log: 日志对象
+    :return: 代理字典
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def make_encrypted_post_request(log, url: str, request_data: dict, extra_headers: dict = None):
+    """
+    通用加密POST请求函数(带重试机制)
+
+    :param log: 日志对象
+    :param url: 请求URL
+    :param request_data: 请求数据字典(会被加密)
+    :param extra_headers: 额外的请求头
+    :return: 解密后的响应数据,失败返回None
+    """
+    request_headers = headers.copy()
+    if extra_headers:
+        request_headers.update(extra_headers)
+
+    log.debug(f"Request URL: {url}, Data: {request_data}")
+
+    encrypted_body = CryptoHelper.encrypt_request_data(request_data)
+    # print(request_headers)
+    response = requests.post(url, headers=request_headers, json=encrypted_body, timeout=30)
+    # response.raise_for_status()
+
+    if response.status_code == 200:
+        response_json = response.json()
+        # log.debug(f"Raw response: {response_json}")
+
+        if 'data' in response_json:
+            decrypted = CryptoHelper.decrypt_response_data(response_json)
+            # log.debug(f"Decrypted response: {decrypted}")
+            return decrypted
+        return response_json
+    else:
+        log.error(f"请求失败: {response.status_code}, Response: {response.text}")
+        return None
+
+
+def get_shop_single_page(log, page_num, page_size=PAGE_SIZE):
+    """
+    获取商户列表(支持翻页)
+
+    :param log: 日志对象
+    :param page_num: 页码
+    :param page_size: 每页条数
+    """
+    log.debug(f"Getting shop list, page: {page_num}")
+    url = f"{BASE_URL}/zc-api/merchant/getMerMyList"
+    request_data = {'pageNum': page_num, 'pageSize': page_size}
+    try:
+        resp = make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
+    except Exception as e:
+        log.error(f"Error getting shop list: {e}")
+        resp = None
+    return resp
+
+
+def get_sold_single_page(log, mer_no, page_num):
+    """
+    获取商品列表(支持翻页)
+
+    :param log: 日志对象
+    :param mer_no: 商户编号
+    :param page_num: 页码
+    """
+    log.info(f"Getting sold items for mer_no: {mer_no}, page: {page_num}")
+    url = f"{BASE_URL}/zc-api/act/actProduct/getActList"
+    request_data = {
+        'merNo': mer_no,
+        'pageNum': page_num,
+        'pageSize': PAGE_SIZE,
+        'queryType': 1
+    }
+    return make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
+
+
+def get_player_single_page(log, act_id, token, page_num, page_size=PAGE_SIZE):
+    """
+    获取玩家列表(支持翻页)
+
+    :param log: 日志对象
+    :param act_id: 活动ID
+    :param token: Authorization token
+    :param page_num: 页码
+    :param page_size: 每页条数
+    """
+    log.debug(f"Getting player list for act_id: {act_id}, page: {page_num}")
+    url = f"{BASE_URL}/zc-api/act/actOrder/getActOrderPublicDetails"
+    request_data = {'actId': act_id, 'pageNum': page_num, 'pageSize': page_size}
+    return make_encrypted_post_request(
+        log, url, request_data,
+        extra_headers={"Authorization": token, "pageNum": str(page_num)}
+    )
+
+
+def parse_shop_data(log, items, sql_pool):
+    """
+    解析商户数据
+
+    :param log: 日志对象
+    :param items: 商户列表
+    :param sql_pool: MySQL连接池
+    :return: 解析后的数据列表
+    """
+    log.debug(f"Parsing shop data...........")
+    info_list = []
+    for item in items:
+        # log.debug(f"Processing shop item: {item}")
+        shop_id = item.get('merNo')
+        shop_name = item.get('merName')
+        sold_number = item.get('spell_number')
+        # link_man = item.get('linkMan')
+        # user_id = item.get('userId')
+        fans = item.get('attentionNumber')
+        data_dict = {
+            'shop_id': shop_id,
+            'shop_name': shop_name,
+            'sold_number': sold_number,
+            'fans': fans
+        }
+        log.debug(f"Parsed shop data: {data_dict}")
+        info_list.append(data_dict)
+
+    # 保存/更新 根据shop_id判断 是否存在,存在则更新,不存在则插入
+    sql = "INSERT INTO zc_shop_record (shop_id, shop_name, sold_number, fans) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE shop_name=VALUES(shop_name), sold_number=VALUES(sold_number), fans=VALUES(fans)"
+    # 将字典列表转换为元组列表
+    args_list = [tuple(d.values()) for d in info_list]
+    sql_pool.insert_many(query=sql, args_list=args_list)
+
+
+@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), after=after_log)
+def get_video(log, token, pid):
+    """
+    获取活动视频信息
+
+    :param log: 日志对象
+    :param token: Authorization token
+    :param pid: 活动ID
+    :return: (live_id, open_time, close_time, video_url)
+    """
+    url = "https://cashier.yqszpay.com/zc-api/live/actLive/getMerLiveInfo"
+    request_data = {'actId': pid}
+    log.debug(f"获取视频信息,actId: {pid}")
+    resp_data = make_encrypted_post_request(
+        log, url, request_data,
+        extra_headers={"Authorization": token}
+    )
+    # log.debug(f"视频响应: {resp_data}")
+
+    live_id = resp_data.get('live', {}).get('liveId')
+    live_open_time = resp_data.get('live', {}).get('openTime')
+    live_close_time = resp_data.get('live', {}).get('closeTime')
+    video_url = resp_data.get('live', {}).get('videoUrl')
+    return live_id, live_open_time, live_close_time, video_url
+
+
+def parse_sold_data(log, token, items, sql_pool):
+    """
+    解析商品数据
+
+    :param log: 日志对象
+    :param token: Authorization token
+    :param items: 商品列表
+    :param sql_pool: MySQL连接池
+    :return: 解析后的数据列表
+    """
+    info_list = []
+    for item in items:
+        # log.debug(f"Processing sold item: {item}")
+        shop_id = item.get('merNo')  # 商户编号
+        pid = item.get('id')
+        act_day = item.get('actDay')  # 活动天数
+        act_logo = item.get('actLogo')
+        act_name = item.get('actName')  # 活动名称
+        act_no = item.get('actNo')  # 活动编号
+        act_status = item.get('actStatus')  # 活动状态
+        startDate = item.get('startDate')  # 开始时间
+        endDate = item.get('endDate')  # 结束时间
+        storageId = item.get('storageId')  # 存储ID
+        storageName = item.get('storageName')  # 存储名称
+        unitPrice = item.get('unitPrice')  # 单价
+        sumPrice = item.get('sumPrice')  # 总价
+        reality_price = item.get('realityPrice')  # 实际价格
+        packageNumber = item.get('packageNumber')  # 包配置
+        schedule_ = item.get('schedule')  # 库存
+
+        live_id, live_open_time, live_close_time, video_url = get_video(log, token, pid)
+
+        data_dict = {
+            'shop_id': shop_id,
+            'pid': pid,
+            'act_day': act_day,
+            'act_img': act_logo,
+            'act_name': act_name,
+            'act_no': act_no,
+            'act_status': act_status,
+            'start_date': startDate,
+            'end_date': endDate,
+            'storage_id': storageId,
+            'storage_name': storageName,
+            'unit_price': unitPrice,
+            'sum_price': sumPrice,
+            'reality_price': reality_price,
+            'package_number': packageNumber,
+            'schedule': schedule_,
+            'live_id': live_id,
+            'live_open_time': live_open_time,
+            'live_close_time': live_close_time,
+            'video_url': video_url
+        }
+        # log.debug(f"Parsed sold data: {data_dict}")
+        # { 'live_close_time': None, 'video_url': None}
+        info_list.append(data_dict)
+
+    # 保存数据
+    sql_pool.insert_many(table='zc_product_record', data_list=info_list, ignore=True)
+
+
+def parse_player_data(log, items, sql_pool):
+    """
+    解析玩家数据
+
+    :param log: 日志对象
+    :param items: 玩家列表
+    :param sql_pool: MySQL连接池
+    :return: 解析后的数据列表
+    """
+    log.debug(f"Parsing player data...........")
+    info_list = []
+    for item in items:
+        # log.debug(f"Processing player item: {item}")
+        pid = item.get('actId')
+        player_id = item.get('id')
+        order_id = item.get('orderId')
+        secret_name = item.get('secretName')
+        add_time = item.get('addTime')
+        user_id = item.get('userId')
+        user_name = item.get('user_name')
+        data_dict = {
+            'pid': pid,
+            'player_id': player_id,
+            'order_id': order_id,
+            'secret_name': secret_name,
+            'add_time': add_time,
+            'user_id': user_id,
+            'user_name': user_name
+        }
+        # log.debug(f"Parsed player data: {data_dict}")
+        info_list.append(data_dict)
+
+    # 保存数据
+    sql_pool.insert_many(table='zc_player_record', data_list=info_list, ignore=True)
+
+
+def get_shop_list(log, sql_pool):
+    """
+    商户列表翻页生成器
+
+    :param log: 日志对象
+    :param sql_pool: MySQL连接池
+    """
+    page_num = 1
+    total = 0
+
+    while page_num <= 100:
+        result = get_shop_single_page(log, page_num, PAGE_SIZE)
+        # print(result)
+        if result is None:
+            log.error(f"第 {page_num} 页请求失败,停止翻页")
+            break
+
+        data_list = result.get('rows', [])
+        parse_shop_data(log, data_list, sql_pool)
+
+        # 获取总条数(第一页时获取)
+        if total is None and 'total' in result:
+            total = result['total']
+            log.info(f"总记录数: {total}")
+
+        # 检查是否有数据
+        if len(data_list) == 0:
+            log.info(f"第 {page_num} 页无数据,停止翻页")
+            break
+
+        # 根据total判断是否超出范围
+        if total is not None and (page_num - 1) * PAGE_SIZE >= total:
+            log.info(f"已遍历完所有数据,停止翻页")
+            break
+
+        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
+
+        page_num += 1
+
+
+def get_sold_list(log, shop_id, token, sql_pool):
+    """
+    商品列表翻页生成器
+
+    :param log: 日志对象
+    :param shop_id: shop_id
+    :param token: Authorization token
+    :param sql_pool: MySQL连接池
+    """
+    page_num = 1
+    max_pages = 5
+
+    while page_num <= max_pages:
+        result = get_sold_single_page(log, shop_id, page_num)
+        # print(result)
+        if result is None:
+            log.error(f"第 {page_num} 页请求失败,停止翻页")
+            break
+
+        data_list = result.get('rows', [])
+        parse_sold_data(log, token, data_list, sql_pool)
+
+        # 检查是否有数据
+        if len(data_list) < 10:
+            log.info(f"第 {page_num} 页无数据,停止翻页")
+            break
+
+        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
+
+        page_num += 1
+
+
+def get_player_list(log, act_id, token, sql_pool):
+    """
+    玩家列表翻页生成器
+
+    :param log: 日志对象
+    :param act_id: 活动ID
+    :param token: Authorization token
+    :param sql_pool: MySQL连接池
+    :return: has_data (True: 有数据, False: 无数据)
+    """
+    page_num = 1
+    max_pages = 1000
+    has_data = False
+
+    while page_num <= max_pages:
+        result = get_player_single_page(log, act_id, token, page_num)
+        if result is None:
+            log.error(f"第 {page_num} 页请求失败,停止翻页")
+            break
+
+        data_list = result.get('rows', [])
+
+        # 如果有数据才解析
+        if len(data_list) > 0:
+            has_data = True
+            parse_player_data(log, data_list, sql_pool)
+
+        # 检查是否有数据
+        if len(data_list) < 10:
+            log.info(f"第 {page_num} 页无数据,停止翻页")
+            break
+
+        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
+
+        page_num += 1
+
+    return has_data
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def zc_main(log):
+    """
+    主函数
+
+    :param log: logger对象
+    """
+    log.info(
+        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
+
+    # 配置 MySQL 连接池
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool.check_pool_health():
+        log.error("数据库连接池异常")
+        raise RuntimeError("数据库连接池异常")
+
+    try:
+        # 获取 token
+        token_row = sql_pool.select_one("SELECT token FROM zc_token WHERE id = 1")
+        if not token_row:
+            log.error("未查询到 token")
+            return
+        token = token_row[0]
+
+        # player test
+        # has_data = get_player_list(log, 1800, token, sql_pool)
+
+        # 获取shop data
+        try:
+            get_shop_list(logger, sql_pool)
+        except Exception as e:
+            log.error(f'iterate_shop_list error: {e}')
+
+        time.sleep(5)
+
+        # 获取sold data - 遍历所有商户
+        try:
+            # 从 shop 表查询所有 merNo
+            mer_no_rows = sql_pool.select_all("SELECT shop_id FROM zc_shop_record WHERE sold_number != 0")
+            mer_no_list = [row[0] for row in mer_no_rows] if mer_no_rows else []
+            log.info(f"查询到 {len(mer_no_list)} 个商户编号: {mer_no_list}")
+            for shop_id in mer_no_list:
+                log.info(f"开始爬取商户 {shop_id} 的商品数据")
+                get_sold_list(log, shop_id, token, sql_pool)
+        except Exception as e:
+            log.error(f'get_sold_list error: {e}')
+
+        time.sleep(5)
+
+        # 获取player data - 遍历所有活动
+        try:
+            # 从 sold 表查询所有 actId
+            act_id_rows = sql_pool.select_all("SELECT pid FROM zc_product_record WHERE player_state = 0")
+            act_id_list = [row[0] for row in act_id_rows] if act_id_rows else []
+            log.info(f"查询到 {len(act_id_list)} 个活动ID")
+
+            for act_id in act_id_list:
+                try:
+                    # 先将当前 pid 的状态改为 1,表示开始查询
+                    sql_pool.update_one("UPDATE zc_product_record SET player_state = 1 WHERE pid = %s", (act_id,))
+                    log.info(f"将 pid: {act_id} 的状态更新为 1(开始查询)")
+
+                    log.info(f"开始爬取pid: {act_id} 的玩家数据")
+                    has_data = get_player_list(log, act_id, token, sql_pool)
+
+                    # 根据是否有数据更新状态
+                    if has_data:
+                        log.info(f"pid: {act_id} 查询到数据,状态保持为 1")
+                    else:
+                        log.info(f"pid: {act_id} 没有数据,状态更新为 2")
+                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 2 WHERE pid = %s", (act_id,))
+
+                except Exception as pid_error:
+                    # 如果查询失败,将状态改为 3
+                    log.error(f"pid: {act_id} 查询失败,错误: {pid_error}")
+                    try:
+                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 3 WHERE pid = %s", (act_id,))
+                        log.info(f"已将 pid: {act_id} 的状态更新为 3(查询异常)")
+                    except Exception as update_error:
+                        log.error(f"更新 pid: {act_id} 状态失败: {update_error}")
+        except Exception as e:
+            log.error(f'iterate_player_list error: {e}')
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
+def schedule_task():
+    """
+    爬虫模块 定时任务 的启动文件
+    """
+    # 立即运行一次任务
+    zc_main(log=logger)
+
+    # 设置定时任务
+    schedule.every().day.at("00:01").do(zc_main, log=logger)
+
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+
+if __name__ == '__main__':
+    # zc_main(logger)
+    schedule_task()

Some files were not shown because too many files changed in this diff