mysql_pool.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/25 14:14
  5. import re
  6. import pymysql
  7. import YamlLoader
  8. from loguru import logger
  9. from dbutils.pooled_db import PooledDB
  10. # 获取yaml配置
  11. yaml = YamlLoader.readYaml()
  12. mysqlYaml = yaml.get("mysql")
  13. sql_host = mysqlYaml.getValueAsString("host")
  14. sql_port = mysqlYaml.getValueAsInt("port")
  15. sql_user = mysqlYaml.getValueAsString("username")
  16. sql_password = mysqlYaml.getValueAsString("password")
  17. sql_db = mysqlYaml.getValueAsString("db")
  18. class MySQLConnectionPool:
  19. """
  20. MySQL连接池
  21. """
  22. def __init__(self, mincached=1, maxcached=2, maxconnections=3, log=None):
  23. """
  24. 初始化连接池
  25. :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
  26. :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
  27. :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
  28. :param log: 自定义日志记录器
  29. """
  30. # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
  31. self.log = log or logger
  32. self.pool = PooledDB(
  33. creator=pymysql,
  34. mincached=mincached,
  35. maxcached=maxcached,
  36. maxconnections=maxconnections,
  37. blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  38. host=sql_host,
  39. port=sql_port,
  40. user=sql_user,
  41. password=sql_password,
  42. database=sql_db,
  43. ping=2, # 每次执行前检查连接有效性,防止使用已断开的连接
  44. connect_timeout=5, # 连接超时时间(秒)
  45. # read_timeout=30, # 读取超时时间(秒)
  46. write_timeout=30 # 写入超时时间(秒)
  47. )
  48. # def _execute(self, query, args=None, commit=False):
  49. # """
  50. # 执行SQL
  51. # :param query: SQL语句
  52. # :param args: SQL参数
  53. # :param commit: 是否提交事务
  54. # :return: 查询结果
  55. # """
  56. # try:
  57. # with self.pool.connection() as conn:
  58. # with conn.cursor() as cursor:
  59. # cursor.execute(query, args)
  60. # if commit:
  61. # conn.commit()
  62. # self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
  63. # return cursor
  64. # except Exception as e:
  65. # if commit and conn:
  66. # conn.rollback()
  67. # self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
  68. # raise e
  69. def _execute(self, query, args=None, commit=False):
  70. """
  71. 执行SQL(带断连重试)
  72. :param query: SQL语句
  73. :param args: SQL参数
  74. :param commit: 是否提交事务
  75. :return: 查询结果
  76. """
  77. conn = None
  78. for attempt in range(2): # 最多重试1次
  79. try:
  80. with self.pool.connection() as conn:
  81. with conn.cursor() as cursor:
  82. cursor.execute(query, args)
  83. if commit:
  84. conn.commit()
  85. self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
  86. return cursor
  87. except pymysql.err.InterfaceError as e:
  88. # 连接已断开,重试一次
  89. if attempt == 0:
  90. self.log.warning(f"数据库连接断开,正在重试... Error: {e}")
  91. continue
  92. self.log.error(f"重试后仍失败: {e}, Query: {query}")
  93. raise e
  94. except pymysql.err.IntegrityError:
  95. # 完整性错误(如重复条目)交由上层处理,避免在此打印完整堆栈污染日志
  96. if commit and conn:
  97. try:
  98. conn.rollback()
  99. except Exception:
  100. pass
  101. raise
  102. except Exception as e:
  103. if commit and conn:
  104. try:
  105. conn.rollback()
  106. except Exception:
  107. pass
  108. self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
  109. raise e
  110. def select_one(self, query, args=None):
  111. """
  112. 执行查询,返回单个结果
  113. :param query: 查询语句
  114. :param args: 查询参数
  115. :return: 查询结果
  116. """
  117. cursor = self._execute(query, args)
  118. return cursor.fetchone()
  119. def select_all(self, query, args=None):
  120. """
  121. 执行查询,返回所有结果
  122. :param query: 查询语句
  123. :param args: 查询参数
  124. :return: 查询结果
  125. """
  126. cursor = self._execute(query, args)
  127. return cursor.fetchall()
  128. def insert_one(self, query, args):
  129. """
  130. 执行单条插入语句
  131. :param query: 插入语句
  132. :param args: 插入参数
  133. """
  134. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  135. cursor = self._execute(query, args, commit=True)
  136. return cursor.lastrowid # 返回插入的ID
  137. def insert_all(self, query, args_list):
  138. """
  139. 执行批量插入语句,如果失败则逐条插入
  140. :param query: 插入语句
  141. :param args_list: 插入参数列表
  142. """
  143. conn = None
  144. cursor = None
  145. try:
  146. conn = self.pool.connection()
  147. cursor = conn.cursor()
  148. cursor.executemany(query, args_list)
  149. conn.commit()
  150. self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
  151. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  152. except pymysql.err.IntegrityError as e:
  153. if "Duplicate entry" in str(e):
  154. conn.rollback()
  155. self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
  156. rowcount = 0
  157. for args in args_list:
  158. try:
  159. self.insert_one(query, args)
  160. rowcount += 1
  161. except pymysql.err.IntegrityError as e2:
  162. if "Duplicate entry" in str(e2):
  163. self.log.debug(f"跳过重复条目: {e2}")
  164. else:
  165. self.log.error(f"插入失败: {e2}")
  166. except Exception as e2:
  167. self.log.error(f"插入失败: {e2}")
  168. self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
  169. else:
  170. conn.rollback()
  171. self.log.exception(f"数据库完整性错误: {e}")
  172. raise e
  173. except Exception as e:
  174. conn.rollback()
  175. self.log.exception(f"批量插入失败: {e}")
  176. raise e
  177. finally:
  178. if cursor:
  179. cursor.close()
  180. if conn:
  181. conn.close()
  182. def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
  183. """
  184. 单条插入(支持字典或原始SQL)
  185. :param table: 表名(字典插入时必需)
  186. :param data: 字典数据 {列名: 值}
  187. :param query: 直接SQL语句(与data二选一)
  188. :param args: SQL参数(query使用时必需)
  189. :param commit: 是否自动提交
  190. :param ignore: 是否使用ignore
  191. :return: 最后插入ID
  192. """
  193. if data is not None:
  194. if not isinstance(data, dict):
  195. raise ValueError("Data must be a dictionary")
  196. keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
  197. values = ', '.join(['%s'] * len(data))
  198. # 构建 INSERT IGNORE 语句
  199. ignore_clause = "IGNORE" if ignore else ""
  200. query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  201. args = tuple(data.values())
  202. elif query is None:
  203. raise ValueError("Either data or query must be provided")
  204. try:
  205. cursor = self._execute(query, args, commit)
  206. self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
  207. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  208. return cursor.lastrowid
  209. except pymysql.err.IntegrityError as e:
  210. if "Duplicate entry" in str(e):
  211. # 重复条目用 warning 简短输出,不打印堆栈
  212. self.log.warning(f"插入跳过-重复条目 Table: {table}, {e.args[1] if len(e.args) > 1 else e}")
  213. return -1 # 返回 -1 表示重复条目被跳过
  214. else:
  215. self.log.error(f"数据库完整性错误 Table: {table}, Error: {e}")
  216. raise
  217. except Exception as e:
  218. self.log.error(f"insert_one_or_dict 失败 Table: {table}, Error: {e}")
  219. raise
  220. def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
  221. ignore=False):
  222. """
  223. 批量插入(支持字典列表或原始SQL)
  224. :param table: 表名(字典插入时必需)
  225. :param data_list: 字典列表 [{列名: 值}]
  226. :param query: 直接SQL语句(与data_list二选一)
  227. :param args_list: SQL参数列表(query使用时必需)
  228. :param batch_size: 分批大小
  229. :param commit: 是否自动提交
  230. :param ignore: 是否使用ignore
  231. :return: 影响行数
  232. """
  233. if data_list is not None:
  234. if not data_list or not isinstance(data_list[0], dict):
  235. raise ValueError("Data_list must be a non-empty list of dictionaries")
  236. keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
  237. values = ', '.join(['%s'] * len(data_list[0]))
  238. # 构建 INSERT IGNORE 语句
  239. ignore_clause = "IGNORE" if ignore else ""
  240. query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  241. args_list = [tuple(d.values()) for d in data_list]
  242. elif query is None:
  243. raise ValueError("Either data_list or query must be provided")
  244. total = 0
  245. for i in range(0, len(args_list), batch_size):
  246. batch = args_list[i:i + batch_size]
  247. try:
  248. with self.pool.connection() as conn:
  249. with conn.cursor() as cursor:
  250. cursor.executemany(query, batch)
  251. if commit:
  252. conn.commit()
  253. total += cursor.rowcount
  254. except pymysql.err.IntegrityError as e:
  255. # 处理唯一索引冲突
  256. if "Duplicate entry" in str(e):
  257. if ignore:
  258. # 如果使用了 INSERT IGNORE,理论上不会进这里,但以防万一
  259. self.log.warning(f"批量插入遇到重复条目(ignore模式): {e}")
  260. else:
  261. # 没有使用 IGNORE,降级为逐条插入
  262. self.log.warning(f"批量插入遇到重复条目,开始逐条插入。错误: {e}")
  263. if commit:
  264. conn.rollback()
  265. rowcount = 0
  266. for j, args in enumerate(batch):
  267. try:
  268. if data_list:
  269. # 字典模式
  270. self.insert_one_or_dict(
  271. table=table,
  272. data=dict(zip(data_list[0].keys(), args)),
  273. commit=commit,
  274. ignore=False # 单条插入时手动捕获重复
  275. )
  276. else:
  277. # 原始SQL模式
  278. self.insert_one(query, args)
  279. rowcount += 1
  280. except pymysql.err.IntegrityError as e2:
  281. if "Duplicate entry" in str(e2):
  282. self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
  283. else:
  284. self.log.error(f"插入失败[{i+j+1}]: {e2}")
  285. except Exception as e2:
  286. self.log.error(f"插入失败[{i+j+1}]: {e2}")
  287. total += rowcount
  288. self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
  289. else:
  290. # 其他完整性错误
  291. self.log.exception(f"数据库完整性错误: {e}")
  292. if commit:
  293. conn.rollback()
  294. raise e
  295. except Exception as e:
  296. # 其他数据库错误
  297. self.log.exception(f"批量插入失败: {e}")
  298. if commit:
  299. conn.rollback()
  300. raise e
  301. if table:
  302. self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
  303. else:
  304. self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
  305. return total
  306. def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
  307. ignore=False):
  308. """
  309. 批量插入(支持字典列表或原始SQL) - 备用方法
  310. :param table: 表名(字典插入时必需)
  311. :param data_list: 字典列表 [{列名: 值}]
  312. :param query: 直接SQL语句(与data_list二选一)
  313. :param args_list: SQL参数列表(query使用时必需)
  314. :param batch_size: 分批大小
  315. :param commit: 是否自动提交
  316. :param ignore: 是否使用INSERT IGNORE
  317. :return: 影响行数
  318. """
  319. if data_list is not None:
  320. if not data_list or not isinstance(data_list[0], dict):
  321. raise ValueError("Data_list must be a non-empty list of dictionaries")
  322. keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
  323. values = ', '.join(['%s'] * len(data_list[0]))
  324. ignore_clause = "IGNORE" if ignore else ""
  325. query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
  326. args_list = [tuple(d.values()) for d in data_list]
  327. elif query is None:
  328. raise ValueError("Either data_list or query must be provided")
  329. total = 0
  330. for i in range(0, len(args_list), batch_size):
  331. batch = args_list[i:i + batch_size]
  332. try:
  333. with self.pool.connection() as conn:
  334. with conn.cursor() as cursor:
  335. cursor.executemany(query, batch)
  336. if commit:
  337. conn.commit()
  338. total += cursor.rowcount
  339. except pymysql.err.IntegrityError as e:
  340. if "Duplicate entry" in str(e) and not ignore:
  341. self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
  342. if commit:
  343. conn.rollback()
  344. rowcount = 0
  345. for args in batch:
  346. try:
  347. self.insert_one(query, args)
  348. rowcount += 1
  349. except pymysql.err.IntegrityError as e2:
  350. if "Duplicate entry" in str(e2):
  351. self.log.debug(f"跳过重复条目: {e2}")
  352. else:
  353. self.log.error(f"插入失败: {e2}")
  354. except Exception as e2:
  355. self.log.error(f"插入失败: {e2}")
  356. total += rowcount
  357. else:
  358. self.log.exception(f"数据库完整性错误: {e}")
  359. if commit:
  360. conn.rollback()
  361. raise e
  362. except Exception as e:
  363. self.log.exception(f"批量插入失败: {e}")
  364. if commit:
  365. conn.rollback()
  366. raise e
  367. self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
  368. return total
  369. def insert_too_many(self, query, args_list, batch_size=1000):
  370. """
  371. 执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
  372. :param query: 插入语句
  373. :param args_list: 插入参数列表
  374. :param batch_size: 每次插入的条数
  375. """
  376. self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
  377. for i in range(0, len(args_list), batch_size):
  378. batch = args_list[i:i + batch_size]
  379. try:
  380. with self.pool.connection() as conn:
  381. with conn.cursor() as cursor:
  382. cursor.executemany(query, batch)
  383. conn.commit()
  384. self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
  385. except Exception as e:
  386. self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
  387. # 当前批次降级为单条插入
  388. for args in batch:
  389. self.insert_one(query, args)
  390. def update_one(self, query, args):
  391. """
  392. 执行单条更新语句
  393. :param query: 更新语句
  394. :param args: 更新参数
  395. """
  396. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  397. return self._execute(query, args, commit=True)
  398. def update_all(self, query, args_list):
  399. """
  400. 执行批量更新语句,如果失败则逐条更新
  401. :param query: 更新语句
  402. :param args_list: 更新参数列表
  403. """
  404. conn = None
  405. cursor = None
  406. try:
  407. conn = self.pool.connection()
  408. cursor = conn.cursor()
  409. cursor.executemany(query, args_list)
  410. conn.commit()
  411. self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
  412. self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
  413. except Exception as e:
  414. conn.rollback()
  415. self.log.error(f"Error executing query: {e}")
  416. # 如果批量更新失败,则逐条更新
  417. rowcount = 0
  418. for args in args_list:
  419. self.update_one(query, args)
  420. rowcount += 1
  421. self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
  422. finally:
  423. if cursor:
  424. cursor.close()
  425. if conn:
  426. conn.close()
  427. def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
  428. """
  429. 单条更新(支持字典或原始SQL)
  430. :param table: 表名(字典模式必需)
  431. :param data: 字典数据 {列名: 值}(与 query 二选一)
  432. :param condition: 更新条件,支持以下格式:
  433. - 字典: {"id": 1} → "WHERE id = %s"
  434. - 字符串: "id = 1" → "WHERE id = 1"(需自行确保安全)
  435. - 元组: ("id = %s", [1]) → "WHERE id = %s"(参数化查询)
  436. :param query: 直接SQL语句(与 data 二选一)
  437. :param args: SQL参数(query 模式下必需)
  438. :param commit: 是否自动提交
  439. :return: 影响行数
  440. :raises: ValueError 参数校验失败时抛出
  441. """
  442. # 参数校验
  443. if data is not None:
  444. if not isinstance(data, dict):
  445. raise ValueError("Data must be a dictionary")
  446. if table is None:
  447. raise ValueError("Table name is required for dictionary update")
  448. if condition is None:
  449. raise ValueError("Condition is required for dictionary update")
  450. # 构建 SET 子句
  451. set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
  452. set_values = list(data.values())
  453. # 解析条件
  454. condition_clause, condition_args = self._parse_condition(condition)
  455. query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
  456. args = set_values + condition_args
  457. elif query is None:
  458. raise ValueError("Either data or query must be provided")
  459. # 执行更新
  460. cursor = self._execute(query, args, commit)
  461. # self.log.debug(
  462. # f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
  463. # extra={"table": table, "rows": cursor.rowcount}
  464. # )
  465. return cursor.rowcount
  466. def _parse_condition(self, condition):
  467. """
  468. 解析条件为 (clause, args) 格式
  469. :param condition: 字典/字符串/元组
  470. :return: (str, list) SQL 子句和参数列表
  471. """
  472. if isinstance(condition, dict):
  473. clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
  474. args = list(condition.values())
  475. elif isinstance(condition, str):
  476. clause = condition # 注意:需调用方确保安全
  477. args = []
  478. elif isinstance(condition, (tuple, list)) and len(condition) == 2:
  479. clause, args = condition[0], condition[1]
  480. if not isinstance(args, (list, tuple)):
  481. args = [args]
  482. else:
  483. raise ValueError("Condition must be dict/str/(clause, args)")
  484. return clause, args
  485. def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
  486. commit=True):
  487. """
  488. 批量更新(支持字典列表或原始SQL)
  489. :param table: 表名(字典插入时必需)
  490. :param data_list: 字典列表 [{列名: 值}]
  491. :param condition_list: 条件列表(必须为字典,与data_list等长)
  492. :param query: 直接SQL语句(与data_list二选一)
  493. :param args_list: SQL参数列表(query使用时必需)
  494. :param batch_size: 分批大小
  495. :param commit: 是否自动提交
  496. :return: 影响行数
  497. """
  498. if data_list is not None:
  499. if not data_list or not isinstance(data_list[0], dict):
  500. raise ValueError("Data_list must be a non-empty list of dictionaries")
  501. if condition_list is None or len(data_list) != len(condition_list):
  502. raise ValueError("Condition_list must be provided and match the length of data_list")
  503. if not all(isinstance(cond, dict) for cond in condition_list):
  504. raise ValueError("All elements in condition_list must be dictionaries")
  505. # 获取第一个数据项和条件项的键
  506. first_data_keys = set(data_list[0].keys())
  507. first_cond_keys = set(condition_list[0].keys())
  508. # 构造基础SQL
  509. set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
  510. condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
  511. base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
  512. total = 0
  513. # 分批次处理
  514. for i in range(0, len(data_list), batch_size):
  515. batch_data = data_list[i:i + batch_size]
  516. batch_conds = condition_list[i:i + batch_size]
  517. batch_args = []
  518. # 检查当前批次的结构是否一致
  519. can_batch = True
  520. for data, cond in zip(batch_data, batch_conds):
  521. data_keys = set(data.keys())
  522. cond_keys = set(cond.keys())
  523. if data_keys != first_data_keys or cond_keys != first_cond_keys:
  524. can_batch = False
  525. break
  526. batch_args.append(tuple(data.values()) + tuple(cond.values()))
  527. if not can_batch:
  528. # 结构不一致,转为单条更新
  529. for data, cond in zip(batch_data, batch_conds):
  530. self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
  531. total += 1
  532. continue
  533. # 执行批量更新
  534. try:
  535. with self.pool.connection() as conn:
  536. with conn.cursor() as cursor:
  537. cursor.executemany(base_query, batch_args)
  538. if commit:
  539. conn.commit()
  540. total += cursor.rowcount
  541. self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
  542. except Exception as e:
  543. if commit:
  544. conn.rollback()
  545. self.log.error(f"Batch update failed: {e}")
  546. # 降级为单条更新
  547. for args, data, cond in zip(batch_args, batch_data, batch_conds):
  548. try:
  549. self._execute(base_query, args, commit=commit)
  550. total += 1
  551. except Exception as e2:
  552. self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
  553. self.log.info(f"Total updated rows: {total}")
  554. return total
  555. elif query is not None:
  556. # 处理原始SQL和参数列表
  557. if args_list is None:
  558. raise ValueError("args_list must be provided when using query")
  559. total = 0
  560. for i in range(0, len(args_list), batch_size):
  561. batch_args = args_list[i:i + batch_size]
  562. try:
  563. with self.pool.connection() as conn:
  564. with conn.cursor() as cursor:
  565. cursor.executemany(query, batch_args)
  566. if commit:
  567. conn.commit()
  568. total += cursor.rowcount
  569. self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
  570. except Exception as e:
  571. if commit:
  572. conn.rollback()
  573. self.log.error(f"Batch update failed: {e}")
  574. # 降级为单条更新
  575. for args in batch_args:
  576. try:
  577. self._execute(query, args, commit=commit)
  578. total += 1
  579. except Exception as e2:
  580. self.log.error(f"Single update failed: {e2}, Args: {args}")
  581. self.log.info(f"Total updated rows: {total}")
  582. return total
  583. else:
  584. raise ValueError("Either data_list or query must be provided")
  585. def check_pool_health(self):
  586. """
  587. 检查连接池中有效连接数
  588. # 使用示例
  589. # 配置 MySQL 连接池
  590. sql_pool = MySQLConnectionPool(log=log)
  591. if not sql_pool.check_pool_health():
  592. log.error("数据库连接池异常")
  593. raise RuntimeError("数据库连接池异常")
  594. """
  595. try:
  596. with self.pool.connection() as conn:
  597. conn.ping(reconnect=True)
  598. return True
  599. except Exception as e:
  600. self.log.error(f"Connection pool health check failed: {e}")
  601. return False
  602. def close(self):
  603. """
  604. 关闭连接池,释放所有连接
  605. """
  606. try:
  607. if hasattr(self, 'pool') and self.pool:
  608. self.pool.close()
  609. self.log.info("数据库连接池已关闭")
  610. except Exception as e:
  611. self.log.error(f"关闭连接池失败: {e}")
  612. @staticmethod
  613. def _safe_identifier(name):
  614. """SQL标识符安全校验"""
  615. if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
  616. raise ValueError(f"Invalid SQL identifier: {name}")
  617. return name
  618. if __name__ == '__main__':
  619. sql_pool = MySQLConnectionPool()
  620. data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
  621. 'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
  622. 'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
  623. 'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
  624. 'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
  625. 'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
  626. 'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
  627. sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)