# -*- coding: utf-8 -*- # Author : Charley # Python : 3.8.10 # Date: 2024-08-05 19:42 import pymysql import YamlLoader from loguru import logger from retrying import retry from dbutils.pooled_db import PooledDB # 获取yaml配置 yaml = YamlLoader.readYaml() mysqlYaml = yaml.get("mysql") sql_host = mysqlYaml.getValueAsString("host") sql_port = mysqlYaml.getValueAsInt("port") sql_user = mysqlYaml.getValueAsString("username") sql_password = mysqlYaml.getValueAsString("password") sql_db = mysqlYaml.getValueAsString("db") class MySQLConnectionPool: """ MySQL连接池 """ def __init__(self, mincached=1, maxcached=3, maxconnections=5, log=None): """ 初始化连接池 :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建 :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制) :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接) """ # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger self.log = log or logger self.pool = PooledDB( creator=pymysql, mincached=mincached, maxcached=maxcached, maxconnections=maxconnections, blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 host=sql_host, port=sql_port, user=sql_user, password=sql_password, database=sql_db ) @retry(stop_max_attempt_number=100, wait_fixed=600000) def _get_connection(self): """ 获取连接 :return: 连接 """ try: return self.pool.connection() except Exception as e: self.log.error(f"Failed to get connection from pool: {e}, wait 10 mins retry") raise e @staticmethod def _close_connection(conn): """ 关闭连接 :param conn: 连接 """ if conn: conn.close() @retry(stop_max_attempt_number=5, wait_fixed=1000) def _execute(self, query, args=None, commit=False): """ 执行SQL :param query: SQL语句 :param args: SQL参数 :param commit: 是否提交事务 :return: 查询结果 """ conn = None cursor = None try: conn = self._get_connection() cursor = conn.cursor() cursor.execute(query, args) if commit: conn.commit() self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}") return cursor except Exception as e: if conn and not commit: conn.rollback() self.log.error(f"Error executing query: {e}") raise e finally: if cursor: cursor.close() self._close_connection(conn) def select_one(self, query, args=None): """ 执行查询,返回单个结果 :param query: 查询语句 :param args: 查询参数 :return: 查询结果 """ cursor = self._execute(query, args) return cursor.fetchone() def select_all(self, query, args=None): """ 执行查询,返回所有结果 :param query: 查询语句 :param args: 查询参数 :return: 查询结果 """ cursor = self._execute(query, args) return cursor.fetchall() def insert_one(self, query, args): """ 执行单条插入语句 :param query: 插入语句 :param args: 插入参数 """ # self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') return self._execute(query, args, commit=True) def insert_all(self, query, args_list): """ 执行批量插入语句,如果失败则逐条插入 :param query: 插入语句 :param args_list: 插入参数列表 """ conn = None cursor = None try: conn = self._get_connection() cursor = conn.cursor() cursor.executemany(query, args_list) conn.commit() self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}") self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') except Exception as e: conn.rollback() self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}") # 如果批量插入失败,则逐条插入 rowcount = 0 for args in args_list: self.insert_one(query, args) rowcount += 1 self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.") finally: cursor.close() self._close_connection(conn) def update_one(self, query, args): """ 执行单条更新语句 :param query: 更新语句 :param args: 更新参数 """ self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') return self._execute(query, args, commit=True) def update_all(self, query, args_list): """ 执行批量更新语句,如果失败则逐条更新 :param query: 更新语句 :param args_list: 更新参数列表 """ conn = None cursor = None try: conn = self._get_connection() cursor = conn.cursor() cursor.executemany(query, args_list) conn.commit() self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}") self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>') except Exception as e: conn.rollback() self.log.error(f"Error executing query: {e}") # 如果批量更新失败,则逐条更新 rowcount = 0 for args in args_list: self.update_one(query, args) rowcount += 1 self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.') finally: cursor.close() self._close_connection(conn)