mysql_pool.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/25 14:14
  5. import re
  6. import pymysql
  7. import YamlLoader
  8. from loguru import logger
  9. from dbutils.pooled_db import PooledDB
  10. # 获取yaml配置
  11. yaml = YamlLoader.readYaml()
  12. mysqlYaml = yaml.get("mysql")
  13. sql_host = mysqlYaml.getValueAsString("host")
  14. sql_port = mysqlYaml.getValueAsInt("port")
  15. sql_user = mysqlYaml.getValueAsString("username")
  16. sql_password = mysqlYaml.getValueAsString("password")
  17. sql_db = mysqlYaml.getValueAsString("db")
  18. class MySQLConnectionPool:
  19. """
  20. MySQL连接池
  21. """
  22. def __init__(self, mincached=4, maxcached=5, maxconnections=10, log=None):
  23. """
  24. 初始化连接池
  25. :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
  26. :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
  27. :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
  28. :param log: 自定义日志记录器
  29. """
  30. # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
  31. self.log = log or logger
  32. self.pool = PooledDB(
  33. creator=pymysql,
  34. mincached=mincached,
  35. maxcached=maxcached,
  36. maxconnections=maxconnections,
  37. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  38. host=sql_host,
  39. port=sql_port,
  40. user=sql_user,
  41. password=sql_password,
  42. database=sql_db,
  43. ping=0 # 每次连接使用时自动检查有效性(0=不检查,1=执行query前检查,2=每次执行前检查)
  44. )
  45. def _execute(self, query, args=None, commit=False):
  46. """
  47. 执行SQL
  48. :param query: SQL语句
  49. :param args: SQL参数
  50. :param commit: 是否提交事务
  51. :return: 查询结果
  52. """
  53. try:
  54. with self.pool.connection() as conn:
  55. with conn.cursor() as cursor:
  56. cursor.execute(query, args)
  57. if commit:
  58. conn.commit()
  59. self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
  60. return cursor
  61. except Exception as e:
  62. if commit:
  63. conn.rollback()
  64. self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
  65. raise e
  66. def select_one(self, query, args=None):
  67. """
  68. 执行查询,返回单个结果
  69. :param query: 查询语句
  70. :param args: 查询参数
  71. :return: 查询结果
  72. """
  73. cursor = self._execute(query, args)
  74. return cursor.fetchone()
  75. def select_all(self, query, args=None):
  76. """
  77. 执行查询,返回所有结果
  78. :param query: 查询语句
  79. :param args: 查询参数
  80. :return: 查询结果
  81. """
  82. cursor = self._execute(query, args)
  83. return cursor.fetchall()
  84. def insert_one(self, query, args):
  85. """
  86. 执行单条插入语句
  87. :param query: 插入语句
  88. :param args: 插入参数
  89. """
  90. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  91. cursor = self._execute(query, args, commit=True)
  92. return cursor.lastrowid # 返回插入的ID
  93. def insert_all(self, query, args_list):
  94. """
  95. 执行批量插入语句,如果失败则逐条插入
  96. :param query: 插入语句
  97. :param args_list: 插入参数列表
  98. """
  99. conn = None
  100. cursor = None
  101. try:
  102. conn = self.pool.connection()
  103. cursor = conn.cursor()
  104. cursor.executemany(query, args_list)
  105. conn.commit()
  106. self.log.debug(f"sql insert_all, SQL: {query}, Rows: {len(args_list)}")
  107. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  108. except Exception as e:
  109. conn.rollback()
  110. self.log.error(f"Batch insertion failed after 5 attempts. Trying single inserts. Error: {e}")
  111. # 如果批量插入失败,则逐条插入
  112. rowcount = 0
  113. for args in args_list:
  114. self.insert_one(query, args)
  115. rowcount += 1
  116. self.log.debug(f"Batch insertion failed. Inserted {rowcount} rows individually.")
  117. finally:
  118. if cursor:
  119. cursor.close()
  120. if conn:
  121. conn.close()
  122. def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
  123. """
  124. 单条插入(支持字典或原始SQL)
  125. :param table: 表名(字典插入时必需)
  126. :param data: 字典数据 {列名: 值}
  127. :param query: 直接SQL语句(与data二选一)
  128. :param args: SQL参数(query使用时必需)
  129. :param commit: 是否自动提交
  130. :param ignore: 是否使用ignore
  131. :return: 最后插入ID
  132. """
  133. if data is not None:
  134. if not isinstance(data, dict):
  135. raise ValueError("Data must be a dictionary")
  136. keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
  137. values = ', '.join(['%s'] * len(data))
  138. # query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  139. # 构建 INSERT IGNORE 语句
  140. ignore_clause = "IGNORE" if ignore else ""
  141. # insert_sql = f"INSERT {ignore_clause} INTO {table} ({columns}) VALUES ({placeholders})"
  142. query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  143. args = tuple(data.values())
  144. elif query is None:
  145. raise ValueError("Either data or query must be provided")
  146. # cursor = self._execute(query, args, commit)
  147. # self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
  148. # self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  149. # return cursor.lastrowid
  150. try:
  151. cursor = self._execute(query, args, commit)
  152. self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
  153. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  154. return cursor.lastrowid
  155. except pymysql.err.IntegrityError as e:
  156. if "Duplicate entry" in str(e):
  157. self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
  158. # print("插入失败:重复条目", e)
  159. return -1 # 返回 -1 表示重复条目被跳过
  160. else:
  161. self.log.exception(f"数据库完整性错误: {e}")
  162. # print("插入失败:完整性错误", e)
  163. raise
  164. except Exception as e:
  165. # self.log.error(f"未知错误: {str(e)}", exc_info=True)
  166. self.log.exception(f"未知错误: {e}") # 记录完整异常信息
  167. # print("插入失败:未知错误", e)
  168. raise
  169. def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
  170. ignore=False):
  171. """
  172. 批量插入(支持字典列表或原始SQL)
  173. :param table: 表名(字典插入时必需)
  174. :param data_list: 字典列表 [{列名: 值}]
  175. :param query: 直接SQL语句(与data_list二选一)
  176. :param args_list: SQL参数列表(query使用时必需)
  177. :param batch_size: 分批大小
  178. :param commit: 是否自动提交
  179. :param ignore: 是否使用ignore
  180. :return: 影响行数
  181. """
  182. if data_list is not None:
  183. if not data_list or not isinstance(data_list[0], dict):
  184. raise ValueError("Data_list must be a non-empty list of dictionaries")
  185. keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
  186. values = ', '.join(['%s'] * len(data_list[0]))
  187. # 构建 INSERT IGNORE 语句
  188. ignore_clause = "IGNORE" if ignore else ""
  189. # insert_sql = f"INSERT {ignore_clause} INTO {table} ({columns}) VALUES ({placeholders})"
  190. query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  191. args_list = [tuple(d.values()) for d in data_list]
  192. elif query is None:
  193. raise ValueError("Either data_list or query must be provided")
  194. total = 0
  195. for i in range(0, len(args_list), batch_size):
  196. batch = args_list[i:i + batch_size]
  197. try:
  198. with self.pool.connection() as conn:
  199. with conn.cursor() as cursor:
  200. cursor.executemany(query, batch)
  201. if commit:
  202. conn.commit()
  203. total += cursor.rowcount
  204. except pymysql.Error as e:
  205. if "Duplicate entry" in str(e):
  206. # self.log.warning(f"检测到重复条目,开始逐条插入。错误详情: {e}")
  207. raise e
  208. # rowcount = 0
  209. # for args in batch:
  210. # try:
  211. # self.insert_one_or_dict(table=table, data=dict(zip(data_list[0].keys(), args)),
  212. # commit=commit)
  213. # rowcount += 1
  214. # except pymysql.err.IntegrityError as e2:
  215. # if "Duplicate entry" in str(e2):
  216. # self.log.warning(f"跳过重复条目: {args}")
  217. # else:
  218. # self.log.error(f"插入失败: {e2}, 参数: {args}")
  219. # total += rowcount
  220. else:
  221. self.log.exception(f"数据库错误: {e}")
  222. if commit:
  223. conn.rollback()
  224. raise e
  225. # 重新抛出异常,供外部捕获
  226. # 降级为单条插入
  227. # for args in batch:
  228. # try:
  229. # self.insert_one_or_dict(table=None, query=query, args=args, commit=commit)
  230. # total += 1
  231. # except Exception as e2:
  232. # self.log.error(f"Single insert failed: {e2}")
  233. # continue
  234. if table:
  235. self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
  236. else:
  237. self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
  238. return total
  239. def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True):
  240. """
  241. 批量插入(支持字典列表或原始SQL)
  242. :param table: 表名(字典插入时必需)
  243. :param data_list: 字典列表 [{列名: 值}]
  244. :param query: 直接SQL语句(与data_list二选一)
  245. :param args_list: SQL参数列表(query使用时必需)
  246. :param batch_size: 分批大小
  247. :param commit: 是否自动提交
  248. :return: 影响行数
  249. """
  250. if data_list is not None:
  251. if not data_list or not isinstance(data_list[0], dict):
  252. raise ValueError("Data_list must be a non-empty list of dictionaries")
  253. keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
  254. values = ', '.join(['%s'] * len(data_list[0]))
  255. query = f"INSERT INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  256. args_list = [tuple(d.values()) for d in data_list]
  257. elif query is None:
  258. raise ValueError("Either data_list or query must be provided")
  259. total = 0
  260. for i in range(0, len(args_list), batch_size):
  261. batch = args_list[i:i + batch_size]
  262. try:
  263. with self.pool.connection() as conn:
  264. with conn.cursor() as cursor:
  265. # 添加调试日志:输出 SQL 和参数示例
  266. # self.log.debug(f"Batch insert SQL: {query}")
  267. # self.log.debug(f"Sample args: {batch[0] if batch else 'None'}")
  268. cursor.executemany(query, batch)
  269. if commit:
  270. conn.commit()
  271. total += cursor.rowcount
  272. # self.log.debug(f"Batch insert succeeded. Rows: {cursor.rowcount}")
  273. except Exception as e: # 明确捕获数据库异常
  274. self.log.exception(f"Batch insert failed: {e}") # 使用 exception 记录堆栈
  275. self.log.error(f"Failed SQL: {query}, Args count: {len(batch)}")
  276. if commit:
  277. conn.rollback()
  278. # 降级为单条插入,并记录每个错误
  279. rowcount = 0
  280. for args in batch:
  281. try:
  282. self.insert_one(query, args)
  283. rowcount += 1
  284. except Exception as e2:
  285. self.log.error(f"Single insert failed: {e2}, Args: {args}")
  286. total += rowcount
  287. self.log.debug(f"Inserted {rowcount}/{len(batch)} rows individually.")
  288. self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
  289. return total
  290. def insert_too_many(self, query, args_list, batch_size=1000):
  291. """
  292. 执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
  293. :param query: 插入语句
  294. :param args_list: 插入参数列表
  295. :param batch_size: 每次插入的条数
  296. """
  297. self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
  298. for i in range(0, len(args_list), batch_size):
  299. batch = args_list[i:i + batch_size]
  300. try:
  301. with self.pool.connection() as conn:
  302. with conn.cursor() as cursor:
  303. cursor.executemany(query, batch)
  304. conn.commit()
  305. self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
  306. except Exception as e:
  307. self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
  308. # 当前批次降级为单条插入
  309. for args in batch:
  310. self.insert_one(query, args)
  311. def update_one(self, query, args):
  312. """
  313. 执行单条更新语句
  314. :param query: 更新语句
  315. :param args: 更新参数
  316. """
  317. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  318. return self._execute(query, args, commit=True)
  319. def update_all(self, query, args_list):
  320. """
  321. 执行批量更新语句,如果失败则逐条更新
  322. :param query: 更新语句
  323. :param args_list: 更新参数列表
  324. """
  325. conn = None
  326. cursor = None
  327. try:
  328. conn = self.pool.connection()
  329. cursor = conn.cursor()
  330. cursor.executemany(query, args_list)
  331. conn.commit()
  332. self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
  333. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  334. except Exception as e:
  335. conn.rollback()
  336. self.log.error(f"Error executing query: {e}")
  337. # 如果批量更新失败,则逐条更新
  338. rowcount = 0
  339. for args in args_list:
  340. self.update_one(query, args)
  341. rowcount += 1
  342. self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
  343. finally:
  344. if cursor:
  345. cursor.close()
  346. if conn:
  347. conn.close()
  348. def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
  349. """
  350. 单条更新(支持字典或原始SQL)
  351. :param table: 表名(字典模式必需)
  352. :param data: 字典数据 {列名: 值}(与 query 二选一)
  353. :param condition: 更新条件,支持以下格式:
  354. - 字典: {"id": 1} → "WHERE id = %s"
  355. - 字符串: "id = 1" → "WHERE id = 1"(需自行确保安全)
  356. - 元组: ("id = %s", [1]) → "WHERE id = %s"(参数化查询)
  357. :param query: 直接SQL语句(与 data 二选一)
  358. :param args: SQL参数(query 模式下必需)
  359. :param commit: 是否自动提交
  360. :return: 影响行数
  361. :raises: ValueError 参数校验失败时抛出
  362. """
  363. # 参数校验
  364. if data is not None:
  365. if not isinstance(data, dict):
  366. raise ValueError("Data must be a dictionary")
  367. if table is None:
  368. raise ValueError("Table name is required for dictionary update")
  369. if condition is None:
  370. raise ValueError("Condition is required for dictionary update")
  371. # 构建 SET 子句
  372. set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
  373. set_values = list(data.values())
  374. # 解析条件
  375. condition_clause, condition_args = self._parse_condition(condition)
  376. query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
  377. args = set_values + condition_args
  378. elif query is None:
  379. raise ValueError("Either data or query must be provided")
  380. # 执行更新
  381. cursor = self._execute(query, args, commit)
  382. # self.log.debug(
  383. # f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
  384. # extra={"table": table, "rows": cursor.rowcount}
  385. # )
  386. return cursor.rowcount
  387. def _parse_condition(self, condition):
  388. """
  389. 解析条件为 (clause, args) 格式
  390. :param condition: 字典/字符串/元组
  391. :return: (str, list) SQL 子句和参数列表
  392. """
  393. if isinstance(condition, dict):
  394. clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
  395. args = list(condition.values())
  396. elif isinstance(condition, str):
  397. clause = condition # 注意:需调用方确保安全
  398. args = []
  399. elif isinstance(condition, (tuple, list)) and len(condition) == 2:
  400. clause, args = condition[0], condition[1]
  401. if not isinstance(args, (list, tuple)):
  402. args = [args]
  403. else:
  404. raise ValueError("Condition must be dict/str/(clause, args)")
  405. return clause, args
  406. def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
  407. commit=True):
  408. """
  409. 批量更新(支持字典列表或原始SQL)
  410. :param table: 表名(字典插入时必需)
  411. :param data_list: 字典列表 [{列名: 值}]
  412. :param condition_list: 条件列表(必须为字典,与data_list等长)
  413. :param query: 直接SQL语句(与data_list二选一)
  414. :param args_list: SQL参数列表(query使用时必需)
  415. :param batch_size: 分批大小
  416. :param commit: 是否自动提交
  417. :return: 影响行数
  418. """
  419. if data_list is not None:
  420. if not data_list or not isinstance(data_list[0], dict):
  421. raise ValueError("Data_list must be a non-empty list of dictionaries")
  422. if condition_list is None or len(data_list) != len(condition_list):
  423. raise ValueError("Condition_list must be provided and match the length of data_list")
  424. if not all(isinstance(cond, dict) for cond in condition_list):
  425. raise ValueError("All elements in condition_list must be dictionaries")
  426. # 获取第一个数据项和条件项的键
  427. first_data_keys = set(data_list[0].keys())
  428. first_cond_keys = set(condition_list[0].keys())
  429. # 构造基础SQL
  430. set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
  431. condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
  432. base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
  433. total = 0
  434. # 分批次处理
  435. for i in range(0, len(data_list), batch_size):
  436. batch_data = data_list[i:i + batch_size]
  437. batch_conds = condition_list[i:i + batch_size]
  438. batch_args = []
  439. # 检查当前批次的结构是否一致
  440. can_batch = True
  441. for data, cond in zip(batch_data, batch_conds):
  442. data_keys = set(data.keys())
  443. cond_keys = set(cond.keys())
  444. if data_keys != first_data_keys or cond_keys != first_cond_keys:
  445. can_batch = False
  446. break
  447. batch_args.append(tuple(data.values()) + tuple(cond.values()))
  448. if not can_batch:
  449. # 结构不一致,转为单条更新
  450. for data, cond in zip(batch_data, batch_conds):
  451. self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
  452. total += 1
  453. continue
  454. # 执行批量更新
  455. try:
  456. with self.pool.connection() as conn:
  457. with conn.cursor() as cursor:
  458. cursor.executemany(base_query, batch_args)
  459. if commit:
  460. conn.commit()
  461. total += cursor.rowcount
  462. self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
  463. except Exception as e:
  464. if commit:
  465. conn.rollback()
  466. self.log.error(f"Batch update failed: {e}")
  467. # 降级为单条更新
  468. for args, data, cond in zip(batch_args, batch_data, batch_conds):
  469. try:
  470. self._execute(base_query, args, commit=commit)
  471. total += 1
  472. except Exception as e2:
  473. self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
  474. self.log.info(f"Total updated rows: {total}")
  475. return total
  476. elif query is not None:
  477. # 处理原始SQL和参数列表
  478. if args_list is None:
  479. raise ValueError("args_list must be provided when using query")
  480. total = 0
  481. for i in range(0, len(args_list), batch_size):
  482. batch_args = args_list[i:i + batch_size]
  483. try:
  484. with self.pool.connection() as conn:
  485. with conn.cursor() as cursor:
  486. cursor.executemany(query, batch_args)
  487. if commit:
  488. conn.commit()
  489. total += cursor.rowcount
  490. self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
  491. except Exception as e:
  492. if commit:
  493. conn.rollback()
  494. self.log.error(f"Batch update failed: {e}")
  495. # 降级为单条更新
  496. for args in batch_args:
  497. try:
  498. self._execute(query, args, commit=commit)
  499. total += 1
  500. except Exception as e2:
  501. self.log.error(f"Single update failed: {e2}, Args: {args}")
  502. self.log.info(f"Total updated rows: {total}")
  503. return total
  504. else:
  505. raise ValueError("Either data_list or query must be provided")
  506. def check_pool_health(self):
  507. """
  508. 检查连接池中有效连接数
  509. # 使用示例
  510. # 配置 MySQL 连接池
  511. sql_pool = MySQLConnectionPool(log=log)
  512. if not sql_pool.check_pool_health():
  513. log.error("数据库连接池异常")
  514. raise RuntimeError("数据库连接池异常")
  515. """
  516. try:
  517. with self.pool.connection() as conn:
  518. conn.ping(reconnect=True)
  519. return True
  520. except Exception as e:
  521. self.log.error(f"Connection pool health check failed: {e}")
  522. return False
  523. def close(self):
  524. """
  525. 关闭连接池,释放所有连接
  526. """
  527. try:
  528. if hasattr(self, 'pool') and self.pool:
  529. self.pool.close()
  530. self.log.info("数据库连接池已关闭")
  531. except Exception as e:
  532. self.log.error(f"关闭连接池失败: {e}")
  533. @staticmethod
  534. def _safe_identifier(name):
  535. """SQL标识符安全校验"""
  536. if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
  537. raise ValueError(f"Invalid SQL identifier: {name}")
  538. return name
  539. if __name__ == '__main__':
  540. sql_pool = MySQLConnectionPool()
  541. data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
  542. 'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
  543. 'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
  544. 'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
  545. 'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
  546. 'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
  547. 'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
  548. sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)