# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/3/25 14:14
import re

import pymysql
import YamlLoader
from loguru import logger
from dbutils.pooled_db import PooledDB

# Read the MySQL connection settings from the YAML config once, at import time.
yaml = YamlLoader.readYaml()
mysqlYaml = yaml.get("mysql")
sql_host = mysqlYaml.getValueAsString("host")
sql_port = mysqlYaml.getValueAsInt("port")
sql_user = mysqlYaml.getValueAsString("username")
sql_password = mysqlYaml.getValueAsString("password")
sql_db = mysqlYaml.getValueAsString("db")


class MySQLConnectionPool:
    """
    MySQL connection pool built on DBUtils ``PooledDB`` with ``pymysql`` as the driver.

    Each public method borrows a connection from the pool for the duration of one call
    and gives it back before returning.
    """

    def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None, ping=0):
        """
        Create the underlying pool using the module-level MySQL settings.

        :param mincached: number of idle connections opened at start-up (0 = open none)
        :param maxcached: maximum number of idle connections kept in the pool (0 or None = unlimited)
        :param maxconnections: maximum number of connections allowed (0 or None = unlimited)
        :param log: custom logger; defaults to loguru's ``logger``
        :param ping: DBUtils ping policy, i.e. when a connection is checked with ping():
                     0 = never, 1 = when fetched from the pool, 2 = when a cursor is created,
                     4 = when a query is executed, 7 = always (bit combinations allowed).
                     Defaults to 0, as before.
        """
        # Use loguru's logger unless a different logger was passed in.
        self.log = log or logger
        self.pool = PooledDB(
            creator=pymysql,
            mincached=mincached,
            maxcached=maxcached,
            maxconnections=maxconnections,
            blocking=True,  # when no connection is free, wait for one instead of raising
            host=sql_host,
            port=sql_port,
            user=sql_user,
            password=sql_password,
            database=sql_db,
            ping=ping,
        )

    def _safe_rollback(self, conn):
        """
        Roll back ``conn``, logging (not raising) a failing rollback so that the
        exception that triggered the rollback is the one that propagates.

        :param conn: connection borrowed from ``self.pool`` that is still held
        """
        try:
            conn.rollback()
        except Exception as rollback_error:
            self.log.warning(f"Rollback failed: {rollback_error}")

    def _execute(self, query, args=None, commit=False, fetch=None):
        """
        Execute one SQL statement on a pooled connection.

        :param query: SQL statement (``%s`` placeholders for parameters)
        :param args: parameters bound to the placeholders
        :param commit: commit on success; on failure roll back before re-raising
        :param fetch: optional callable applied to the still-open cursor; its result is
                      returned. Use it to read rows or cursor attributes before the
                      cursor is closed and the connection goes back to the pool.
        :return: ``fetch(cursor)`` when ``fetch`` is given, otherwise the cursor itself
                 (already closed at that point - only read attributes such as
                 ``rowcount``/``lastrowid`` from it)
        :raises Exception: any error from acquiring the connection or executing the statement
        """
        try:
            with self.pool.connection() as conn:
                try:
                    with conn.cursor() as cursor:
                        cursor.execute(query, args)
                        if commit:
                            conn.commit()
                        self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
                        return cursor if fetch is None else fetch(cursor)
                except Exception:
                    # Roll back while the connection is still held: once the outer
                    # with-block exits, the pooled wrapper has been handed back to the pool.
                    if commit:
                        self._safe_rollback(conn)
                    raise
        except Exception as e:
            self.log.error(f"Error executing query: {e}, Query: {query}, Args: {args}")
            raise

    def select_one(self, query, args=None):
        """
        Run a query and return its first row.

        :param query: SELECT statement
        :param args: query parameters
        :return: the first row (a tuple with pymysql's default cursor), or None if there is none
        """
        return self._execute(query, args, fetch=lambda cur: cur.fetchone())

    def select_all(self, query, args=None):
        """
        Run a query and return every row.

        :param query: SELECT statement
        :param args: query parameters
        :return: all rows as returned by the cursor's ``fetchall()``
        """
        return self._execute(query, args, fetch=lambda cur: cur.fetchall())

    def insert_one(self, query, args):
        """
        Execute a single INSERT statement and commit it.

        :param query: INSERT statement
        :param args: insert parameters
        :return: ``lastrowid`` of the insert
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True, fetch=lambda cur: cur.lastrowid)

    def insert_all(self, query, args_list):
        """
        Insert many rows with one ``executemany`` call; if that fails, retry them one
        by one through ``insert_one`` (which raises on the first row that still fails).

        :param query: INSERT statement
        :param args_list: list of parameter sequences, one per row
        """
        try:
            with self.pool.connection() as conn:
                try:
                    with conn.cursor() as cursor:
                        cursor.executemany(query, args_list)
                        conn.commit()
                except Exception:
                    self._safe_rollback(conn)
                    raise
            self.log.debug(f"sql insert_all, SQL: {query}, Rows: {len(args_list)}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            # The batch connection has been released at this point, so the per-row
            # fallback does not hold two pool connections at once.
            self.log.error(f"Batch insertion failed. Trying single inserts. Error: {e}")
            rowcount = 0
            for args in args_list:
                self.insert_one(query, args)
                rowcount += 1
            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")

    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True):
        """
        Insert a single row, either from a dict or from a raw SQL statement.

        :param table: table name (required for dict inserts)
        :param data: row as ``{column: value}``; takes precedence over ``query``
        :param query: raw INSERT statement (used when ``data`` is None)
        :param args: parameters for ``query``
        :param commit: commit after executing.
                       NOTE(review): with commit=False nothing here commits and the
                       connection goes straight back to the pool, so the row is presumably
                       discarded by the pool's reset - confirm before relying on it.
        :return: ``lastrowid`` of the insert
        :raises ValueError: if ``data`` is not a dict, neither ``data`` nor ``query`` is
                            given, or the table/column names are not plain identifiers
        """
        if data is not None:
            if not isinstance(data, dict):
                raise ValueError("Data must be a dictionary")
            # Identifiers cannot be parameterized, so they are validated before interpolation.
            keys = ', '.join(self._safe_identifier(k) for k in data)
            values = ', '.join(['%s'] * len(data))
            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
            args = tuple(data.values())
        elif query is None:
            raise ValueError("Either data or query must be provided")

        rowcount, lastrowid = self._execute(
            query, args, commit, fetch=lambda cur: (cur.rowcount, cur.lastrowid)
        )
        self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {rowcount}")
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return lastrowid

    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=500, commit=True):
        """
        Insert many rows in batches, either from a list of dicts or from raw SQL.
        A batch that fails is retried row by row; rows that still fail are logged and skipped.

        :param table: table name (required for dict inserts)
        :param data_list: rows as ``[{column: value}, ...]``; every dict must have the
                          same keys as the first one (key order may differ)
        :param query: raw INSERT statement (used when ``data_list`` is None)
        :param args_list: parameter sequences for ``query`` (required with ``query``)
        :param batch_size: rows per ``executemany`` call
        :param commit: commit each batch (see the NOTE on ``insert_one_or_dict`` about commit=False)
        :return: number of rows inserted (sum of ``rowcount`` plus successful single inserts)
        :raises ValueError: on malformed ``data_list``, missing ``query``/``args_list``,
                            or invalid table/column names
        """
        if data_list is not None:
            if not data_list or not isinstance(data_list[0], dict):
                raise ValueError("Data_list must be a non-empty list of dictionaries")
            columns = list(data_list[0])
            keys = ', '.join(self._safe_identifier(k) for k in columns)
            values = ', '.join(['%s'] * len(columns))
            query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
            expected_columns = set(columns)
            args_list = []
            for index, row in enumerate(data_list):
                if not isinstance(row, dict) or set(row) != expected_columns:
                    raise ValueError(
                        f"data_list[{index}] must be a dictionary with the same keys as data_list[0]"
                    )
                # Bind values by column name so a different key order cannot shift
                # values into the wrong columns.
                args_list.append(tuple(row[column] for column in columns))
        elif query is None:
            raise ValueError("Either data_list or query must be provided")
        elif args_list is None:
            raise ValueError("args_list must be provided together with query")

        total = 0
        for i in range(0, len(args_list), batch_size):
            batch = args_list[i:i + batch_size]
            try:
                with self.pool.connection() as conn:
                    try:
                        with conn.cursor() as cursor:
                            cursor.executemany(query, batch)
                            if commit:
                                conn.commit()
                            total += cursor.rowcount
                    except Exception:
                        # Roll back while the connection is still held (see _execute).
                        if commit:
                            self._safe_rollback(conn)
                        raise
            except Exception as e:
                self.log.error(f"Batch insert failed: {e}")
                # Degrade to single-row inserts for this batch only.
                for args in batch:
                    try:
                        self.insert_one_or_dict(table=None, query=query, args=args, commit=commit)
                        total += 1
                    except Exception as e2:
                        self.log.error(f"Single insert failed: {e2}")
        self.log.info(f"sql insert_many_or_dict, Table: {table}, Total Rows: {total}")
        return total

    def insert_too_many(self, query, args_list, batch_size=1000):
        """
        Insert a very large number of rows (e.g. 100k+) in committed batches.
        A failing batch is retried row by row through ``insert_one``, which raises on
        the first row that still fails (remaining batches are then not attempted).

        :param query: INSERT statement
        :param args_list: parameter sequences, one per row
        :param batch_size: rows per ``executemany`` call
        """
        for i in range(0, len(args_list), batch_size):
            batch = args_list[i:i + batch_size]
            try:
                with self.pool.connection() as conn:
                    try:
                        with conn.cursor() as cursor:
                            cursor.executemany(query, batch)
                            conn.commit()
                    except Exception:
                        self._safe_rollback(conn)
                        raise
            except Exception as e:
                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
                # Degrade the current batch to single-row inserts.
                for args in batch:
                    self.insert_one(query, args)

    def update_one(self, query, args):
        """
        Execute a single UPDATE statement and commit it.

        :param query: UPDATE statement
        :param args: update parameters
        :return: the (closed) cursor; read ``rowcount`` from it for affected rows
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True)

    def update_all(self, query, args_list):
        """
        Run many updates with one ``executemany`` call; if that fails, retry them one
        by one through ``update_one`` (which raises on the first update that still fails).

        :param query: UPDATE statement
        :param args_list: list of parameter sequences, one per update
        """
        try:
            with self.pool.connection() as conn:
                try:
                    with conn.cursor() as cursor:
                        cursor.executemany(query, args_list)
                        conn.commit()
                except Exception:
                    self._safe_rollback(conn)
                    raise
            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            self.log.error(f"Error executing query: {e}")
            # Batch failed (its connection is already released): update row by row.
            rowcount = 0
            for args in args_list:
                self.update_one(query, args)
                rowcount += 1
            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')

    def check_pool_health(self):
        """
        Check that a connection can be borrowed from the pool and answers ``ping``.

        :return: True if the ping succeeded, False otherwise (the error is logged)

        Usage example::

            # configure the MySQL connection pool
            sql_pool = MySQLConnectionPool(log=log)
            if not sql_pool.check_pool_health():
                log.error("database connection pool is unhealthy")
                raise RuntimeError("database connection pool is unhealthy")
        """
        try:
            with self.pool.connection() as conn:
                conn.ping(reconnect=True)
                return True
        except Exception as e:
            self.log.error(f"Connection pool health check failed: {e}")
            return False

    @staticmethod
    def _safe_identifier(name):
        """
        Validate that ``name`` is a plain SQL identifier (ASCII letters, digits and
        underscores, not starting with a digit) so it can be interpolated into SQL.

        :param name: table or column name
        :return: ``name`` unchanged
        :raises ValueError: if ``name`` is not a string or not a plain identifier
        """
        # fullmatch rather than match + '$': '$' also matches just before a trailing
        # newline, which would let e.g. "users\n" through.
        if not isinstance(name, str) or not re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', name):
            raise ValueError(f"Invalid SQL identifier: {name}")
        return name