mysq_pool.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.8.10
  4. # Date: 2024-08-05 19:42
  5. import pymysql
  6. import YamlLoader
  7. from loguru import logger
  8. from retrying import retry
  9. from dbutils.pooled_db import PooledDB
  10. # 获取yaml配置
  11. yaml = YamlLoader.readYaml()
  12. mysqlYaml = yaml.get("mysql")
  13. sql_host = mysqlYaml.getValueAsString("host")
  14. sql_port = mysqlYaml.getValueAsInt("port")
  15. sql_user = mysqlYaml.getValueAsString("username")
  16. sql_password = mysqlYaml.getValueAsString("password")
  17. sql_db = mysqlYaml.getValueAsString("db")
  18. class MySQLConnectionPool:
  19. """
  20. MySQL连接池
  21. """
  22. def __init__(self, mincached=1, maxcached=3, maxconnections=5, log=None):
  23. """
  24. 初始化连接池
  25. :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
  26. :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
  27. :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
  28. """
  29. # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
  30. self.log = log or logger
  31. self.pool = PooledDB(
  32. creator=pymysql,
  33. mincached=mincached,
  34. maxcached=maxcached,
  35. maxconnections=maxconnections,
  36. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  37. host=sql_host,
  38. port=sql_port,
  39. user=sql_user,
  40. password=sql_password,
  41. database=sql_db
  42. )
  43. @retry(stop_max_attempt_number=100, wait_fixed=600000)
  44. def _get_connection(self):
  45. """
  46. 获取连接
  47. :return: 连接
  48. """
  49. try:
  50. return self.pool.connection()
  51. except Exception as e:
  52. self.log.error(f"Failed to get connection from pool: {e}, wait 10 mins retry")
  53. raise e
  54. @staticmethod
  55. def _close_connection(conn):
  56. """
  57. 关闭连接
  58. :param conn: 连接
  59. """
  60. if conn:
  61. conn.close()
  62. @retry(stop_max_attempt_number=5, wait_fixed=1000)
  63. def _execute(self, query, args=None, commit=False):
  64. """
  65. 执行SQL
  66. :param query: SQL语句
  67. :param args: SQL参数
  68. :param commit: 是否提交事务
  69. :return: 查询结果
  70. """
  71. conn = None
  72. cursor = None
  73. try:
  74. conn = self._get_connection()
  75. cursor = conn.cursor()
  76. cursor.execute(query, args)
  77. if commit:
  78. conn.commit()
  79. self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}")
  80. return cursor
  81. except Exception as e:
  82. if conn and not commit:
  83. conn.rollback()
  84. self.log.error(f"Error executing query: {e}")
  85. raise e
  86. finally:
  87. if cursor:
  88. cursor.close()
  89. self._close_connection(conn)
  90. def select_one(self, query, args=None):
  91. """
  92. 执行查询,返回单个结果
  93. :param query: 查询语句
  94. :param args: 查询参数
  95. :return: 查询结果
  96. """
  97. cursor = self._execute(query, args)
  98. return cursor.fetchone()
  99. def select_all(self, query, args=None):
  100. """
  101. 执行查询,返回所有结果
  102. :param query: 查询语句
  103. :param args: 查询参数
  104. :return: 查询结果
  105. """
  106. cursor = self._execute(query, args)
  107. return cursor.fetchall()
  108. def insert_one(self, query, args):
  109. """
  110. 执行单条插入语句
  111. :param query: 插入语句
  112. :param args: 插入参数
  113. """
  114. # self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  115. return self._execute(query, args, commit=True)
  116. def insert_all(self, query, args_list):
  117. """
  118. 执行批量插入语句,如果失败则逐条插入
  119. :param query: 插入语句
  120. :param args_list: 插入参数列表
  121. """
  122. conn = None
  123. cursor = None
  124. try:
  125. conn = self._get_connection()
  126. cursor = conn.cursor()
  127. cursor.executemany(query, args_list)
  128. conn.commit()
  129. self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}")
  130. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  131. except Exception as e:
  132. conn.rollback()
  133. self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
  134. # 如果批量插入失败,则逐条插入
  135. rowcount = 0
  136. for args in args_list:
  137. self.insert_one(query, args)
  138. rowcount += 1
  139. self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
  140. finally:
  141. cursor.close()
  142. self._close_connection(conn)
  143. def update_one(self, query, args):
  144. """
  145. 执行单条更新语句
  146. :param query: 更新语句
  147. :param args: 更新参数
  148. """
  149. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  150. return self._execute(query, args, commit=True)
  151. def update_all(self, query, args_list):
  152. """
  153. 执行批量更新语句,如果失败则逐条更新
  154. :param query: 更新语句
  155. :param args_list: 更新参数列表
  156. """
  157. conn = None
  158. cursor = None
  159. try:
  160. conn = self._get_connection()
  161. cursor = conn.cursor()
  162. cursor.executemany(query, args_list)
  163. conn.commit()
  164. self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}")
  165. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  166. except Exception as e:
  167. conn.rollback()
  168. self.log.error(f"Error executing query: {e}")
  169. # 如果批量更新失败,则逐条更新
  170. rowcount = 0
  171. for args in args_list:
  172. self.update_one(query, args)
  173. rowcount += 1
  174. self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
  175. finally:
  176. cursor.close()
  177. self._close_connection(conn)