Ver código fonte

fix(mysql_pool): 增强数据库连接断开后的重试机制

- 将连接池的 ping 参数改为 2 (按 DBUtils 约定,2 表示在创建游标时检查连接),确保每次执行前连接有效
- 重写 _execute 方法,在捕获 pymysql InterfaceError (连接断开) 时自动重试一次
- 针对完整性错误进行特别处理,避免打印堆栈污染日志
- 修改 insert_one_or_dict 中重复条目日志为简短警告
- 调整主程序中示例代码,恢复插入单条记录的调用
- 在 YamlLoader.py 中新增路径解析逻辑,支持按主脚本目录查找配置
- 优化 readYaml 函数,增加配置文件环境后缀加载支持
- 新增 requirements.txt,列出项目依赖包并固定版本号以保证环境一致
charley 1 semana atrás
pai
commit
3d1e03f632

+ 45 - 25
myslabs_spider/YamlLoader.py

@@ -11,10 +11,10 @@ regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
 class YamlConfig:
     def __init__(self, config):
         self.config = config
-    
+
     def get(self, key: str):
         return YamlConfig(self.config.get(key))
-    
+
     def getValueAsString(self, key: str):
         try:
             match = regex.match(self.config[key])
@@ -25,7 +25,7 @@ class YamlConfig:
             return None
         except:
             return self.config[key]
-    
+
     def getValueAsInt(self, key: str):
         try:
             match = regex.match(self.config[key])
@@ -36,7 +36,7 @@ class YamlConfig:
             return 0
         except:
             return int(self.config[key])
-    
+
     def getValueAsBool(self, key: str):
         try:
             match = regex.match(self.config[key])
@@ -49,30 +49,50 @@ class YamlConfig:
             return bool(self.config[key])
 
 
-def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+def _resolve_path(path: str) -> str:
+    """
+    解析 yaml 文件路径,按优先级查找:
+      1) 绝对路径或 cwd 下存在 → 直接用(保留旧行为,向后兼容)
+      2) 调用方主脚本所在目录 → 兜底,方便打包后从任意 cwd 启动
+    :param path: (str) 用户传入的路径,默认 'application.yml'
+    :return: (str) 实际可读取的完整路径;找不到则返回原 path 让 open() 抛错
+    """
+    # 1) 旧行为:cwd 或绝对路径
     if os.path.exists(path):
-        with open(path) as fd:
-            conf = yaml.load(fd, Loader=yaml.FullLoader)
-    
+        return path
+
+    # 2) 主脚本目录(__main__.__file__)
+    try:
+        import __main__
+        main_file = getattr(__main__, '__file__', None)
+        if main_file:
+            candidate = os.path.join(os.path.dirname(os.path.abspath(main_file)), path)
+            if os.path.exists(candidate):
+                return candidate
+    except Exception:
+        pass
+
+    return path
+
+
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    """
+    读取 yaml 配置。
+    :param path: (str) yaml 文件路径,默认 'application.yml'。
+                       优先 cwd / 绝对路径(保留旧行为),找不到再 fallback 到主脚本所在目录。
+    :param profile: (str) 可选环境后缀,如 'dev' 会额外加载 'application-dev.yml' 并 update
+    :return: (YamlConfig) 配置访问对象
+    :raises FileNotFoundError: cwd 和主脚本目录都找不到时抛出
+    """
+    real_path = _resolve_path(path)
+    with open(real_path, encoding='utf-8') as fd:
+        conf = yaml.load(fd, Loader=yaml.FullLoader)
+
     if profile is not None:
-        result = path.split('.')
+        result = real_path.rsplit('.', 1)
         profiledYaml = f'{result[0]}-{profile}.{result[1]}'
         if os.path.exists(profiledYaml):
-            with open(profiledYaml) as fd:
+            with open(profiledYaml, encoding='utf-8') as fd:
                 conf.update(yaml.load(fd, Loader=yaml.FullLoader))
-    
-    return YamlConfig(conf)
-
-# res = readYaml()
-# mysqlConf = res.get('mysql')
-# print(mysqlConf)
 
-# print(res.getValueAsString("host"))
-# mysqlYaml = mysqlConf.getValueAsString("host")
-# print(mysqlYaml)
-# host = mysqlYaml.get("host").split(':')[-1][:-1]
-# port = mysqlYaml.get("port").split(':')[-1][:-1]
-# username = mysqlYaml.get("username").split(':')[-1][:-1]
-# password = mysqlYaml.get("password").split(':')[-1][:-1]
-# mysql_db = mysqlYaml.get("db").split(':')[-1][:-1]
-# print(host,port,username,password)
+    return YamlConfig(conf)

+ 69 - 48
myslabs_spider/mysql_pool.py

@@ -44,36 +44,75 @@ class MySQLConnectionPool:
             user=sql_user,
             password=sql_password,
             database=sql_db,
-            charset="utf8mb4",
-            use_unicode=True,
-            init_command="SET NAMES utf8mb4",
-            ping=1,  # 0:完全关闭(更快), 1:仅在取连接时检查, 2:每次执行前检查连接有效性,防止使用已断开的连接
+            ping=2,  # 每次执行前检查连接有效性,防止使用已断开的连接
             connect_timeout=5,  # 连接超时时间(秒)
             # read_timeout=30,  # 读取超时时间(秒)
             write_timeout=30  # 写入超时时间(秒)
         )
 
+    # def _execute(self, query, args=None, commit=False):
+    #     """
+    #     执行SQL
+    #     :param query: SQL语句
+    #     :param args: SQL参数
+    #     :param commit: 是否提交事务
+    #     :return: 查询结果
+    #     """
+    #     try:
+    #         with self.pool.connection() as conn:
+    #             with conn.cursor() as cursor:
+    #                 cursor.execute(query, args)
+    #                 if commit:
+    #                     conn.commit()
+    #                 self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+    #                 return cursor
+    #     except Exception as e:
+    #         if commit and conn:
+    #             conn.rollback()
+    #         self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+    #         raise e
+
     def _execute(self, query, args=None, commit=False):
         """
-        执行SQL
+        执行SQL(带断连重试)
         :param query: SQL语句
         :param args: SQL参数
         :param commit: 是否提交事务
         :return: 查询结果
         """
-        try:
-            with self.pool.connection() as conn:
-                with conn.cursor() as cursor:
-                    cursor.execute(query, args)
-                    if commit:
-                        conn.commit()
-                    self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
-                    return cursor
-        except Exception as e:
-            if commit and conn:
-                conn.rollback()
-            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
-            raise e
+        conn = None
+        for attempt in range(2):  # 最多重试1次
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.execute(query, args)
+                        if commit:
+                            conn.commit()
+                        self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                        return cursor
+            except pymysql.err.InterfaceError as e:
+                # 连接已断开,重试一次
+                if attempt == 0:
+                    self.log.warning(f"数据库连接断开,正在重试... Error: {e}")
+                    continue
+                self.log.error(f"重试后仍失败: {e}, Query: {query}")
+                raise e
+            except pymysql.err.IntegrityError:
+                # 完整性错误(如重复条目)交由上层处理,避免在此打印完整堆栈污染日志
+                if commit and conn:
+                    try:
+                        conn.rollback()
+                    except Exception:
+                        pass
+                raise
+            except Exception as e:
+                if commit and conn:
+                    try:
+                        conn.rollback()
+                    except Exception:
+                        pass
+                self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+                raise e
 
     def select_one(self, query, args=None):
         """
@@ -183,13 +222,14 @@ class MySQLConnectionPool:
             return cursor.lastrowid
         except pymysql.err.IntegrityError as e:
             if "Duplicate entry" in str(e):
-                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                # 重复条目用 warning 简短输出,不打印堆栈
+                self.log.warning(f"插入跳过-重复条目 Table: {table}, {e.args[1] if len(e.args) > 1 else e}")
                 return -1  # 返回 -1 表示重复条目被跳过
             else:
-                self.log.exception(f"数据库完整性错误: {e}")
+                self.log.error(f"数据库完整性错误 Table: {table}, Error: {e}")
                 raise
         except Exception as e:
-            self.log.exception(f"未知错误: {e}")
+            self.log.error(f"insert_one_or_dict 失败 Table: {table}, Error: {e}")
             raise
 
     def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
@@ -621,30 +661,11 @@ class MySQLConnectionPool:
 
 if __name__ == '__main__':
     sql_pool = MySQLConnectionPool()
-    # data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
-    #             'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
-    #             'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
-    #             'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
-    #             'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
-    #             'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
-    #             'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
-    # sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)
-
-    sql_pool.insert_many(
-        table="jhs_product_record",
-        data_list=[
-            {
-                "product_id": 99999991,
-                "seller_username": "浣熊小助理(裸卡版)",
-                "auction_product_name": "2000 日文 无编号 #175 U 波克比 有瑕疵",
-            },
-            {
-                "product_id": 99999992,
-                "seller_username": "测试商家二号",
-                "auction_product_name": "中文批量插入测试",
-            },
-        ],
-        ignore=False
-    )
-
-
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)

+ 10 - 0
myslabs_spider/requirements.txt

@@ -0,0 +1,10 @@
+-i https://mirrors.aliyun.com/pypi/simple/
+DBUtils==3.1.2
+loguru==0.7.3
+parsel==1.11.0
+PyMySQL==1.1.2
+PyYAML==6.0.3
+requests==2.33.1
+schedule==1.2.2
+tenacity==9.1.4
+user_agent==0.1.14