瀏覽代碼

feat(mysql): 新增MySQL连接池模块并支持多种数据库操作接口

- 实现MySQL连接池类,支持连接复用与池管理
- 添加单条和批量插入接口,支持字典和原始SQL
- 实现插入冲突时批量降级为单条插入策略
- 添加单条和批量更新接口,支持条件构造与多种数据格式
- 提供连接池健康检测和安全的SQL标识符校验
- 配置MySQL连接信息读取自YAML文件
- 引入必要依赖库版本管理文件requirements.txt
- 完成新增爬虫获取评级日期和分类统计的功能脚本
- 评级日期爬虫支持多XPath路径匹配及自动重试机制
- 分类统计爬虫支持多类别周期爬取与异常处理
- 使用Chromium浏览器驱动进行页面数据抓取
- 所有模块日志均配置循环文件输出与详细日志等级管理
charley 1 月之前
父節點
當前提交
b694895247

+ 78 - 0
tag_spider/YamlLoader.py

@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/12/22 10:44
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
+
+
class YamlConfig:
    """Wrapper around a parsed YAML mapping that resolves ``${ENV:default}`` placeholders."""

    # Matches "${ENV_NAME:default}" (environment override) or "${literal}".
    _PLACEHOLDER_RE = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')

    def __init__(self, config):
        # config: the (possibly nested) mapping loaded from YAML.
        self.config = config

    def get(self, key: str):
        """Return a YamlConfig wrapping the sub-mapping at *key* (wraps None if absent)."""
        return YamlConfig(self.config.get(key))

    def _resolve(self, key: str):
        """Resolve the raw value stored at *key* through the placeholder pattern.

        "${ENV:default}" -> os.environ[ENV] if set, else the default;
        "${literal}"     -> literal;
        anything else    -> the raw value unchanged.

        :raises KeyError: if *key* is not present (same as the original code).
        """
        raw = self.config[key]
        match = self._PLACEHOLDER_RE.match(raw) if isinstance(raw, str) else None
        if match is None:
            return raw
        group = match.groupdict()
        if group['ENV'] is not None:
            return os.getenv(group['ENV'][:-1], group['VAL'])  # strip trailing ':'
        # BUGFIX: the old code returned None/0/False here instead of the literal value.
        return group['VAL']

    def getValueAsString(self, key: str):
        """Return the resolved value at *key* (a string for placeholder values)."""
        return self._resolve(key)

    def getValueAsInt(self, key: str):
        """Return the resolved value at *key* coerced to int."""
        return int(self._resolve(key))

    def getValueAsBool(self, key: str):
        """Return the resolved value at *key* coerced to bool.

        NOTE(review): any non-empty string (including "false") is truthy — this
        preserves the original semantics; confirm whether "true"/"false" string
        parsing was actually intended.
        """
        return bool(self._resolve(key))
+
+
def readYaml(path: str = 'application.yml', profile: str = None) -> "YamlConfig":
    """Load *path*, optionally overlaying ``<base>-<profile><ext>`` values on top.

    :param path: path to the base YAML file
    :param profile: optional profile suffix (e.g. 'dev' loads application-dev.yml)
    :return: a YamlConfig wrapping the merged mapping
    :raises FileNotFoundError: if *path* does not exist — the old code left
        ``conf`` unbound and crashed later with a confusing NameError
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    with open(path) as fd:
        conf = yaml.load(fd, Loader=yaml.FullLoader)

    if profile is not None:
        # splitext is safe for paths containing extra dots
        # (the old path.split('.') broke on e.g. './conf/app.yml').
        base, ext = os.path.splitext(path)
        profiledYaml = f'{base}-{profile}{ext}'
        if os.path.exists(profiledYaml):
            with open(profiledYaml) as fd:
                # Profile values override base values (shallow merge).
                conf.update(yaml.load(fd, Loader=yaml.FullLoader))

    return YamlConfig(conf)
+
+# res = readYaml()
+# mysqlConf = res.get('mysql')
+# print(mysqlConf)
+
+# print(res.getValueAsString("host"))
+# mysqlYaml = mysqlConf.getValueAsString("host")
+# print(mysqlYaml)
+# host = mysqlYaml.get("host").split(':')[-1][:-1]
+# port = mysqlYaml.get("port").split(':')[-1][:-1]
+# username = mysqlYaml.get("username").split(':')[-1][:-1]
+# password = mysqlYaml.get("password").split(':')[-1][:-1]
+# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
+# print(host,port,username,password)

+ 11 - 0
tag_spider/application.yml

@@ -0,0 +1,11 @@
+mysql:
+  # Values use ${ENV_VAR:default} placeholders resolved by YamlLoader.
+  host: ${MYSQL_HOST:100.64.0.21}
+  # NOTE(review): "MYSQL_PROT" looks like a typo for MYSQL_PORT — confirm before
+  # renaming, since any deployment already exporting MYSQL_PROT would break.
+  port: ${MYSQL_PROT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  # NOTE(review): default credentials committed in plain text — rotate and rely on env vars.
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
+
+fluent:
+  # NOTE(review): "FIUENT_*" look like typos for FLUENT_* — confirm against consumers.
+  host: ${FIUENT_HOST:192.168.66.152}
+  port: ${FIUENT_PORT:24225}
+  appname: ${FIUENT_APPNAME:psa_spider.log}

+ 625 - 0
tag_spider/mysql_pool.py

@@ -0,0 +1,625 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
+# Load MySQL connection settings from application.yml at import time.
+# NOTE(review): this performs file I/O on import and raises if the file or keys
+# are missing — consider lazy loading. The local name "yaml" also shadows the
+# PyYAML module name, which could confuse later additions to this file.
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL connection pool built on DBUtils PooledDB + PyMySQL.
+    """
+
+    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
+        """
+        Initialize the pool.
+        :param mincached: connections created at startup (0 = create none)
+        :param maxcached: max idle connections kept in the pool (0/None = unlimited)
+        :param maxconnections: max total connections (0/None = unlimited)
+        :param log: optional custom logger; defaults to the loguru logger
+        """
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # block and wait when the pool is exhausted (False would raise)
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db,
+            ping=0  # NOTE(review): 0 disables liveness checks entirely — the original
+                    # comment claimed per-use checking; confirm the intended policy
+        )
+
+    def _execute(self, query, args=None, commit=False):
+        """
+        执行SQL
+        :param query: SQL语句
+        :param args: SQL参数
+        :param commit: 是否提交事务
+        :return: 查询结果
+        """
+        try:
+            with self.pool.connection() as conn:
+                with conn.cursor() as cursor:
+                    cursor.execute(query, args)
+                    if commit:
+                        conn.commit()
+                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                    return cursor
+        except Exception as e:
+            if commit:
+                conn.rollback()
+            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+            raise e
+
+    def select_one(self, query, args=None):
+        """
+        执行查询,返回单个结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        执行查询,返回所有结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        执行单条插入语句
+        :param query: 插入语句
+        :param args: 插入参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        cursor = self._execute(query, args, commit=True)
+        return cursor.lastrowid  # 返回插入的ID
+
+    def insert_all(self, query, args_list):
+        """
+        执行批量插入语句,如果失败则逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                conn.rollback()
+                self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
+                rowcount = 0
+                for args in args_list:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except pymysql.err.IntegrityError as e2:
+                        if "Duplicate entry" in str(e2):
+                            self.log.debug(f"跳过重复条目: {e2}")
+                        else:
+                            self.log.error(f"插入失败: {e2}")
+                    except Exception as e2:
+                        self.log.error(f"插入失败: {e2}")
+                self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
+            else:
+                conn.rollback()
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise e
+        except Exception as e:
+            conn.rollback()
+            self.log.exception(f"批量插入失败: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
+        """
+        单条插入(支持字典或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data: 字典数据 {列名: 值}
+        :param query: 直接SQL语句(与data二选一)
+        :param args: SQL参数(query使用时必需)
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 最后插入ID
+        """
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
+            values = ', '.join(['%s'] * len(data))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args = tuple(data.values())
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        try:
+            cursor = self._execute(query, args, commit)
+            self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+            return cursor.lastrowid
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                return -1  # 返回 -1 表示重复条目被跳过
+            else:
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise
+        except Exception as e:
+            self.log.exception(f"未知错误: {e}")
+            raise
+
+    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                    ignore=False):
+        """
+        Batched insert (column-dict list or raw SQL + parameter list).
+        :param table: table name (required in dict mode)
+        :param data_list: list of column dicts [{col: value}]
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: parameter tuples for raw-SQL mode
+        :param batch_size: rows per executemany batch
+        :param commit: auto-commit each batch
+        :param ignore: emit INSERT IGNORE in dict mode
+        :return: total affected rows
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+
+            # Column order is taken from the first dict; all rows must share its keys.
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+
+            # Build the (optional) INSERT IGNORE statement
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                # Handle unique-index conflicts.
+                # NOTE(review): the rollback calls below run after the `with` block has
+                # already returned `conn` to the pool (and `conn` is unbound if
+                # pool.connection() itself raised) — confirm and restructure so the
+                # rollback happens on the live connection.
+                if "Duplicate entry" in str(e):
+                    if ignore:
+                        # With INSERT IGNORE this should be unreachable; kept as a safety net.
+                        self.log.warning(f"批量插入遇到重复条目(ignore模式): {e}")
+                    else:
+                        # No IGNORE was used: degrade to row-by-row inserts.
+                        self.log.warning(f"批量插入遇到重复条目,开始逐条插入。错误: {e}")
+                        if commit:
+                            conn.rollback()
+
+                        rowcount = 0
+                        for j, args in enumerate(batch):
+                            try:
+                                if data_list:
+                                    # Dict mode
+                                    self.insert_one_or_dict(
+                                        table=table,
+                                        data=dict(zip(data_list[0].keys(), args)),
+                                        commit=commit,
+                                        ignore=False  # duplicates are caught manually per row
+                                    )
+                                else:
+                                    # Raw-SQL mode
+                                    self.insert_one(query, args)
+                                rowcount += 1
+                            except pymysql.err.IntegrityError as e2:
+                                if "Duplicate entry" in str(e2):
+                                    self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
+                                else:
+                                    self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                            except Exception as e2:
+                                self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                        total += rowcount
+                        self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
+                else:
+                    # Other integrity errors
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                # Other database errors
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        if table:
+            self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        else:
+            self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
+        return total
+
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                        ignore=False):
+        """
+        Batched insert (column-dict list or raw SQL) — alternate/backup variant.
+        NOTE(review): near-duplicate of insert_many(); consider consolidating the two.
+        :param table: table name (required in dict mode)
+        :param data_list: list of column dicts [{col: value}]
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: parameter tuples for raw-SQL mode
+        :param batch_size: rows per executemany batch
+        :param commit: auto-commit each batch
+        :param ignore: emit INSERT IGNORE in dict mode
+        :return: total affected rows
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                # NOTE(review): `conn` here refers to a connection already returned to
+                # the pool by the exited `with` (and is unbound if pool.connection()
+                # raised) — the rollback calls below need restructuring.
+                if "Duplicate entry" in str(e) and not ignore:
+                    self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
+                    if commit:
+                        conn.rollback()
+                    rowcount = 0
+                    for args in batch:
+                        try:
+                            self.insert_one(query, args)
+                            rowcount += 1
+                        except pymysql.err.IntegrityError as e2:
+                            if "Duplicate entry" in str(e2):
+                                self.log.debug(f"跳过重复条目: {e2}")
+                            else:
+                                self.log.error(f"插入失败: {e2}")
+                        except Exception as e2:
+                            self.log.error(f"插入失败: {e2}")
+                    total += rowcount
+                else:
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    if commit:
+                        conn.rollback()
+                    raise e
+            except Exception as e:
+                self.log.exception(f"批量插入失败: {e}")
+                if commit:
+                    conn.rollback()
+                raise e
+        self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
+        return total
+
+    def insert_too_many(self, query, args_list, batch_size=1000):
+        """
+        执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        :param batch_size: 每次插入的条数
+        """
+        self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        conn.commit()
+                        self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
+            except Exception as e:
+                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
+                # 当前批次降级为单条插入
+                for args in batch:
+                    self.insert_one(query, args)
+
+    def update_one(self, query, args):
+        """
+        执行单条更新语句
+        :param query: 更新语句
+        :param args: 更新参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        执行批量更新语句,如果失败则逐条更新
+        :param query: 更新语句
+        :param args_list: 更新参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # 如果批量更新失败,则逐条更新
+            rowcount = 0
+            for args in args_list:
+                self.update_one(query, args)
+                rowcount += 1
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
+        """
+        单条更新(支持字典或原始SQL)
+        :param table: 表名(字典模式必需)
+        :param data: 字典数据 {列名: 值}(与 query 二选一)
+        :param condition: 更新条件,支持以下格式:
+            - 字典: {"id": 1} → "WHERE id = %s"
+            - 字符串: "id = 1" → "WHERE id = 1"(需自行确保安全)
+            - 元组: ("id = %s", [1]) → "WHERE id = %s"(参数化查询)
+        :param query: 直接SQL语句(与 data 二选一)
+        :param args: SQL参数(query 模式下必需)
+        :param commit: 是否自动提交
+        :return: 影响行数
+        :raises: ValueError 参数校验失败时抛出
+        """
+        # 参数校验
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+            if table is None:
+                raise ValueError("Table name is required for dictionary update")
+            if condition is None:
+                raise ValueError("Condition is required for dictionary update")
+
+            # 构建 SET 子句
+            set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
+            set_values = list(data.values())
+
+            # 解析条件
+            condition_clause, condition_args = self._parse_condition(condition)
+            query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            args = set_values + condition_args
+
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        # 执行更新
+        cursor = self._execute(query, args, commit)
+        # self.log.debug(
+        #     f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
+        #     extra={"table": table, "rows": cursor.rowcount}
+        # )
+        return cursor.rowcount
+
+    def _parse_condition(self, condition):
+        """
+        解析条件为 (clause, args) 格式
+        :param condition: 字典/字符串/元组
+        :return: (str, list) SQL 子句和参数列表
+        """
+        if isinstance(condition, dict):
+            clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
+            args = list(condition.values())
+        elif isinstance(condition, str):
+            clause = condition  # 注意:需调用方确保安全
+            args = []
+        elif isinstance(condition, (tuple, list)) and len(condition) == 2:
+            clause, args = condition[0], condition[1]
+            if not isinstance(args, (list, tuple)):
+                args = [args]
+        else:
+            raise ValueError("Condition must be dict/str/(clause, args)")
+        return clause, args
+
+    def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
+                    commit=True):
+        """
+        Batched update (column-dict lists or raw SQL + parameter list).
+        :param table: table name (required in dict mode)
+        :param data_list: list of column dicts [{col: value}]
+        :param condition_list: list of condition dicts, same length as data_list
+        :param query: raw SQL statement (mutually exclusive with data_list)
+        :param args_list: parameter tuples for raw-SQL mode
+        :param batch_size: rows per executemany batch
+        :param commit: auto-commit each batch
+        :return: total affected rows
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            if condition_list is None or len(data_list) != len(condition_list):
+                raise ValueError("Condition_list must be provided and match the length of data_list")
+            if not all(isinstance(cond, dict) for cond in condition_list):
+                raise ValueError("All elements in condition_list must be dictionaries")
+
+            # Keys of the first data/condition items serve as the batch template.
+            first_data_keys = set(data_list[0].keys())
+            first_cond_keys = set(condition_list[0].keys())
+
+            # Build the base SQL from the template keys.
+            set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
+            condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
+            base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            total = 0
+
+            # Process in batches.
+            for i in range(0, len(data_list), batch_size):
+                batch_data = data_list[i:i + batch_size]
+                batch_conds = condition_list[i:i + batch_size]
+                batch_args = []
+
+                # Check that every row in this batch has the same key structure.
+                can_batch = True
+                for data, cond in zip(batch_data, batch_conds):
+                    data_keys = set(data.keys())
+                    cond_keys = set(cond.keys())
+                    if data_keys != first_data_keys or cond_keys != first_cond_keys:
+                        can_batch = False
+                        break
+                    batch_args.append(tuple(data.values()) + tuple(cond.values()))
+
+                if not can_batch:
+                    # Inconsistent structure: fall back to per-row updates.
+                    for data, cond in zip(batch_data, batch_conds):
+                        self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
+                        total += 1
+                    continue
+
+                # Run the batched update.
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(base_query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    # NOTE(review): `conn` is referenced after the `with` has returned it
+                    # to the pool (and is unbound if pool.connection() raised) — the
+                    # rollback below needs restructuring.
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # Degrade to per-row updates.
+                    for args, data, cond in zip(batch_args, batch_data, batch_conds):
+                        try:
+                            self._execute(base_query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        elif query is not None:
+            # Raw SQL + parameter-list mode.
+            if args_list is None:
+                raise ValueError("args_list must be provided when using query")
+
+            total = 0
+            for i in range(0, len(args_list), batch_size):
+                batch_args = args_list[i:i + batch_size]
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    # NOTE(review): same post-`with` rollback concern as above.
+                    if commit:
+                        conn.rollback()
+                    self.log.error(f"Batch update failed: {e}")
+                    # Degrade to per-row updates.
+                    for args in batch_args:
+                        try:
+                            self._execute(query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        else:
+            raise ValueError("Either data_list or query must be provided")
+
+    def check_pool_health(self):
+        """
+        检查连接池中有效连接数
+
+        # 使用示例
+        # 配置 MySQL 连接池
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool.check_pool_health():
+            log.error("数据库连接池异常")
+            raise RuntimeError("数据库连接池异常")
+        """
+        try:
+            with self.pool.connection() as conn:
+                conn.ping(reconnect=True)
+                return True
+        except Exception as e:
+            self.log.error(f"Connection pool health check failed: {e}")
+            return False
+
+    def close(self):
+        """
+        关闭连接池,释放所有连接
+        """
+        try:
+            if hasattr(self, 'pool') and self.pool:
+                self.pool.close()
+                self.log.info("数据库连接池已关闭")
+        except Exception as e:
+            self.log.error(f"关闭连接池失败: {e}")
+
+    @staticmethod
+    def _safe_identifier(name):
+        """SQL标识符安全校验"""
+        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+            raise ValueError(f"Invalid SQL identifier: {name}")
+        return name
+
+
+if __name__ == '__main__':
+    # Ad-hoc smoke test: inserts one sample card row.
+    # NOTE(review): running this module directly writes to the live
+    # "one_piece_record" table — consider removing before deployment.
+    sql_pool = MySQLConnectionPool()
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)

+ 10 - 0
tag_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.10.0
+PyMySQL==1.1.2
+PyYAML==6.0.1
+requests==2.32.5
+retrying==1.4.2
+schedule==1.2.2
+tenacity==9.1.2
+# NOTE(review): tag_add_date.py imports DrissionPage, which is not pinned here — add it.

+ 200 - 0
tag_spider/tag_add_date.py

@@ -0,0 +1,200 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/22 15:08
import time
import inspect
from loguru import logger
from parsel import Selector
# NOTE(review): the other spiders import `mysql_pool`; confirm whether the
# pool module file is `mysq_pool.py` or `mysql_pool.py` — one spelling is
# likely a typo and will raise ImportError.
from mysq_pool import MySQLConnectionPool
from DrissionPage import Chromium, ChromiumOptions
from tenacity import retry, stop_after_attempt, wait_fixed

# Daily-rotated DEBUG log file, kept for a week.
# NOTE(review): loguru retention strings are usually written "7 days";
# confirm "7 day" parses as intended.
logger.remove()
logger.add("./date_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")
+
+
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of each retry attempt.

    :param retry_state: tenacity RetryCallState describing the attempt
    """
    # Every decorated function here takes its logger as the first positional
    # argument; fall back to the module-level logger when args are absent.
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
def update_grade_date(sql_pool, info):
    """
    Update the grade date stored on one tag_record row.

    :param sql_pool: MySQL connection pool
    :param info: tuple ``(grade_date, record_id)`` — order must match the
        two ``%s`` placeholders below
    """
    sql = """
        UPDATE tag_record SET grade_date=%s WHERE id=%s
    """
    sql_pool.update_one(sql, info)
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_one_page(log, d_url):
    """
    Render *d_url* in a local Chromium instance and return the parsed HTML.

    :param log: logger
    :param d_url: target URL
    :return: parsel.Selector over the rendered page HTML
    :raises Exception: navigation/parsing failures are re-raised so that the
        ``@retry`` decorator can actually retry — the previous version
        swallowed the exception and returned None, making the retry a no-op
        (the sibling category spider already re-raises).
    """
    options = ChromiumOptions()
    options.set_paths(local_port=9108, user_data_path=r'D:\Drissionpage_temp\tag_date_port_9108')
    options.set_argument("-accept-lang=en-US")
    browser = Chromium(options)
    tab = browser.latest_tab
    try:
        tab.get(d_url)
        time.sleep(2)
        tab.wait.load_start()  # wait for the page to enter the loading state
        return Selector(tab.html)
    except Exception as e:
        log.error(f"查询失败,错误信息:{e}")
        raise
    finally:
        tab.close()
        browser.quit()
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_grade_date(log, card_id):
    """
    Fetch the grading date for a card from its TAG detail page.

    :param log: logger
    :param card_id: TAG card id
    :return: grade-date string, or None when it cannot be found
    """
    import re  # local import: only needed for the fallback date scan
    try:
        log.debug(f"Getting grade date for card_id: {card_id}")
        view_url = f'https://my.taggrading.com/card/{card_id}'
        selector = get_one_page(log, view_url)

        if not selector:
            log.warning(f"Selector is None for card_id: {card_id}")
            return None

        # Primary strategy: the value element next to a "Grade Date" label,
        # trying several label/markup variants.
        grade_date = selector.xpath(
            "//div[contains(text(), 'Grade Date:')]/following-sibling::div/text() | "
            "//div[contains(text(), 'Grade Date')]/following-sibling::div/text() | "
            "//div[contains(text(), '评级日期')]/following-sibling::div/text() | "
            "//span[contains(text(), 'Grade Date:')]/following-sibling::span/text() | "
            "//span[contains(text(), 'Grade Date')]/following-sibling::span/text() | "
            "//span[contains(text(), '评级日期')]/following-sibling::span/text()"
        ).get()

        # Fallback: scan any Date-labelled text for something that actually
        # looks like a date. The old check only tested for '/', so despite
        # its comment it could never match ISO-style YYYY-MM-DD dates.
        if not grade_date:
            date_re = re.compile(r'\d{1,2}/\d{1,2}/\d{4}|\d{4}-\d{2}-\d{2}')
            date_patterns = selector.xpath(
                "//div[contains(text(), 'Date')]/text() | "
                "//div[contains(text(), '日期')]/text() | "
                "//span[contains(text(), 'Date')]/text() | "
                "//span[contains(text(), '日期')]/text()"
            ).getall()
            for pattern in date_patterns:
                match = date_re.search(pattern)
                if match:
                    grade_date = match.group(0)
                    break

        if grade_date:
            grade_date = grade_date.strip()
            log.debug(f"Found grade date: {grade_date} for card_id: {card_id}")
        else:
            log.warning(f"Could not find grade date for card_id: {card_id}")

        return grade_date
    except Exception as e:
        log.error(f"Error getting grade date for card_id {card_id}: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def tag_date_main(log):
    """
    Main loop: back-fill ``grade_date`` for existing ``tag_record`` rows.

    Runs forever: each pass updates up to 1000 rows that still lack a grade
    date, then sleeps an hour whenever nothing is left to do.

    :param log: logger
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 评级日期更新任务....................................................')

    # Build the MySQL pool and verify it is actually usable. The constructor
    # always returns an instance, so the previous `if not sql_pool` check
    # could never fail; probe the pool instead (as the category spider does).
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        # Ensure the grade_date column exists before entering the main loop.
        try:
            sql_pool.select_one("SELECT grade_date FROM tag_record LIMIT 1")
            log.info("grade_date字段已存在")
        except Exception as e:
            log.warning(f"grade_date字段可能不存在,尝试添加: {e}")
            try:
                sql_pool.update_one("ALTER TABLE tag_record ADD COLUMN grade_date VARCHAR(50) DEFAULT NULL", ())
                log.info("成功添加grade_date字段")
            except Exception as alter_error:
                log.error(f"添加grade_date字段失败: {alter_error}")
                raise Exception("无法添加grade_date字段,请手动添加后重试")

        while True:
            # Next batch of rows still missing a grade date.
            no_date_list = sql_pool.select_all(
                "SELECT id, card_id FROM tag_record WHERE grade_date IS NULL OR grade_date = '' LIMIT 1000")

            if no_date_list:
                log.info(f"找到 {len(no_date_list)} 条没有评级日期的记录")
                for item in no_date_list:
                    try:
                        record_id = item[0]
                        card_id = item[1]

                        grade_date = get_grade_date(log, card_id)

                        if grade_date:
                            update_grade_date(sql_pool, (grade_date, record_id))
                            log.debug(f"更新记录 {record_id} 的评级日期为: {grade_date}")
                        else:
                            # Leave the row NULL so a later pass retries it.
                            log.warning(f"无法获取卡片 {card_id} 的评级日期,跳过")

                    except Exception as e:
                        log.error(f"处理记录 {item[0]} 时出错: {e}")
                        continue
            else:
                log.info("所有记录都已更新评级日期,等待新数据...")
                time.sleep(3600)  # re-check hourly for new rows

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'评级日期更新程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
if __name__ == '__main__':
    # Entry point: run the grade-date back-fill loop with the module logger.
    tag_date_main(logger)

+ 147 - 0
tag_spider/tag_category_statistics_spider.py

@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/27 15:44
import time
import inspect
import schedule
from loguru import logger
from parsel import Selector
from urllib.parse import unquote
from mysql_pool import MySQLConnectionPool
from DrissionPage import Chromium, ChromiumOptions
from tenacity import retry, stop_after_attempt, wait_fixed

# URL-encoded category slugs exactly as they appear in my.taggrading.com
# pop-report paths (e.g. "Marvel%2FDC" -> "Marvel/DC"); they are decoded
# with unquote() before database insertion.
CATEGORY_LIST = ["Baseball", "Basketball", "Football", "Hockey", "MMA", "Soccer", "Racing", "Golf", "Tennis", "Boxing",
                 "Other%20Sports", "Multi-Sport", "Marvel%2FDC", "Star%20Wars", "Fortnite", "Garbage%20Pail%20Kids",
                 "Music", "TV%2FMovies", "Wrestling", "Video%20Games", "Nature", "Pop%20Culture", "Disney",
                 "Pok%C3%A9mon", "Magic%20the%20Gathering", "Dragon%20Ball", "Metazoo", "Wei%C3%9F%20Schwarz",
                 "One%20Piece", "Lorcana", "Digimon", "Other%20TCG"]

# Daily-rotated DEBUG log file, kept for a week.
# NOTE(review): loguru retention strings are usually written "7 days";
# confirm "7 day" parses as intended.
logger.remove()
logger.add("./category_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")
+
+
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of each retry attempt.

    :param retry_state: tenacity RetryCallState describing the attempt
    """
    # Every decorated function here takes its logger as the first positional
    # argument; fall back to the module-level logger when args are absent.
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
@retry(stop=stop_after_attempt(10), wait=wait_fixed(2), after=after_log)
def get_one_page(log, d_url, category, sql_pool):
    """
    Scrape the summary row of one category's pop-report page and insert it
    into ``tag_category_statistics_record``.

    :param log: logger
    :param d_url: category pop-report URL
    :param category: URL-encoded category name (decoded before insertion)
    :param sql_pool: MySQL connection pool
    :return: ``[]`` when the page yields no selector; otherwise None
    :raises Exception: re-raised on scrape/insert failure so @retry fires
    """
    options = ChromiumOptions()
    options.no_imgs(True)
    options.set_paths(local_port=9109, user_data_path=r'D:\Drissionpage_temp\tag_port_9109')
    # options.set_argument("--disable-gpu")

    # Proxy tunnel (disabled): host:port
    # tunnel = "x371.kdltps.com:15818"
    # options.set_proxy("http://" + tunnel)
    options.set_argument("-accept-lang=en-US")
    browser = Chromium(options)
    tab = browser.latest_tab
    try:
        # stealth.min.js injection (disabled):
        # stealth_path = os.path.join(os.path.dirname(__file__), 'utils', 'stealth.min.js')
        # context.add_init_script(path=stealth_path)

        tab.get(d_url)
        time.sleep(1)
        tab.wait.load_start()  # wait for the page to enter the loading state
        selector = Selector(tab.html)
        if not selector:  # guard against an empty/unrendered page
            log.warning(f"Selector is None for category: {category}")
            return []  # default value, consistent with the other spiders

        # First row of the statistics table: sets / items / graded counts.
        tag_tr = selector.xpath('//table[@class="MuiTable-root"]/tbody[1]/tr')

        number_of_sets = tag_tr.xpath("./td[2]/text()").get()
        if number_of_sets:
            number_of_sets = number_of_sets.replace(",", "")

        total_items = tag_tr.xpath("./td[3]/text()").get()
        if total_items:
            total_items = total_items.replace(",", "")

        total_graded = tag_tr.xpath("./td[4]/text()").get()
        if total_graded:
            total_graded = total_graded.replace(",", "")

        insert_data = {
            "category": unquote(category),
            "number_of_sets": number_of_sets,
            "total_items": total_items,
            "total_graded": total_graded,
            "crawler_date": time.strftime("%Y-%m-%d", time.localtime())
        }
        sql_pool.insert_one_or_dict(table="tag_category_statistics_record", data=insert_data)
    except Exception as e:
        log.error(f"查询失败,错误信息:{e}")
        # return None
        raise  e
    finally:
        tab.close()
        browser.quit()
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def tag_category_main(log):
    """
    Crawl one statistics summary row per category and persist each one.

    :param log: logger object
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Build the MySQL pool and abort early when it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        for category_ in CATEGORY_LIST:
            try:
                log.debug(f"Getting years for category: {category_}")
                category_url = f'https://my.taggrading.com/pop-report/{category_}'
                get_one_page(log, category_url, category_, sql_pool)
            except Exception as e:
                # One broken category must not abort the whole sweep.
                log.error(f"tag_category_main -> get_category_num: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
def stati_schedule_task():
    """Register the daily category crawl and run the scheduler loop forever."""
    schedule.every().days.at("05:31").do(tag_category_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)
+
+
if __name__ == '__main__':
    # Entry point: block forever running the daily scheduled crawl.
    stati_schedule_task()

+ 218 - 0
tag_spider/tag_detail_spider.py

@@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/7 14:52
import time
import inspect
from loguru import logger
from parsel import Selector
# NOTE(review): the other spiders import `mysql_pool`; confirm whether the
# pool module file is `mysq_pool.py` or `mysql_pool.py` — one spelling is
# likely a typo and will raise ImportError.
from mysq_pool import MySQLConnectionPool
from DrissionPage import Chromium, ChromiumOptions
from tenacity import retry, stop_after_attempt, wait_fixed

# Daily-rotated DEBUG log file, kept for a week.
logger.remove()
logger.add("./detail_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")
+
+
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of each retry attempt.

    :param retry_state: tenacity RetryCallState describing the attempt
    """
    # Every decorated function here takes its logger as the first positional
    # argument; fall back to the module-level logger when args are absent.
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
def update_data(sql_pool, info):
    """
    Write one card's scraped detail fields back onto its tag_record row.

    :param sql_pool: MySQL connection pool
    :param info: 21-tuple matching the %s placeholders below in order,
        ending with the row id used in the WHERE clause
    """
    sql = """
            UPDATE tag_record SET 
                view_score=%s, mint=%s, grayscale_img_front=%s, grayscale_img_back=%s, 
                original_img_front=%s, original_img_back=%s, graded_img_front=%s, 
                graded_img_back=%s, summary_front=%s, summary_back=%s, 
                summary_centering_front=%s, summary_centering_back=%s, 
                summary_corners_front=%s, summary_corners_back=%s, 
                summary_surface_front=%s, summary_surface_back=%s, 
                summary_edges_front=%s, summary_edges_back=%s, 
                summary_dimensions_h=%s, summary_dimensions_w=%s 
            WHERE id=%s
            """
    sql_pool.update_one(sql, info)
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_one_page(log, d_url):
    """
    Render *d_url* in Chromium, scroll to the bottom so lazy-loaded content
    appears, and return the parsed HTML.

    :param log: logger
    :param d_url: target URL
    :return: parsel.Selector over the page HTML
    :raises Exception: re-raised on failure so that ``@retry`` can fire
    """
    options = ChromiumOptions()
    options.set_paths(local_port=9107, user_data_path=r'D:\Drissionpage_temp\tag_detail_port_9107')
    options.set_argument("-accept-lang=en-US")
    browser = Chromium(options)
    tab = browser.latest_tab
    try:
        tab.get(d_url)
        time.sleep(3)  # give the SPA time to render before reading HTML
        tab.wait.load_start()

        tab.scroll.to_bottom()  # trigger lazy-loaded sections
        tab.wait.load_start()

        return Selector(tab.html)

    except Exception as e:
        log.error(f"查询失败,错误信息:{e}")
        raise e
    finally:
        tab.close()
        browser.quit()
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_id_detail(log, sql_pool, cid):
    """
    Scrape one card's detail page and write the fields onto its row.

    :param log: logger
    :param sql_pool: MySQL connection pool
    :param cid: tuple ``(tag_record.id, card_id)``
    """
    try:
        sql_id = cid[0]
        card_id = cid[1]
        log.debug(f"Getting card_id detail: {card_id}")
        view_url = f'https://my.taggrading.com/card/{card_id}'
        selector = get_one_page(log, view_url)
        # NOTE(review): the jss* class names below are build-generated and
        # change between site deployments — these selectors are fragile.
        view_score = selector.xpath("//div[@class='MuiBox-root-123 jss153']/text()").get()
        mint = selector.xpath(
            '//div[@class="MuiBox-root-123 jss154"]/div/div/text() | //div[@class="MuiBox-root-123 jss155"]/div/div[@class="MuiBox-root-123 jss157"]/text() | //*[@id="root"]/div[1]/div/section/div/div[1]/div[1]/div[2]/div[2]/div[1]/div/text()').get()

        # The class-based image selectors changed too often; instead collect
        # every .jpg image URL and classify each by its filename suffix.
        img_list = selector.xpath(
            "//img[contains(@src, '.jpg') and substring(@src, string-length(@src) - 3) = '.jpg']/@src").getall()
        grayscale_img_front = None
        grayscale_img_back = None
        original_img_front = None
        original_img_back = None
        graded_img_front = None
        graded_img_back = None
        if img_list:
            for img in img_list:
                if img.endswith('FRONT_SFX.jpg'):
                    grayscale_img_front = img
                elif img.endswith('BACK_SFX.jpg'):
                    grayscale_img_back = img
                elif img.endswith('FRONT_MAIN.jpg'):
                    original_img_front = img
                elif img.endswith('BACK_MAIN.jpg'):
                    original_img_back = img
                elif img.endswith('Slabbed_FRONT.jpg'):
                    graded_img_front = img
                elif img.endswith('Slabbed_BACK.jpg'):
                    graded_img_back = img
        else:
            log.warning(f"No images found for view: {view_url}")

        summary_front = selector.xpath("//div[@class='MuiBox-root-123 jss978']/text()").get()
        summary_back = selector.xpath("//div[@class='MuiBox-root-123 jss980']/text()").get()

        summary_centering_front = selector.xpath("//b[@class='MuiBox-root-123 jss399']/text()").get()
        summary_centering_back = selector.xpath("//b[@class='MuiBox-root-123 jss402']/text()").get()

        summary_corners_front = selector.xpath("//b[@class='MuiBox-root-123 jss409']/text()").get()
        summary_corners_back = selector.xpath("//b[@class='MuiBox-root-123 jss412']/text()").get()

        summary_surface_front = selector.xpath("//b[@class='MuiBox-root-123 jss419']/text()").get()
        summary_surface_back = selector.xpath("//b[@class='MuiBox-root-123 jss422']/text()").get()

        summary_edges_front = selector.xpath("//b[@class='MuiBox-root-123 jss429']/text()").get()
        summary_edges_back = selector.xpath("//b[@class='MuiBox-root-123 jss432']/text()").get()

        summary_dimensions_h = selector.xpath(
            "//b[@class='MuiBox-root-123 jss985']/text() | //b[@class='MuiBox-root-123 jss429']/text()").get()
        summary_dimensions_w = selector.xpath(
            "//b[@class='MuiBox-root-123 jss988']/text() | //b[@class='MuiBox-root-123 jss432']/text()").get()
        # Order must match update_data's placeholder order (row id last).
        info = (view_score, mint, grayscale_img_front, grayscale_img_back, original_img_front, original_img_back,
                graded_img_front, graded_img_back, summary_front, summary_back, summary_centering_front,
                summary_centering_back, summary_corners_front, summary_corners_back, summary_surface_front,
                summary_surface_back, summary_edges_front, summary_edges_back, summary_dimensions_h,
                summary_dimensions_w, sql_id)
        update_data(sql_pool, info)

    except Exception as e:
        log.error(f"Error getting card id detail: {e}")
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def tag_detail_main(log):
    """
    Main loop: fetch detail fields for every ``tag_record`` row whose
    ``detail_state`` is 0, marking each row done after a scrape attempt.

    :param log: logger
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # The pool constructor always returns an instance, so the previous
    # `if not sql_pool` check could never fail; probe the pool instead.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        while True:
            cid_list = sql_pool.select_all("SELECT id,card_id FROM tag_record WHERE detail_state=0 LIMIT 10000")
            if cid_list:
                for cid in cid_list:
                    try:
                        get_card_id_detail(log, sql_pool, cid)
                        sql_pool.update_one("UPDATE tag_record SET detail_state=1 WHERE id=%s", (cid[0],))
                    except Exception as e:
                        log.error(f"Error getting card id detail: {e}")
            else:
                # Old message said 1800 seconds, but the sleep is 3600.
                log.info("No card id detail to get, waiting for 3600 seconds...")
                time.sleep(3600)
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
if __name__ == '__main__':
    # Entry point: run the detail back-fill loop with the module logger.
    tag_detail_main(logger)

+ 391 - 0
tag_spider/tag_spider.py

@@ -0,0 +1,391 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/3 11:21
+import time
+import inspect
+import schedule
+from loguru import logger
+from parsel import Selector
+from urllib.parse import unquote
+from mysql_pool import MySQLConnectionPool
+from urllib.parse import urlparse, parse_qs
+from DrissionPage import Chromium, ChromiumOptions
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+"""
+https://my.taggrading.com/pop-report/Baseball
+https://my.taggrading.com/pop-report/Basketball
+https://my.taggrading.com/pop-report/Football
+https://my.taggrading.com/pop-report/Hockey
+https://my.taggrading.com/pop-report/MMA
+https://my.taggrading.com/pop-report/Soccer
+https://my.taggrading.com/pop-report/Racing
+https://my.taggrading.com/pop-report/Golf
+https://my.taggrading.com/pop-report/Tennis
+https://my.taggrading.com/pop-report/Boxing
+https://my.taggrading.com/pop-report/Other%20Sports
+https://my.taggrading.com/pop-report/Multi-Sport
+https://my.taggrading.com/pop-report/Marvel%2FDC
+https://my.taggrading.com/pop-report/Star%20Wars
+https://my.taggrading.com/pop-report/Fortnite
+https://my.taggrading.com/pop-report/Garbage%20Pail%20Kids
+https://my.taggrading.com/pop-report/Music
+https://my.taggrading.com/pop-report/TV%2FMovies
+https://my.taggrading.com/pop-report/Wrestling
+https://my.taggrading.com/pop-report/Video%20Games
+https://my.taggrading.com/pop-report/Nature
+https://my.taggrading.com/pop-report/Pop%20Culture
+https://my.taggrading.com/pop-report/Disney
+https://my.taggrading.com/pop-report/Pok%C3%A9mon
+https://my.taggrading.com/pop-report/Magic%20the%20Gathering
+https://my.taggrading.com/pop-report/Dragon%20Ball
+https://my.taggrading.com/pop-report/Metazoo
+https://my.taggrading.com/pop-report/Wei%C3%9F%20Schwarz
+https://my.taggrading.com/pop-report/One%20Piece
+https://my.taggrading.com/pop-report/Lorcana
+https://my.taggrading.com/pop-report/Digimon
+https://my.taggrading.com/pop-report/Other%20TCG
+
+YU-GI-OH (COMING SOON)  打不开  404
+"""
+
# Full category set (kept for reference; the active list below skips the
# first three categories):
# CATEGORY_LIST = ["Baseball", "Basketball", "Football", "Hockey", "MMA", "Soccer", "Racing", "Golf", "Tennis", "Boxing",
#                  "Other%20Sports", "Multi-Sport", "Marvel%2FDC", "Star%20Wars", "Fortnite", "Garbage%20Pail%20Kids",
#                  "Music", "TV%2FMovies", "Wrestling", "Video%20Games", "Nature", "Pop%20Culture", "Disney",
#                  "Pok%C3%A9mon", "Magic%20the%20Gathering", "Dragon%20Ball", "Metazoo", "Wei%C3%9F%20Schwarz",
#                  "One%20Piece", "Lorcana", "Digimon", "Other%20TCG"]
# URL-encoded category slugs as used in my.taggrading.com pop-report paths.
CATEGORY_LIST = ["Hockey", "MMA", "Soccer", "Racing", "Golf", "Tennis", "Boxing",
                 "Other%20Sports", "Multi-Sport", "Marvel%2FDC", "Star%20Wars", "Fortnite", "Garbage%20Pail%20Kids",
                 "Music", "TV%2FMovies", "Wrestling", "Video%20Games", "Nature", "Pop%20Culture", "Disney",
                 "Pok%C3%A9mon", "Magic%20the%20Gathering", "Dragon%20Ball", "Metazoo", "Wei%C3%9F%20Schwarz",
                 "One%20Piece", "Lorcana", "Digimon", "Other%20TCG"]

# Daily-rotated DEBUG log file, kept for a week.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")
+
+
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of each retry attempt.

    :param retry_state: tenacity RetryCallState describing the attempt
    """
    # Every decorated function here takes its logger as the first positional
    # argument; fall back to the module-level logger when args are absent.
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
def get_one_page(log, d_url):
    """
    Render *d_url* in a local Chromium instance and return its HTML.

    The old signature was annotated ``-> any``, which is the builtin
    function ``any``, not a type; the annotation is dropped and the real
    contract documented instead. Unlike the sibling spiders this helper has
    no ``@retry`` decorator; failures surface to the caller as None.

    :param log: logger
    :param d_url: target URL
    :return: parsel.Selector on success, or None when navigation fails
    """
    options = ChromiumOptions()
    options.set_paths(local_port=9105, user_data_path=r'D:\Drissionpage_temp\tag_port_9105')
    options.set_argument("-accept-lang=en-US")
    browser = Chromium(options)
    tab = browser.latest_tab
    try:
        tab.get(d_url)
        tab.wait.load_start()  # wait for the page to enter the loading state

        return Selector(tab.html)

    except Exception as e:
        log.error(f"查询失败,错误信息:{e}")
        return None
    finally:
        tab.close()
        browser.quit()
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_years(log, category):
    """
    List the year links available under one category's pop report.

    :param log: logger
    :param category: URL-encoded category name
    :return: list of year hrefs, [] when the page failed to render,
        None on unexpected error
    """
    try:
        log.debug(f"Getting years for category: {category}")
        category_url = f'https://my.taggrading.com/pop-report/{category}'
        selector = get_one_page(log, category_url)
        if not selector:
            log.warning(f"Selector is None for category: {category}")
            return []
        return selector.xpath(
            "//tbody[@class='MuiTableBody-root']/tr/td[1]/a/@href").getall()
    except Exception as e:
        log.error(f"Error getting years: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_set_names(log, year_url):
    """
    Collect the set-name links listed under one year page.

    :param log: logger
    :param year_url: year page URL
    :return: list of set hrefs, [] when the page failed to render,
        None on unexpected error
    """
    try:
        log.debug(f"Getting set names for year: {year_url}")
        selector = get_one_page(log, year_url)
        if not selector:
            log.warning(f"Selector is None for year_url: {year_url}")
            return []
        return selector.xpath(
            "//tbody[@class='MuiTableBody-root']/tr/td[1]//a/@href").getall()
    except Exception as e:
        log.error(f"Error getting set names: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_name_list(log, set_name_url):
    """
    Collect the card links listed under one set page.

    :param log: logger
    :param set_name_url: set page URL
    :return: list of card hrefs, [] when the page failed to render,
        None on unexpected error
    """
    try:
        log.debug(f"Getting card names for set: {set_name_url}")
        selector = get_one_page(log, set_name_url)
        if not selector:
            log.warning(f"Selector is None for set_name_url: {set_name_url}")
            return []
        return selector.xpath('//tbody[@class="MuiTableBody-root"]/tr/td[2]/div/div/a/@href').getall()
    except Exception as e:
        log.error(f"Error getting card names: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_view_list(log, card_name_url):
    """
    Collect the table rows (one per graded copy) from a card page.

    :param log: logger
    :param card_name_url: card page URL
    :return: SelectorList of <tr> rows, [] when the page failed to render,
        None on unexpected error
    """
    try:
        log.debug(f"Getting view for card: {card_name_url}")
        selector = get_one_page(log, card_name_url)
        if not selector:
            log.warning(f"Selector is None for card_name_url: {card_name_url}")
            return []
        return selector.xpath("//tbody[@class='MuiTableBody-root']/tr")
    except Exception as e:
        log.error(f"Error getting view: {e}")
        return None
+
+
def get_set_name(href):
    """
    Extract the ``setName`` query parameter from a URL.

    :param href: URL that may carry a ``setName`` query argument
    :return: the first setName value, or None when absent
    """
    params = parse_qs(urlparse(href).query)
    values = params.get('setName')
    return values[0] if values else None
+
+
def save_list_data(sql_pool, list_info):
    """
    Insert one freshly scraped card row into tag_record.

    :param sql_pool: MySQL connection pool
    :param list_info: 8-tuple (category, year, brand_name, card_set_name,
        card_number, card_name, card_id, completed_date) matching the
        placeholders in order
    """
    sql = """
        INSERT INTO tag_record (category, year, brand_name, card_set_name, card_number, card_name, card_id, completed_date) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_one(sql, list_info)
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def tag_main(log):
    """
    Main crawl loop: walk category -> year -> set -> card -> graded rows on
    taggrading.com and insert one tag_record per row (duplicates ignored).

    :param log: logger shared with all helper functions
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # NOTE(review): a constructed pool object is always truthy, so this guard
    # only fires if the constructor is changed to return a falsy sentinel.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        for category in CATEGORY_LIST:
            log.info(f"开始爬取 {category} 标签数据")
            try:
                tag_years_list = get_years(log, category)
                if tag_years_list is None:
                    log.warning(f"Failed to get years for category: {category}, returning default value")
                    tag_years_list = []

                for tag_year in tag_years_list:
                    try:
                        tag_year_url = f'https://my.taggrading.com{tag_year}'
                        year = tag_year.split('/')[-1]
                        # Hockey data before 2022 is intentionally skipped.
                        if category == 'Hockey' and int(year) < 2022:
                            continue

                        set_names_list = get_set_names(log, tag_year_url)
                        if set_names_list is None:
                            log.warning(f"Failed to get set names for year: {tag_year}, returning default value")
                            set_names_list = []

                        # Resume support: if an 'Upper Deck' brand is present,
                        # drop everything before it and continue from there.
                        start_index = -1
                        for i, tag_set_name in enumerate(set_names_list):
                            if '?' in tag_set_name:
                                brand_name = tag_set_name.split('?')[0].split('/')[-1]
                            else:
                                brand_name = tag_set_name.split('/')[-1]
                            if brand_name == 'Upper Deck':
                                start_index = i
                                break
                        if start_index != -1:
                            set_names_list = set_names_list[start_index:]

                        for tag_set_name in set_names_list:
                            try:
                                set_name_url = f'https://my.taggrading.com{tag_set_name}'

                                # Brand is the last path segment (query string removed).
                                if '?' in tag_set_name:
                                    brand_name = tag_set_name.split('?')[0].split('/')[-1]
                                else:
                                    brand_name = tag_set_name.split('/')[-1]

                                card_set_name = get_set_name(tag_set_name)
                                card_name_list = get_card_name_list(log, set_name_url)
                                if card_name_list is None:
                                    log.warning(
                                        f"Failed to get card names for set: {tag_set_name}, returning default value")
                                    card_name_list = []

                                for tag_card_name in card_name_list:
                                    try:
                                        card_name_url = f'https://my.taggrading.com{tag_card_name}'
                                        card_name = tag_card_name.split('/')[5]
                                        if '?' in tag_card_name:
                                            card_number = tag_card_name.split('/')[-1].split('?')[0]
                                        else:
                                            card_number = tag_card_name.split('/')[-1]
                                        card_number = unquote(card_number.strip()) if card_number else ''

                                        view_list_url = get_view_list(log, card_name_url)
                                        if view_list_url is None:
                                            log.warning(
                                                f"Failed to get view list for card: {tag_card_name}, returning default value")
                                            view_list_url = []

                                        for loop_view in view_list_url:
                                            try:
                                                # BUGFIX: .get() extracts the href string; without it
                                                # the SelectorList repr was interpolated into the URL
                                                # and card_id became garbage.
                                                tag_view = loop_view.xpath(".//td[3]/a/@href").get()
                                                if not tag_view:
                                                    continue  # row without a detail link
                                                view_url = f'https://my.taggrading.com{tag_view}'
                                                completed_date = loop_view.xpath("./td[7]/text()").get()
                                                card_id = view_url.split('/')[-1]

                                                data_dict = {
                                                    'category': unquote(category),
                                                    'year': year,
                                                    'brand_name': brand_name,
                                                    'card_set_name': card_set_name,
                                                    'card_number': card_number,
                                                    'card_name': card_name,
                                                    'card_id': card_id,
                                                    'completed_date': completed_date
                                                }
                                                # ignore=True: duplicate card_ids must not abort the crawl.
                                                sql_pool.insert_one_or_dict(table="tag_record", data=data_dict, ignore=True)

                                            except Exception as e:
                                                log.error(f"Error getting view url: {e}")
                                                continue
                                    except Exception as e:
                                        log.error(f"Error getting card name url: {e}")
                                        continue
                            except Exception as e:
                                log.error(f"Error getting set name url: {e}")
                                continue
                    except Exception as e:
                        log.error(f"Error getting tag year url: {e}")
                        continue
            except Exception as e:
                log.error(f"Error getting tag years: {e}")
                continue

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
def schedule_task():
    """Run the crawler once immediately, then every 15 days at 03:31."""
    tag_main(log=logger)

    schedule.every(15).days.at("03:31").do(tag_main, log=logger)

    # Block forever, firing pending jobs once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)
+
+
if __name__ == '__main__':
    # schedule_task()  # enable for the built-in 15-day scheduler
    tag_main(logger)
    # print(get_years(logger, 'Baseball'))

+ 22 - 0
tag_spider/tag_start_spider.py

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/7 16:26
+import subprocess
+import time
+
+
def start_spider(script_name):
    """Launch a spider script as an independent child process.

    Uses ``sys.executable`` instead of a bare ``"python"`` so the child runs
    under the same interpreter (and virtualenv) as this launcher.

    :param script_name: path of the spider script to run
    """
    import sys  # local import keeps this launcher's top-level imports untouched
    print(f"Starting {script_name}...")
    # Popen (not run) so the child keeps running even if this process exits.
    subprocess.Popen([sys.executable, script_name])
+
+
# Launch the spiders; each script contains its own built-in scheduler.
start_spider("tag_spider.py")
time.sleep(1)
start_spider("tag_detail_spider.py")
start_spider("tag_category_statistics_spider.py")

print("All spiders have been started. Press Ctrl+C to stop the main process.")

+ 174 - 0
tag_spider/update_grade_dates.py

@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/22 15:08
+"""
+独立脚本,用于更新已有数据的评级日期
+"""
+import time
+import inspect
+from loguru import logger
+from parsel import Selector
+from mysql_pool import MySQLConnectionPool
+from DrissionPage import Chromium, ChromiumOptions
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+
+logger.remove()
+logger.add("./date_update_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+           level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
def update_grade_date(sql_pool, info):
    """
    Write a grading date back to tag_record.

    :param sql_pool: MySQL connection pool
    :param info: (completed_date, card_id) tuple, in placeholder order
    """
    update_sql = """
        UPDATE tag_record SET completed_date=%s WHERE card_id=%s
    """
    sql_pool.update_one(update_sql, info)
+
+
+# ... existing code ...
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_one_page(log, no_date_list, sql_pool):
    """
    Open each record's pop-report page in Chromium and backfill its grading
    date into tag_record.

    :param log: logger
    :param no_date_list: rows of (category, year, brand_name, card_name,
        card_number, card_set_name, card_id) that lack a completed_date
    :param sql_pool: MySQL connection pool used for the updates
    """
    options = ChromiumOptions()
    # Dedicated port/profile so this updater does not clash with other spiders.
    options.set_paths(local_port=9105, user_data_path=r'D:\Drissionpage_temp\tag_date_update_port_9105')
    options.set_argument("-accept-lang=en-US")
    browser = Chromium(options)
    try:
        log.info(f"找到 {len(no_date_list)} 条没有评级日期的记录")
        for item in no_date_list:
            category = item[0]
            year = item[1]
            brand_name = item[2]
            card_name = item[3]
            card_number = item[4]
            card_set_name = item[5]
            card_id = item[6]
            # setName only appears in the URL when the record has one.
            if card_set_name:
                url = f"https://my.taggrading.com/pop-report/{category}/{year}/{brand_name}/{card_name}/{card_number}?setName={card_set_name}"
            else:
                url = f"https://my.taggrading.com/pop-report/{category}/{year}/{brand_name}/{card_name}/{card_number}"

            log.debug(f"获取页面内容,URL: {url}")

            tab = browser.new_tab()
            tag_date = '1970-01-01'  # sentinel meaning "no date found"
            try:
                tab.get(url)
                time.sleep(2)  # give the SPA time to render the table
                selector = Selector(tab.html)
                tag_tr_list = selector.xpath("//tbody[@class='MuiTableBody-root']/tr")
                if tag_tr_list:
                    # NOTE(review): only the date of the LAST row survives this
                    # loop and is written for card_id below — confirm whether a
                    # per-row update was intended here.
                    for tag_tr in tag_tr_list:
                        tag_date = tag_tr.xpath("./td[7]/text()").get()
                        tag_date = tag_date.strip() if tag_date else '01-01-1970'
                        # Convert the site's MM-DD-YYYY format to YYYY-MM-DD.
                        if tag_date:
                            tag_date = time.strftime("%Y-%m-%d", time.strptime(tag_date, "%m-%d-%Y"))


                else:
                    log.debug(f"页面内容为空,URL: {url}")
                    # Empty page: still mark the row as processed via the sentinel date.
                    tag_date = '1970-01-01'

            except Exception as e:
                log.error(f"获取页面内容失败,URL: {url}, 错误信息:{e}")
                tag_date = '1970-01-01'  # sentinel on failure as well

            # Write the (possibly sentinel) grading date back to the database.
            if tag_date is not None:
                log.debug(f"更新数据库中的评级日期,card_id: {card_id}, tag_date: {tag_date}")
                sql_pool.update_one_or_dict(
                    "tag_record",
                    {"completed_date": tag_date},
                    {"card_id": card_id}
                )

            tab.close()
    except Exception as e:
        log.error(f"查询失败,错误信息:{e}")
        raise e
    finally:
        browser.quit()
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def update_existing_grade_dates(log):
    """
    Continuously backfill grading dates for rows missing completed_date.

    Polls tag_record in batches of 1000; when nothing is left, sleeps an
    hour before checking again. Does not return under normal operation.

    :param log: logger
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 评级日期更新任务....................................................')

    # NOTE(review): a constructed pool object is always truthy; the guard only
    # fires if the constructor is changed to return a falsy sentinel.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        while True:
            # Next batch of rows that still lack a grading date.
            no_date_list = sql_pool.select_all(
                "SELECT category, year, brand_name, card_name, card_number, card_set_name, card_id FROM tag_record WHERE completed_date IS NULL LIMIT 1000")
            # Guard against a driver returning None instead of an empty result,
            # and materialize the rows as a plain list.
            no_date_list = list(no_date_list or [])
            if no_date_list:
                get_one_page(log, no_date_list, sql_pool)
            else:
                log.info("所有记录都已更新评级日期,等待新数据...")
                time.sleep(3600)  # wait an hour before checking again

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'评级日期更新程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
if __name__ == '__main__':
    # Run the backfill loop with the module-level loguru logger.
    update_existing_grade_dates(logger)
    # get_one_page(logger, [1,1,1,1,1,1], None)

+ 349 - 0
tag_spider/update_grade_dates_loop.py

@@ -0,0 +1,349 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/1/22 15:08
+"""
+独立脚本,用于更新已有数据的评级日期
+"""
+import time
+import inspect
+import schedule
+from loguru import logger
+from parsel import Selector
+from urllib.parse import unquote
+from mysql_pool import MySQLConnectionPool
+from urllib.parse import urlparse, parse_qs
+from DrissionPage import Chromium, ChromiumOptions
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+"""
+https://my.taggrading.com/pop-report/Baseball
+https://my.taggrading.com/pop-report/Basketball
+https://my.taggrading.com/pop-report/Football
+https://my.taggrading.com/pop-report/Hockey
+https://my.taggrading.com/pop-report/MMA
+https://my.taggrading.com/pop-report/Soccer
+https://my.taggrading.com/pop-report/Racing
+https://my.taggrading.com/pop-report/Golf
+https://my.taggrading.com/pop-report/Tennis
+https://my.taggrading.com/pop-report/Boxing
+https://my.taggrading.com/pop-report/Other%20Sports
+https://my.taggrading.com/pop-report/Multi-Sport
+https://my.taggrading.com/pop-report/Marvel%2FDC
+https://my.taggrading.com/pop-report/Star%20Wars
+https://my.taggrading.com/pop-report/Fortnite
+https://my.taggrading.com/pop-report/Garbage%20Pail%20Kids
+https://my.taggrading.com/pop-report/Music
+https://my.taggrading.com/pop-report/TV%2FMovies
+https://my.taggrading.com/pop-report/Wrestling
+https://my.taggrading.com/pop-report/Video%20Games
+https://my.taggrading.com/pop-report/Nature
+https://my.taggrading.com/pop-report/Pop%20Culture
+https://my.taggrading.com/pop-report/Disney
+https://my.taggrading.com/pop-report/Pok%C3%A9mon
+https://my.taggrading.com/pop-report/Magic%20the%20Gathering
+https://my.taggrading.com/pop-report/Dragon%20Ball
+https://my.taggrading.com/pop-report/Metazoo
+https://my.taggrading.com/pop-report/Wei%C3%9F%20Schwarz
+https://my.taggrading.com/pop-report/One%20Piece
+https://my.taggrading.com/pop-report/Lorcana
+https://my.taggrading.com/pop-report/Digimon
+https://my.taggrading.com/pop-report/Other%20TCG
+
+YU-GI-OH (COMING SOON)  打不开  404
+"""
+
+CATEGORY_LIST = ["Baseball", "Basketball", "Football", "Hockey", "MMA", "Soccer", "Racing", "Golf", "Tennis", "Boxing",
+                 "Other%20Sports", "Multi-Sport", "Marvel%2FDC", "Star%20Wars", "Fortnite", "Garbage%20Pail%20Kids",
+                 "Music", "TV%2FMovies", "Wrestling", "Video%20Games", "Nature", "Pop%20Culture", "Disney",
+                 "Pok%C3%A9mon", "Magic%20the%20Gathering", "Dragon%20Ball", "Metazoo", "Wei%C3%9F%20Schwarz",
+                 "One%20Piece", "Lorcana", "Digimon", "Other%20TCG"]
+
+
+# logger.remove()
+# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+#            level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
+
+
def get_one_page(log, d_url) -> any:
    """Render *d_url* in a local Chromium instance and return a parsel
    Selector over the page HTML, or None if navigation fails.

    The browser is quit after every call so each page gets a fresh session.
    """
    opts = ChromiumOptions()
    opts.set_paths(local_port=9105, user_data_path=r'D:\Drissionpage_temp\tag_port_9105')
    opts.set_argument("-accept-lang=en-US")
    browser = Chromium(opts)
    tab = browser.latest_tab
    try:
        tab.get(d_url)
        tab.wait.load_start()  # wait until the page has started loading
        return Selector(tab.html)
    except Exception as e:
        log.error(f"查询失败,错误信息:{e}")
        return None
    finally:
        tab.close()
        browser.quit()
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_years(log, category):
    """Return the year-link hrefs for a pop-report category.

    Returns [] when the page could not be rendered, None on unexpected errors.
    """
    try:
        log.debug(f"Getting years for category: {category}")
        category_url = f'https://my.taggrading.com/pop-report/{category}'
        page = get_one_page(log, category_url)
        if not page:
            log.warning(f"Selector is None for category: {category}")
            return []
        return page.xpath(
            "//tbody[@class='MuiTableBody-root']/tr/td[1]/a/@href").getall()
    except Exception as e:
        log.error(f"Error getting years: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_set_names(log, year_url):
    """Return the set-link hrefs listed on a year page.

    Returns [] when the page could not be rendered, None on unexpected errors.
    """
    try:
        log.debug(f"Getting set names for year: {year_url}")
        page = get_one_page(log, year_url)
        if not page:
            log.warning(f"Selector is None for year_url: {year_url}")
            return []
        return page.xpath(
            "//tbody[@class='MuiTableBody-root']/tr/td[1]//a/@href").getall()
    except Exception as e:
        log.error(f"Error getting set names: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_name_list(log, set_name_url):
    """Return the card-link hrefs listed on a set page.

    Returns [] when the page could not be rendered, None on unexpected errors.
    """
    try:
        log.debug(f"Getting card names for set: {set_name_url}")
        page = get_one_page(log, set_name_url)
        if not page:
            log.warning(f"Selector is None for set_name_url: {set_name_url}")
            return []
        return page.xpath('//tbody[@class="MuiTableBody-root"]/tr/td[2]/div/div/a/@href').getall()
    except Exception as e:
        log.error(f"Error getting card names: {e}")
        return None
+
+
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_view_list(log, card_name_url):
    """Fetch a card's detail page and return its population-report rows.

    Returns a SelectorList of <tr> nodes, [] when the page could not be
    rendered, or None on unexpected errors.
    """
    try:
        log.debug(f"Getting view for card: {card_name_url}")
        page = get_one_page(log, card_name_url)
        if not page:
            log.warning(f"Selector is None for card_name_url: {card_name_url}")
            return []
        return page.xpath("//tbody[@class='MuiTableBody-root']/tr")
    except Exception as e:
        log.error(f"Error getting view: {e}")
        return None
+
+
def get_set_name(href):
    """Return the ``setName`` query parameter of *href*, or None if absent."""
    params = parse_qs(urlparse(href).query)
    values = params.get('setName')
    return values[0] if values else None
+
+
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def tag_main(log):
    """
    Re-crawl the full pop-report hierarchy and refresh completed_date for
    every card row already present in tag_record.

    :param log: logger shared with all helper functions
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # NOTE(review): a constructed pool object is always truthy; the guard only
    # fires if the constructor is changed to return a falsy sentinel.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        for category in CATEGORY_LIST:
            log.info(f"开始爬取 {category} 标签数据")
            try:
                tag_years_list = get_years(log, category)
                if tag_years_list is None:
                    log.warning(f"Failed to get years for category: {category}, returning default value")
                    tag_years_list = []

                for tag_year in tag_years_list:
                    try:
                        tag_year_url = f'https://my.taggrading.com{tag_year}'
                        year = tag_year.split('/')[-1]
                        log.debug(f"Getting set names for year: {year}")

                        set_names_list = get_set_names(log, tag_year_url)
                        if set_names_list is None:
                            log.warning(f"Failed to get set names for year: {tag_year}, returning default value")
                            set_names_list = []

                        for tag_set_name in set_names_list:
                            try:
                                set_name_url = f'https://my.taggrading.com{tag_set_name}'
                                log.debug(f"Getting card names for set: {tag_set_name}")

                                # Brand is the last path segment (query string removed).
                                if '?' in tag_set_name:
                                    brand_name = tag_set_name.split('?')[0].split('/')[-1]
                                else:
                                    brand_name = tag_set_name.split('/')[-1]

                                card_set_name = get_set_name(tag_set_name)
                                log.debug(f"Getting card names for set: {card_set_name}, brand_name: {brand_name}")
                                card_name_list = get_card_name_list(log, set_name_url)
                                if card_name_list is None:
                                    log.warning(
                                        f"Failed to get card names for set: {tag_set_name}, returning default value")
                                    card_name_list = []

                                for tag_card_name in card_name_list:
                                    try:
                                        card_name_url = f'https://my.taggrading.com{tag_card_name}'
                                        log.debug(f"Getting view list for card: {tag_card_name}")
                                        card_name = tag_card_name.split('/')[5]
                                        if '?' in tag_card_name:
                                            card_number = tag_card_name.split('/')[-1].split('?')[0]
                                        else:
                                            card_number = tag_card_name.split('/')[-1]
                                        card_number = unquote(card_number.strip()) if card_number else ''

                                        view_list_url = get_view_list(log, card_name_url)
                                        log.debug(f"Getting view list for card_name: {card_name}, card_number:{card_number}")
                                        if view_list_url is None:
                                            log.warning(
                                                f"Failed to get view list for card: {tag_card_name}, returning default value")
                                            view_list_url = []

                                        for loop_view in view_list_url:
                                            try:
                                                # BUGFIX: .get() extracts the href string; without it
                                                # the SelectorList repr was interpolated into the URL,
                                                # card_id became garbage, and no row ever matched the
                                                # update condition below.
                                                tag_view = loop_view.xpath(".//td[3]/a/@href").get()
                                                if not tag_view:
                                                    continue  # row without a detail link
                                                view_url = f'https://my.taggrading.com{tag_view}'
                                                completed_date = loop_view.xpath("./td[7]/text()").get()
                                                card_id = view_url.split('/')[-1]
                                                data_dict = {"completed_date": completed_date}
                                                condition_dict = {"card_id": card_id}
                                                sql_pool.update_one_or_dict(table="tag_record", data=data_dict, condition=condition_dict)

                                            except Exception as e:
                                                log.error(f"Error getting view url: {e}")
                                                continue
                                    except Exception as e:
                                        log.error(f"Error getting card name url: {e}")
                                        continue
                            except Exception as e:
                                log.error(f"Error getting set name url: {e}")
                                continue
                    except Exception as e:
                        log.error(f"Error getting tag year url: {e}")
                        continue
            except Exception as e:
                log.error(f"Error getting tag years: {e}")
                continue

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
+
+
def schedule_task():
    """Run the crawler once immediately, then every 15 days at 03:31."""
    tag_main(log=logger)

    schedule.every(15).days.at("03:31").do(tag_main, log=logger)

    # Block forever, firing pending jobs once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)
+
+
if __name__ == '__main__':
    # schedule_task()  # enable for the built-in 15-day scheduler
    tag_main(logger)
    # print(get_years(logger, 'Baseball'))
+
+