# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.8.10
# Date: 2024-08-05 19:42
import pymysql
import YamlLoader
from loguru import logger
from retrying import retry
from dbutils.pooled_db import PooledDB

# Load the MySQL settings from the YAML config
yaml = YamlLoader.readYaml()
mysqlYaml = yaml.get("mysql")
sql_host = mysqlYaml.getValueAsString("host")
sql_port = mysqlYaml.getValueAsInt("port")
sql_user = mysqlYaml.getValueAsString("username")
sql_password = mysqlYaml.getValueAsString("password")
sql_db = mysqlYaml.getValueAsString("db")
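# Note: YamlLoader is project-specific, so the exact config layout is an assumption.
# Judging only from the keys read above, the YAML file is presumably shaped roughly
# like this sketch (values are placeholders, not real settings):
#
#   mysql:
#     host: 127.0.0.1
#     port: 3306
#     username: root
#     password: secret
#     db: test_db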


class MySQLConnectionPool:
    """
    MySQL connection pool backed by DBUtils' PooledDB.
    """

    def __init__(self, mincached=10, maxcached=5, maxconnections=20, log=None):
        """
        Initialize the connection pool.
        :param mincached: number of idle connections created at startup (0 means none are pre-created)
        :param maxcached: maximum number of idle connections kept in the pool (0 or None means no limit)
        :param maxconnections: maximum number of connections allowed overall (0 or None means no limit)
        :param log: optional logger; defaults to loguru's logger
        """
        # Use loguru's logger unless a caller-supplied logger is passed in
        self.log = log or logger
        self.pool = PooledDB(
            creator=pymysql,
            mincached=mincached,
            maxcached=maxcached,
            maxconnections=maxconnections,
            blocking=True,  # block and wait when the pool is exhausted; False would raise an error instead
            host=sql_host,
            port=sql_port,
            user=sql_user,
            password=sql_password,
            database=sql_db
        )

    # Retry up to 100 times, waiting 10 minutes (600000 ms) between attempts
    @retry(stop_max_attempt_number=100, wait_fixed=600000)
    def _get_connection(self):
        """
        Get a connection from the pool.
        :return: a pooled connection
        """
        try:
            return self.pool.connection()
        except Exception as e:
            self.log.error(f"Failed to get connection from pool: {e}; retrying in 10 minutes")
            raise e

    @staticmethod
    def _close_connection(conn):
        """
        Release a connection back to the pool.
        :param conn: the connection to release
        """
        if conn:
            # close() on a pooled connection returns it to the pool rather than closing the socket
            conn.close()

    # Retry up to 5 times with a 1 second wait between attempts
    @retry(stop_max_attempt_number=5, wait_fixed=1000)
    def _execute(self, query, args=None, commit=False):
        """
        Execute a SQL statement.
        :param query: SQL statement
        :param args: statement parameters
        :param commit: whether to commit the transaction
        :return: the cursor; pymysql's default cursor buffers results, so they stay readable after it is closed
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.execute(query, args)
            if commit:
                conn.commit()
            self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}")
            return cursor
        except Exception as e:
            if conn:
                conn.rollback()
            self.log.error(f"Error executing query: {e}")
            raise e
        finally:
            if cursor:
                cursor.close()
            self._close_connection(conn)

    def select_one(self, query, args=None):
        """
        Run a query and return a single row.
        :param query: query statement
        :param args: query parameters
        :return: the first row of the result, or None
        """
        cursor = self._execute(query, args)
        return cursor.fetchone()

    def select_all(self, query, args=None):
        """
        Run a query and return all rows.
        :param query: query statement
        :param args: query parameters
        :return: all rows of the result
        """
        cursor = self._execute(query, args)
        return cursor.fetchall()

    def insert_one(self, query, args):
        """
        Execute a single insert statement.
        :param query: insert statement
        :param args: insert parameters
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one inserting into DB>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True)

    def insert_all(self, query, args_list):
        """
        Execute a batch insert; fall back to row-by-row inserts if the batch fails.
        :param query: insert statement
        :param args_list: list of parameter tuples
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.executemany(query, args_list)
            conn.commit()
            self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all inserting into DB>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            if conn:
                conn.rollback()
            self.log.error(f"Batch insertion failed: {e}. Falling back to single inserts.")
            # If the batch insert fails, insert the rows one by one
            rowcount = 0
            for args in args_list:
                self.insert_one(query, args)
                rowcount += 1
            self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
        finally:
            if cursor:
                cursor.close()
            self._close_connection(conn)

    def update_one(self, query, args):
        """
        Execute a single update statement.
        :param query: update statement
        :param args: update parameters
        """
        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        return self._execute(query, args, commit=True)

    def update_all(self, query, args_list):
        """
        Execute a batch update; fall back to row-by-row updates if the batch fails.
        :param query: update statement
        :param args_list: list of parameter tuples
        """
        conn = None
        cursor = None
        try:
            conn = self._get_connection()
            cursor = conn.cursor()
            cursor.executemany(query, args_list)
            conn.commit()
            self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}")
            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all updating>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
        except Exception as e:
            if conn:
                conn.rollback()
            self.log.error(f"Batch update failed: {e}. Falling back to single updates.")
            # If the batch update fails, update the rows one by one
            rowcount = 0
            for args in args_list:
                self.update_one(query, args)
                rowcount += 1
            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
        finally:
            if cursor:
                cursor.close()
            self._close_connection(conn)
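

# Minimal usage sketch, not part of the original module: the `user` table and its
# columns are hypothetical placeholders, and it assumes the YAML config above points
# at a reachable MySQL instance with valid credentials.
if __name__ == "__main__":
    db = MySQLConnectionPool(mincached=2, maxcached=5, maxconnections=10)

    # Parameterized insert of a single (hypothetical) row
    db.insert_one(
        "INSERT INTO user (name, age) VALUES (%s, %s)",
        ("alice", 30),
    )

    # Batch insert; falls back to row-by-row inserts if executemany() fails
    db.insert_all(
        "INSERT INTO user (name, age) VALUES (%s, %s)",
        [("bob", 25), ("carol", 27)],
    )

    # Queries return buffered results even though the cursor is closed internally
    row = db.select_one("SELECT name, age FROM user WHERE name = %s", ("alice",))
    rows = db.select_all("SELECT name, age FROM user")
    logger.info(f"one row: {row}, all rows: {rows}")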