mysq_pool.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.8.10
  4. # Date: 2024-08-05 19:42
  5. import pymysql
  6. import YamlLoader
  7. from loguru import logger
  8. from retrying import retry
  9. from dbutils.pooled_db import PooledDB
  10. # 获取yaml配置
  11. yaml = YamlLoader.readYaml()
  12. mysqlYaml = yaml.get("mysql")
  13. sql_host = mysqlYaml.getValueAsString("host")
  14. sql_port = mysqlYaml.getValueAsInt("port")
  15. sql_user = mysqlYaml.getValueAsString("username")
  16. sql_password = mysqlYaml.getValueAsString("password")
  17. sql_db = mysqlYaml.getValueAsString("db")
  18. class MySQLConnectionPool:
  19. """
  20. MySQL连接池
  21. """
  22. def __init__(self, mincached=1, maxcached=2, maxconnections=3, log=None):
  23. """
  24. 初始化连接池
  25. :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
  26. :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
  27. :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
  28. """
  29. # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
  30. self.log = log or logger
  31. self.pool = PooledDB(
  32. creator=pymysql,
  33. mincached=mincached,
  34. maxcached=maxcached,
  35. maxconnections=maxconnections,
  36. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  37. host=sql_host,
  38. port=sql_port,
  39. user=sql_user,
  40. password=sql_password,
  41. database=sql_db,
  42. ping=2, # 每次执行前检查连接有效性,防止使用已断开的连接
  43. connect_timeout=5, # 连接超时时间(秒)
  44. # read_timeout=30, # 读取超时时间(秒)
  45. write_timeout=30 # 写入超时时间(秒)
  46. )
  47. @retry(stop_max_attempt_number=100, wait_fixed=600000)
  48. def _get_connection(self):
  49. """
  50. 获取连接
  51. :return: 连接
  52. """
  53. try:
  54. return self.pool.connection()
  55. except Exception as e:
  56. self.log.error(f"Failed to get connection from pool: {e}, wait 10 mins retry")
  57. raise e
  58. @staticmethod
  59. def _close_connection(conn):
  60. """
  61. 关闭连接
  62. :param conn: 连接
  63. """
  64. if conn:
  65. conn.close()
  66. @retry(stop_max_attempt_number=5, wait_fixed=1000)
  67. def _execute(self, query, args=None, commit=False):
  68. """
  69. 执行SQL
  70. :param query: SQL语句
  71. :param args: SQL参数
  72. :param commit: 是否提交事务
  73. :return: 查询结果
  74. """
  75. conn = None
  76. cursor = None
  77. try:
  78. conn = self._get_connection()
  79. cursor = conn.cursor()
  80. cursor.execute(query, args)
  81. if commit:
  82. conn.commit()
  83. self.log.debug(f"sql _execute , Query: {query}, Rows: {cursor.rowcount}")
  84. return cursor
  85. except Exception as e:
  86. if conn and not commit:
  87. conn.rollback()
  88. self.log.error(f"Error executing query: {e}")
  89. raise e
  90. finally:
  91. if cursor:
  92. cursor.close()
  93. self._close_connection(conn)
  94. def select_one(self, query, args=None):
  95. """
  96. 执行查询,返回单个结果
  97. :param query: 查询语句
  98. :param args: 查询参数
  99. :return: 查询结果
  100. """
  101. cursor = self._execute(query, args)
  102. return cursor.fetchone()
  103. def select_all(self, query, args=None):
  104. """
  105. 执行查询,返回所有结果
  106. :param query: 查询语句
  107. :param args: 查询参数
  108. :return: 查询结果
  109. """
  110. cursor = self._execute(query, args)
  111. return cursor.fetchall()
  112. def insert_one(self, query, args):
  113. """
  114. 执行单条插入语句
  115. :param query: 插入语句
  116. :param args: 插入参数
  117. """
  118. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  119. return self._execute(query, args, commit=True)
  120. def insert_all(self, query, args_list):
  121. """
  122. 执行批量插入语句,如果失败则逐条插入
  123. :param query: 插入语句
  124. :param args_list: 插入参数列表
  125. """
  126. conn = None
  127. cursor = None
  128. try:
  129. conn = self._get_connection()
  130. cursor = conn.cursor()
  131. cursor.executemany(query, args_list)
  132. conn.commit()
  133. self.log.debug(f"sql insert_all , SQL: {query}, Rows: {cursor.rowcount}")
  134. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  135. except Exception as e:
  136. conn.rollback()
  137. self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
  138. # 如果批量插入失败,则逐条插入
  139. rowcount = 0
  140. for args in args_list:
  141. self.insert_one(query, args)
  142. rowcount += 1
  143. self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
  144. finally:
  145. cursor.close()
  146. self._close_connection(conn)
  147. def update_one(self, query, args):
  148. """
  149. 执行单条更新语句
  150. :param query: 更新语句
  151. :param args: 更新参数
  152. """
  153. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  154. return self._execute(query, args, commit=True)
  155. def update_all(self, query, args_list):
  156. """
  157. 执行批量更新语句,如果失败则逐条更新
  158. :param query: 更新语句
  159. :param args_list: 更新参数列表
  160. """
  161. conn = None
  162. cursor = None
  163. try:
  164. conn = self._get_connection()
  165. cursor = conn.cursor()
  166. cursor.executemany(query, args_list)
  167. conn.commit()
  168. self.log.debug(f"sql update_all , SQL: {query}, Rows: {cursor.rowcount}")
  169. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  170. except Exception as e:
  171. conn.rollback()
  172. self.log.error(f"Error executing query: {e}")
  173. # 如果批量更新失败,则逐条更新
  174. rowcount = 0
  175. for args in args_list:
  176. self.update_one(query, args)
  177. rowcount += 1
  178. self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
  179. finally:
  180. cursor.close()
  181. self._close_connection(conn)