mysql_pool.py 11 KB


  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/25 14:14
  5. import re
  6. import pymysql
  7. import YamlLoader
  8. from loguru import logger
  9. from dbutils.pooled_db import PooledDB
  10. # 获取yaml配置
  11. yaml = YamlLoader.readYaml()
  12. mysqlYaml = yaml.get("mysql")
  13. sql_host = mysqlYaml.getValueAsString("host")
  14. sql_port = mysqlYaml.getValueAsInt("port")
  15. sql_user = mysqlYaml.getValueAsString("username")
  16. sql_password = mysqlYaml.getValueAsString("password")
  17. sql_db = mysqlYaml.getValueAsString("db")
  18. class MySQLConnectionPool:
  19. """
  20. MySQL连接池
  21. """
  22. def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
  23. """
  24. 初始化连接池
  25. :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
  26. :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
  27. :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
  28. :param log: 自定义日志记录器
  29. """
  30. # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
  31. self.log = log or logger
  32. self.pool = PooledDB(
  33. creator=pymysql,
  34. mincached=mincached,
  35. maxcached=maxcached,
  36. maxconnections=maxconnections,
  37. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  38. host=sql_host,
  39. port=sql_port,
  40. user=sql_user,
  41. password=sql_password,
  42. database=sql_db,
  43. ping=0 # 每次连接使用时自动检查有效性(0=不检查,1=执行query前检查,2=每次执行前检查)
  44. )
  45. def _execute(self, query, args=None, commit=False):
  46. """
  47. 执行SQL
  48. :param query: SQL语句
  49. :param args: SQL参数
  50. :param commit: 是否提交事务
  51. :return: 查询结果
  52. """
  53. try:
  54. with self.pool.connection() as conn:
  55. with conn.cursor() as cursor:
  56. cursor.execute(query, args)
  57. if commit:
  58. conn.commit()
  59. self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
  60. return cursor
  61. except Exception as e:
  62. if commit:
  63. conn.rollback()
  64. self.log.error(f"Error executing query: {e}, Query: {query}, Args: {args}")
  65. raise e
  66. def select_one(self, query, args=None):
  67. """
  68. 执行查询,返回单个结果
  69. :param query: 查询语句
  70. :param args: 查询参数
  71. :return: 查询结果
  72. """
  73. cursor = self._execute(query, args)
  74. return cursor.fetchone()
  75. def select_all(self, query, args=None):
  76. """
  77. 执行查询,返回所有结果
  78. :param query: 查询语句
  79. :param args: 查询参数
  80. :return: 查询结果
  81. """
  82. cursor = self._execute(query, args)
  83. return cursor.fetchall()
  84. def insert_one(self, query, args):
  85. """
  86. 执行单条插入语句
  87. :param query: 插入语句
  88. :param args: 插入参数
  89. """
  90. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  91. cursor = self._execute(query, args, commit=True)
  92. return cursor.lastrowid # 返回插入的ID
  93. def insert_all(self, query, args_list):
  94. """
  95. 执行批量插入语句,如果失败则逐条插入
  96. :param query: 插入语句
  97. :param args_list: 插入参数列表
  98. """
  99. conn = None
  100. cursor = None
  101. try:
  102. conn = self.pool.connection()
  103. cursor = conn.cursor()
  104. cursor.executemany(query, args_list)
  105. conn.commit()
  106. self.log.debug(f"sql insert_all, SQL: {query}, Rows: {len(args_list)}")
  107. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  108. except Exception as e:
  109. conn.rollback()
  110. self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
  111. # 如果批量插入失败,则逐条插入
  112. rowcount = 0
  113. for args in args_list:
  114. self.insert_one(query, args)
  115. rowcount += 1
  116. self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
  117. finally:
  118. if cursor:
  119. cursor.close()
  120. if conn:
  121. conn.close()
  122. def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True):
  123. """
  124. 单条插入(支持字典或原始SQL)
  125. :param table: 表名(字典插入时必需)
  126. :param data: 字典数据 {列名: 值}
  127. :param query: 直接SQL语句(与data二选一)
  128. :param args: SQL参数(query使用时必需)
  129. :param commit: 是否自动提交
  130. :return: 最后插入ID
  131. """
  132. if data is not None:
  133. if not isinstance(data, dict):
  134. raise ValueError("Data must be a dictionary")
  135. keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
  136. values = ', '.join(['%s'] * len(data))
  137. query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  138. args = tuple(data.values())
  139. elif query is None:
  140. raise ValueError("Either data or query must be provided")
  141. cursor = self._execute(query, args, commit)
  142. self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
  143. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  144. return cursor.lastrowid
  145. def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=500, commit=True):
  146. """
  147. 批量插入(支持字典列表或原始SQL)
  148. :param table: 表名(字典插入时必需)
  149. :param data_list: 字典列表 [{列名: 值}]
  150. :param query: 直接SQL语句(与data_list二选一)
  151. :param args_list: SQL参数列表(query使用时必需)
  152. :param batch_size: 分批大小
  153. :param commit: 是否自动提交
  154. :return: 影响行数
  155. """
  156. if data_list is not None:
  157. if not data_list or not isinstance(data_list[0], dict):
  158. raise ValueError("Data_list must be a non-empty list of dictionaries")
  159. keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
  160. values = ', '.join(['%s'] * len(data_list[0]))
  161. query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  162. args_list = [tuple(d.values()) for d in data_list]
  163. elif query is None:
  164. raise ValueError("Either data_list or query must be provided")
  165. total = 0
  166. for i in range(0, len(args_list), batch_size):
  167. batch = args_list[i:i + batch_size]
  168. try:
  169. with self.pool.connection() as conn:
  170. with conn.cursor() as cursor:
  171. cursor.executemany(query, batch)
  172. if commit:
  173. conn.commit()
  174. total += cursor.rowcount
  175. # self.log.debug(f"sql insert_many_or_dict, SQL: {query}, Rows: {cursor.rowcount}")
  176. except Exception as e:
  177. if commit:
  178. conn.rollback()
  179. self.log.error(f"Batch insert failed: {e}")
  180. # 降级为单条插入
  181. for args in batch:
  182. try:
  183. self.insert_one_or_dict(table=None, query=query, args=args, commit=commit)
  184. total += 1
  185. except Exception as e2:
  186. self.log.error(f"Single insert failed: {e2}")
  187. continue
  188. self.log.info(f"sql insert_many_or_dict, Table: {table}, Total Rows: {total}")
  189. return total
  190. def insert_too_many(self, query, args_list, batch_size=1000):
  191. """
  192. 执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
  193. :param query: 插入语句
  194. :param args_list: 插入参数列表
  195. :param batch_size: 每次插入的条数
  196. """
  197. for i in range(0, len(args_list), batch_size):
  198. batch = args_list[i:i + batch_size]
  199. try:
  200. with self.pool.connection() as conn:
  201. with conn.cursor() as cursor:
  202. cursor.executemany(query, batch)
  203. conn.commit()
  204. except Exception as e:
  205. self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
  206. # 当前批次降级为单条插入
  207. for args in batch:
  208. self.insert_one(query, args)
  209. def update_one(self, query, args):
  210. """
  211. 执行单条更新语句
  212. :param query: 更新语句
  213. :param args: 更新参数
  214. """
  215. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  216. return self._execute(query, args, commit=True)
  217. def update_all(self, query, args_list):
  218. """
  219. 执行批量更新语句,如果失败则逐条更新
  220. :param query: 更新语句
  221. :param args_list: 更新参数列表
  222. """
  223. conn = None
  224. cursor = None
  225. try:
  226. conn = self.pool.connection()
  227. cursor = conn.cursor()
  228. cursor.executemany(query, args_list)
  229. conn.commit()
  230. self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
  231. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  232. except Exception as e:
  233. conn.rollback()
  234. self.log.error(f"Error executing query: {e}")
  235. # 如果批量更新失败,则逐条更新
  236. rowcount = 0
  237. for args in args_list:
  238. self.update_one(query, args)
  239. rowcount += 1
  240. self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
  241. finally:
  242. if cursor:
  243. cursor.close()
  244. if conn:
  245. conn.close()
  246. def check_pool_health(self):
  247. """
  248. 检查连接池中有效连接数
  249. # 使用示例
  250. # 配置 MySQL 连接池
  251. sql_pool = MySQLConnectionPool(log=log)
  252. if not sql_pool.check_pool_health():
  253. log.error("数据库连接池异常")
  254. raise RuntimeError("数据库连接池异常")
  255. """
  256. try:
  257. with self.pool.connection() as conn:
  258. conn.ping(reconnect=True)
  259. return True
  260. except Exception as e:
  261. self.log.error(f"Connection pool health check failed: {e}")
  262. return False
  263. @staticmethod
  264. def _safe_identifier(name):
  265. """SQL标识符安全校验"""
  266. if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
  267. raise ValueError(f"Invalid SQL identifier: {name}")
  268. return name