Procházet zdrojové kódy

refactor(spider): 重构代理和请求重试逻辑并优化数据解析与数据库操作

- 使用 tenacity 代替 retrying 实现重试机制,统一重试回调日志输出
- 代理获取函数增加 logger 参数支持,改进异常日志记录
- 调整请求和解析流程,新增日志入参,提升调试信息完善度
- 优化日期转换函数,增加异常捕获和日志记录
- 更新数据解析逻辑,修正 XPath 路径并规范字段映射
- 修改数据保存方式,采用字典参数直接写入数据库
- 线程池任务提交及异常处理加入日志参数,更精细任务状态跟踪
- 扩展数据库连接池最大连接数及缓存数,提升连接复用效率
- 重构批量插入逻辑,增加异常降级处理及详细日志,解决重复数据插入问题
- 更新调度任务调用方式,支持日志参数传递及首次执行明确日志输出
- README 增加新爬虫任务启动命令说明
charley před 1 měsícem
rodič
revize
8f1af64db1
Změněny 3 soubory: 230 přidání a 251 odebrání
  1. +7 -0
      ags_spider/README.md
  2. +148 -139
      ags_spider/ags_new_daily.py
  3. +75 -112
      ags_spider/mysql_pool.py

+ 7 - 0
ags_spider/README.md

@@ -12,3 +12,10 @@ python ags_new_daily.py
 python ags_history_spider.py
 ```
 
+
+## 3.  pop report爬虫任务
+
+```python
+# 启动命令
+python ags_pop_spider.py
+```

+ 148 - 139
ags_spider/ags_new_daily.py

@@ -2,27 +2,44 @@
 # Author  : Charley
 # Python  : 3.8.10
 # Date: 2024-10-14 11:01
-import json
 import time
 import requests
 import schedule
 import user_agent
 import concurrent.futures
 from loguru import logger
-from retrying import retry
 from parsel import Selector
 from datetime import datetime
-
 from mysql_pool import MySQLConnectionPool
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+
+# logger.remove()
+# logger.add("logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+#            level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    retry 回调
+    :param retry_state: RetryCallState 对象
+    """
+    # 检查 args 是否存在且不为空
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]  # 获取传入的 logger
+    else:
+        log = logger  # 使用全局 logger
 
-logger.remove()
-logger.add("logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
-           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
-           level="DEBUG", retention="7 day")
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
 
 
-@retry(stop_max_attempt_number=3, wait_fixed=2000)
-def get_proxys_(log):
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
     """
     获取代理
     :return: 代理
@@ -41,28 +58,28 @@ def get_proxys_(log):
         raise e
 
 
-@retry(stop_max_attempt_number=5, wait_fixed=1000)
-def get_proxys():
-    # 已购买账户  北美
-    # http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
-    # https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
-    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
-    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
-
-    # url = "https://ifconfig.me"
-    try:
-        proxySettings = {
-            "http": http_proxy,
-            "https": https_proxy,
-        }
-        return proxySettings
-    except Exception as e:
-        logger.error(f"Error getting proxy: {e}")
-        raise e
-
-
-@retry(stop_max_attempt_number=3, wait_fixed=2000)
-def get_price(ucid):
+# @retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
+# def get_proxys_(log):
+#     # 已购买账户  北美
+#     # http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+#     # https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
+#     http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
+#     https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36928"
+#
+#     # url = "https://ifconfig.me"
+#     try:
+#         proxySettings = {
+#             "http": http_proxy,
+#             "https": https_proxy,
+#         }
+#         return proxySettings
+#     except Exception as e:
+#         log.error(f"Error getting proxy: {e}")
+#         raise e
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_price(log, ucid):
     # ucid = '204224'
     headers = {
         "accept": "application/json, text/plain, */*",
@@ -72,152 +89,135 @@ def get_price(ucid):
     }
     url = f"https://robograding.com/api/v3/card-price/{ucid}"
     response = requests.get(url, headers=headers, proxies=get_proxys(), timeout=10)
+    # log.debug(response.text)
     # response = requests.get(url, headers=headers)
 
     if response.status_code != 200:
-        logger.debug('请求失败,重试......................')
+        log.debug('请求失败,重试......................')
         raise Exception('请求失败,重试......................')
     price = response.json().get('price')
     return price
 
 
-def transform_date(date_str):
+def transform_date(log, date_str):
     """
     November 13, 2020 类型的日期字符串 转换成年月日格式
+    :param log: logger
     :param date_str: 日期字符串
     :return: formatted_date
     """
-    # 解析日期字符串
-    date_obj = datetime.strptime(date_str, "%B %d, %Y")
-
-    # 格式化日期
-    formatted_date = date_obj.strftime("%Y-%m-%d")
-    return formatted_date
+    try:
+        # 解析日期字符串
+        date_obj = datetime.strptime(date_str, "%B %d, %Y")
 
+        # 格式化日期
+        formatted_date = date_obj.strftime("%Y-%m-%d")
+        return formatted_date
+    except Exception as e:
+        log.error(f"Error transforming date: {e}")
+        return None
 
-def save_data(sql_pool, info):
-    sql = "INSERT INTO ags_record (cert_id, name, title, score, card_type, release_date, series, card, ags_set, owner, centering_overall, surface_overall, edges_overall, corners_overall, price, front_img, back_img) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
-    sql_pool.insert_one(sql, info)
 
 
-def parse_data(cert_id, resp_text, sql_pool):
+def parse_data(log, cert_id, resp_text, sql_pool):
     selector = Selector(text=resp_text)
-    name = selector.xpath('//div[@class="feed-view__header__content"]//h1/text()').get()
-    title = selector.xpath('//div[@class="feed-view__header__content"]//h2/text()').get()
-    score = selector.xpath(
-        '//div[@class="feed-view__header__content"]//p[@class="feed-view__header__grade-score"]/text()').get()
-
-    tr_list = selector.xpath('//div[@class="feed-view__right-side"]//tbody/tr[not(contains(@class, "feed-view"))]')
-    result_dict = {'card_type': '',
-                   'release_date': '',
-                   'series': '',
-                   'card': '',
-                   'set': '',
-                   'owner': ''}
-    for tr in tr_list:
-        item_key = tr.xpath('./td[1]/h3/text()').get()
-        item_val = tr.xpath('./td[2]/text()').get()
-        try:
-            item_key = item_key.strip().replace(':', '')
-            item_val = item_val.strip().replace(':', '')
-            # print('item_key,item_val', item_key, item_val)
-            key_snake_case = item_key.replace(' ', '_').replace('/', '_').lower()
-            if key_snake_case in list(result_dict.keys()):
-                # print(key_snake_case, item_val)
-                result_dict[key_snake_case] = item_val
-        except Exception as e:
-            logger.debug(e)
+    tag_div1 = selector.xpath('//main/section/div[1]')
+    tag_div2 = selector.xpath('//main/section/div[2]')
+    name = tag_div2.xpath('//div/h3/text()').get()
+    # title = tag_div2.xpath('//div[@class="feed-view__header__content"]//h2/text()').get()
+    # score = tag_div2.xpath('/div[1]/div/div/div/span[2]/text()').get()
+    score = tag_div2.xpath('./div[1]//span[2]/text()').get()
+    # print(score)
+
+    tr_list = tag_div2.xpath('./div[@class="w-full pt-4"]/div')
+    ags_set = tr_list.xpath('./div[1]/span[2]/text()').get()
+    card_year = tr_list.xpath('./div[2]/span[2]/text()').get()
+    card_type = tr_list.xpath('./div[3]/span[2]/text()').get()
+    series = tr_list.xpath('./div[4]/span[2]/text()').get()
+    released = tr_list.xpath('./div[5]/span[2]/text()').get()  # 转换
+    owner = tr_list.xpath('./div[6]/span[2]/text()').get()
 
     # overall
-    centering_overall = selector.xpath(
-        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[1]/p[2]/text()').get()
-    surface_overall = selector.xpath(
-        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[2]/p[2]/text()').get()
-    edges_overall = selector.xpath(
-        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[3]/p[2]/text()').get()
-    corners_overall = selector.xpath(
-        '//div[@class="feed-view__breakdown__scores feed-view__breakdown__scores--contained"]/div[4]/p[2]/text()').get()
-
-    # 获取用户id
-    user_card_id = selector.xpath(
-        '//div[@class="feed-view__breakdown__scores-holder feed-view__breakdown__scores-card-price"]/div/@data-user-card-id').get()
-    if user_card_id:
-        price = get_price(user_card_id)
-    else:
-        price = None
-
-    img_json = selector.xpath(
-        '//section[@class="feed-view__content"]//div[@class="feed-view__card"]/div/@data-images').get()
-    img_dict = json.loads(img_json)
-    front_img = img_dict.get('front_slab_image')
-    back_img = img_dict.get('back_slab_image')
-    if not front_img:
-        front_img = img_dict.get('image_path')
-        back_img = None
-
-    release_date = transform_date(result_dict.get('release_date')) if result_dict.get('release_date') else None
+    centering_overall = tag_div2.xpath(
+        './div[2]/div/div[1]/div/span/text()').get()
+    surface_overall = tag_div2.xpath(
+        './div[2]/div/div[2]/div/span/text()').get()
+    edges_overall = tag_div2.xpath(
+        './div[2]/div/div[3]/div/span/text()').get()
+    corners_overall = tag_div2.xpath(
+        './div[2]/div/div[4]/div/span/text()').get()
+
+    front_img = tag_div1.xpath('.//img/@src').get()
+
+
+    release_date = transform_date(log, released) if released else None
+
+    data_dict = {
+        "cert_id": cert_id,
+        "name": name,
+        "score": score,
+        "card_year":card_year,
+        "card_type": card_type,
+        "release_date": release_date,
+        "series": series,
+        "ags_set": ags_set,
+        "owner": owner,
+        "centering_overall": centering_overall,
+        "surface_overall": surface_overall,
+        "edges_overall": edges_overall,
+        "corners_overall": corners_overall,
+        "front_img": front_img,
+    }
+    # print(data_dict)
 
-    info = (cert_id, name, title, score, result_dict.get('card_type'), release_date,
-            result_dict.get('series'), result_dict.get('card'), result_dict.get('set'), result_dict.get('owner'),
-            centering_overall, surface_overall, edges_overall, corners_overall, price, front_img, back_img)
-    # print(info)
-    save_data(sql_pool, info)
+    # 保存数据到数据库
+    sql_pool.insert_one_or_dict(table="ags_record", data=data_dict)
 
 
-@retry(stop_max_attempt_number=5, wait_fixed=2000)
-def get_data(ags_id_list, sql_pool):
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_data(log, ags_id_list, sql_pool):
     sql_id = ags_id_list[0]
     cert_id = ags_id_list[1]
-    logger.debug(f"开始处理 {cert_id}")
+    log.debug(f"开始处理 {cert_id}")
     headers = {
         "user-agent": user_agent.generate_user_agent()
     }
-    url = f"https://robograding.com/feed/{cert_id}/view"
+    url = f"https://agscard.com/feed/{cert_id}/view"
 
     try:
-        response = requests.get(url, headers=headers, proxies=get_proxys(), timeout=10)
+        response = requests.get(url, headers=headers, proxies=get_proxys(log), timeout=10)
         # response = requests.get(url, headers=headers)
         response.raise_for_status()
+        # print(response.text)
 
         if "Grades are not available yet" in response.text:
-            logger.debug("Grades are not available yet in response.text......................")
+            log.debug("Grades are not available yet in response.text......................")
             # 更新数据库状态为未完成
             sql_pool.update_one("UPDATE ags_task SET state=2 WHERE id=%s", (sql_id,))
         else:
-            parse_data(cert_id, response.text, sql_pool)
+            parse_data(log, cert_id, response.text, sql_pool)
             # 更新数据库状态为已完成
             sql_pool.update_one("UPDATE ags_task SET state=1 WHERE id=%s", (sql_id,))
     except requests.RequestException as e:
-        logger.error(f"Request error: {e}")
-        raise  # 可以选择重新抛出异常以便外部处理
+        log.error(f"Request error: {e}")
+        raise
 
 
-def process_urls(ids, mysql_pool, batch_size=1000, max_workers=5):
+def process_urls(log, ids, mysql_pool, batch_size=1000, max_workers=5):
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
         for i in range(0, len(ids), batch_size):
             batch = ids[i:i + batch_size]
             try:
-                futures_to_urls = {executor.submit(get_data, url, mysql_pool): url for url in batch}
+                futures_to_urls = {executor.submit(get_data, log, url, mysql_pool): url for url in batch}
                 for future in concurrent.futures.as_completed(futures_to_urls):
                     url = futures_to_urls[future]
                     try:
                         future.result()
-                        logger.debug(f"处理 {url} 成功")
+                        log.debug(f"处理 {url} 成功")
                     except Exception as exc:
-                        logger.debug(f"处理 {url} 出错: {exc}")
+                        log.debug(f"处理 {url} 出错: {exc}")
             except Exception as e:
-                logger.error(f"提交任务失败: {e}")
-
-
-# def get_add_cert(start_, end_):
-#     num_list = []
-#     # 循环生成8位数的字符串
-#     for num in range(start_, end_ + 1):
-#         # 使用zfill将数字转换为字符串,并确保长度为8位,不足部分用0填充
-#         formatted_num = str(num).zfill(8)
-#         # print(formatted_num)
-#         num_list.append(formatted_num)
-#     return num_list
+                log.error(f"提交任务失败: {e}")
 
 
 def get_new_task(sql_pool):
@@ -232,45 +232,52 @@ def get_new_task(sql_pool):
     每日更新任务为+2000,-1000
     """
     ags_id_list = sql_pool.select_all(
-        f"SELECT id, cert_id FROM ags_task WHERE state != 1 AND cert_id <= '{end_max_cert_str}' ORDER BY id DESC LIMIT 3000")
+        f"SELECT id, cert_id FROM ags_task WHERE state != 1 AND cert_id <= '{end_max_cert_str}' ORDER BY id DESC LIMIT 10000") # 3000别忘了!!!!!!!!!!!!!!!!!!
     # ags_id_list = sql_pool.select_all("SELECT id,cert_id FROM ags_task WHERE id < 927059 AND state = 0 LIMIT 10000")
     ags_id_list = [i for i in ags_id_list]
     return ags_id_list
 
 
-@retry(stop_max_attempt_number=5, wait_fixed=2000)
-def main():
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def main(log):
     """
     爬虫主函数
     """
     try:
-        logger.info("开始运行 tag_spider 爬虫任务............................................................")
-        sql_pool = MySQLConnectionPool(log=logger)
+        log.info("开始运行 tag_spider 爬虫任务............................................................")
+        sql_pool = MySQLConnectionPool(log=log)
         if not sql_pool:
-            logger.error("数据库连接失败")
+            log.error("数据库连接失败")
             raise Exception("数据库连接失败")
 
         # while True:
         new_task = get_new_task(sql_pool)
         if not new_task:
-            logger.debug(".............................. 没有新任务,结束本轮任务 ..............................")
+            log.debug(".............................. 没有新任务,结束本轮任务 ..............................")
             # break
             return
 
         try:
-            process_urls(new_task, sql_pool, batch_size=1000, max_workers=5)
+            process_urls(log, new_task, sql_pool, batch_size=1000, max_workers=5)
         except Exception as e:
-            logger.error('process urls: ', e)
+            log.error('process urls: ', e)
 
     except Exception as e:
-        logger.error(f'error:{e}')
+        log.error(f'error:{e}')
     finally:
-        logger.info("爬虫程序运行结束,等待下一轮的采集任务.............")
+        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
 
 
 def schedule_task():
-    main()
-    schedule.every().day.at("00:01").do(main)
+    """
+    爬虫模块 定时任务 的启动文件
+    """
+    # 立即运行一次任务
+    main(logger)
+
+    # 设置定时任务
+    schedule.every().day.at("00:01").do(main, log=logger)
+
     while True:
         schedule.run_pending()
         time.sleep(1)
@@ -278,4 +285,6 @@ def schedule_task():
 
 if __name__ == '__main__':
     schedule_task()
-    # main()
+    # main(logger)
+    # print(get_price('710785'))
+    # get_data(logger, (1061124, '00714066'), None )

+ 75 - 112
ags_spider/mysql_pool.py

@@ -23,7 +23,7 @@ class MySQLConnectionPool:
     MySQL连接池
     """
 
-    def __init__(self, mincached=1, maxcached=2, maxconnections=3, log=None):
+    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
         """
         初始化连接池
         :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
@@ -44,10 +44,7 @@ class MySQLConnectionPool:
             user=sql_user,
             password=sql_password,
             database=sql_db,
-            ping=2,  # 每次执行前检查连接有效性,防止使用已断开的连接
-            connect_timeout=5,  # 连接超时时间(秒)
-            # read_timeout=30,  # 读取超时时间(秒)
-            write_timeout=30  # 写入超时时间(秒)
+            ping=0  # 每次连接使用时自动检查有效性(0=不检查,1=执行query前检查,2=每次执行前检查)
         )
 
     def _execute(self, query, args=None, commit=False):
@@ -67,7 +64,7 @@ class MySQLConnectionPool:
                     self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
                     return cursor
         except Exception as e:
-            if commit and conn:
+            if commit:
                 conn.rollback()
             self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
             raise e
@@ -104,7 +101,7 @@ class MySQLConnectionPool:
 
     def insert_all(self, query, args_list):
         """
-        执行批量插入语句,如果失败则逐条插入
+        执行批量插入语句如果失败则逐条插入
         :param query: 插入语句
         :param args_list: 插入参数列表
         """
@@ -115,33 +112,17 @@ class MySQLConnectionPool:
             cursor = conn.cursor()
             cursor.executemany(query, args_list)
             conn.commit()
-            self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
+            self.log.debug(f"sql insert_all, SQL: {query}, Rows: {len(args_list)}")
             self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
-        except pymysql.err.IntegrityError as e:
-            if "Duplicate entry" in str(e):
-                conn.rollback()
-                self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
-                rowcount = 0
-                for args in args_list:
-                    try:
-                        self.insert_one(query, args)
-                        rowcount += 1
-                    except pymysql.err.IntegrityError as e2:
-                        if "Duplicate entry" in str(e2):
-                            self.log.debug(f"跳过重复条目: {e2}")
-                        else:
-                            self.log.error(f"插入失败: {e2}")
-                    except Exception as e2:
-                        self.log.error(f"插入失败: {e2}")
-                self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
-            else:
-                conn.rollback()
-                self.log.exception(f"数据库完整性错误: {e}")
-                raise e
         except Exception as e:
             conn.rollback()
-            self.log.exception(f"批量插入失败: {e}")
-            raise e
+            self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
+            # 如果批量插入失败,则逐条插入
+            rowcount = 0
+            for args in args_list:
+                self.insert_one(query, args)
+                rowcount += 1
+            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
         finally:
             if cursor:
                 cursor.close()
@@ -166,13 +147,20 @@ class MySQLConnectionPool:
             keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
             values = ', '.join(['%s'] * len(data))
 
+            # query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
             # 构建 INSERT IGNORE 语句
             ignore_clause = "IGNORE" if ignore else ""
+            # insert_sql = f"INSERT {ignore_clause} INTO {table} ({columns}) VALUES ({placeholders})"
             query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
             args = tuple(data.values())
         elif query is None:
             raise ValueError("Either data or query must be provided")
 
+        # cursor = self._execute(query, args, commit)
+        # self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+        # self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        # return cursor.lastrowid
+
         try:
             cursor = self._execute(query, args, commit)
             self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
@@ -181,12 +169,16 @@ class MySQLConnectionPool:
         except pymysql.err.IntegrityError as e:
             if "Duplicate entry" in str(e):
                 self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                # print("插入失败:重复条目", e)
                 return -1  # 返回 -1 表示重复条目被跳过
             else:
                 self.log.exception(f"数据库完整性错误: {e}")
+                # print("插入失败:完整性错误", e)
                 raise
         except Exception as e:
-            self.log.exception(f"未知错误: {e}")
+            # self.log.error(f"未知错误: {str(e)}", exc_info=True)
+            self.log.exception(f"未知错误: {e}")  # 记录完整异常信息
+            # print("插入失败:未知错误", e)
             raise
 
     def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
@@ -211,6 +203,7 @@ class MySQLConnectionPool:
 
             # 构建 INSERT IGNORE 语句
             ignore_clause = "IGNORE" if ignore else ""
+            # insert_sql = f"INSERT {ignore_clause} INTO {table} ({columns}) VALUES ({placeholders})"
             query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
             args_list = [tuple(d.values()) for d in data_list]
         elif query is None:
@@ -226,71 +219,51 @@ class MySQLConnectionPool:
                         if commit:
                             conn.commit()
                         total += cursor.rowcount
-            except pymysql.err.IntegrityError as e:
-                # 处理唯一索引冲突
+            except pymysql.Error as e:
                 if "Duplicate entry" in str(e):
-                    if ignore:
-                        # 如果使用了 INSERT IGNORE,理论上不会进这里,但以防万一
-                        self.log.warning(f"批量插入遇到重复条目(ignore模式): {e}")
-                    else:
-                        # 没有使用 IGNORE,降级为逐条插入
-                        self.log.warning(f"批量插入遇到重复条目,开始逐条插入。错误: {e}")
-                        if commit:
-                            conn.rollback()
-                        
-                        rowcount = 0
-                        for j, args in enumerate(batch):
-                            try:
-                                if data_list:
-                                    # 字典模式
-                                    self.insert_one_or_dict(
-                                        table=table,
-                                        data=dict(zip(data_list[0].keys(), args)),
-                                        commit=commit,
-                                        ignore=False  # 单条插入时手动捕获重复
-                                    )
-                                else:
-                                    # 原始SQL模式
-                                    self.insert_one(query, args)
-                                rowcount += 1
-                            except pymysql.err.IntegrityError as e2:
-                                if "Duplicate entry" in str(e2):
-                                    self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
-                                else:
-                                    self.log.error(f"插入失败[{i+j+1}]: {e2}")
-                            except Exception as e2:
-                                self.log.error(f"插入失败[{i+j+1}]: {e2}")
-                        total += rowcount
-                        self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
+                    # self.log.warning(f"检测到重复条目,开始逐条插入。错误详情: {e}")
+                    raise e
+                    # rowcount = 0
+                    # for args in batch:
+                    #     try:
+                    #         self.insert_one_or_dict(table=table, data=dict(zip(data_list[0].keys(), args)),
+                    #                                 commit=commit)
+                    #         rowcount += 1
+                    #     except pymysql.err.IntegrityError as e2:
+                    #         if "Duplicate entry" in str(e2):
+                    #             self.log.warning(f"跳过重复条目: {args}")
+                    #         else:
+                    #             self.log.error(f"插入失败: {e2}, 参数: {args}")
+                    # total += rowcount
                 else:
-                    # 其他完整性错误
-                    self.log.exception(f"数据库完整性错误: {e}")
+                    self.log.exception(f"数据库错误: {e}")
                     if commit:
                         conn.rollback()
                     raise e
-            except Exception as e:
-                # 其他数据库错误
-                self.log.exception(f"批量插入失败: {e}")
-                if commit:
-                    conn.rollback()
-                raise e
+                # 重新抛出异常,供外部捕获
+                # 降级为单条插入
+                # for args in batch:
+                #     try:
+                #         self.insert_one_or_dict(table=None, query=query, args=args, commit=commit)
+                #         total += 1
+                #     except Exception as e2:
+                #         self.log.error(f"Single insert failed: {e2}")
+                # continue
         if table:
             self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
         else:
             self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
         return total
 
-    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
-                        ignore=False):
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True):
         """
-        批量插入(支持字典列表或原始SQL) - 备用方法
-        :param table: 表名(字典插入时必需)
+        批量插入(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
         :param data_list: 字典列表 [{列名: 值}]
-        :param query: 直接SQL语句(与data_list二选一)
-        :param args_list: SQL参数列表(query使用时必需)
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
         :param batch_size: 分批大小
         :param commit: 是否自动提交
-        :param ignore: 是否使用INSERT IGNORE
         :return: 影响行数
         """
         if data_list is not None:
@@ -298,51 +271,41 @@ class MySQLConnectionPool:
                 raise ValueError("Data_list must be a non-empty list of dictionaries")
             keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
             values = ', '.join(['%s'] * len(data_list[0]))
-            ignore_clause = "IGNORE" if ignore else ""
-            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
             args_list = [tuple(d.values()) for d in data_list]
         elif query is None:
             raise ValueError("Either data_list or query must be provided")
-    
+
         total = 0
         for i in range(0, len(args_list), batch_size):
             batch = args_list[i:i + batch_size]
             try:
                 with self.pool.connection() as conn:
                     with conn.cursor() as cursor:
+                        # 添加调试日志:输出 SQL 和参数示例
+                        # self.log.debug(f"Batch insert SQL: {query}")
+                        # self.log.debug(f"Sample args: {batch[0] if batch else 'None'}")
                         cursor.executemany(query, batch)
                         if commit:
                             conn.commit()
                         total += cursor.rowcount
-            except pymysql.err.IntegrityError as e:
-                if "Duplicate entry" in str(e) and not ignore:
-                    self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
-                    if commit:
-                        conn.rollback()
-                    rowcount = 0
-                    for args in batch:
-                        try:
-                            self.insert_one(query, args)
-                            rowcount += 1
-                        except pymysql.err.IntegrityError as e2:
-                            if "Duplicate entry" in str(e2):
-                                self.log.debug(f"跳过重复条目: {e2}")
-                            else:
-                                self.log.error(f"插入失败: {e2}")
-                        except Exception as e2:
-                            self.log.error(f"插入失败: {e2}")
-                    total += rowcount
-                else:
-                    self.log.exception(f"数据库完整性错误: {e}")
-                    if commit:
-                        conn.rollback()
-                    raise e
-            except Exception as e:
-                self.log.exception(f"批量插入失败: {e}")
+                        # self.log.debug(f"Batch insert succeeded. Rows: {cursor.rowcount}")
+            except Exception as e:  # 明确捕获数据库异常
+                self.log.exception(f"Batch insert failed: {e}")  # 使用 exception 记录堆栈
+                self.log.error(f"Failed SQL: {query}, Args count: {len(batch)}")
                 if commit:
                     conn.rollback()
-                raise e
-        self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
+                # 降级为单条插入,并记录每个错误
+                rowcount = 0
+                for args in batch:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except Exception as e2:
+                        self.log.error(f"Single insert failed: {e2}, Args: {args}")
+                total += rowcount
+                self.log.debug(f"Inserted {rowcount}/{len(batch)} rows individually.")
+        self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
         return total
 
     def insert_too_many(self, query, args_list, batch_size=1000):