add_task.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/6/3 16:56
  5. from loguru import logger
  6. from mysq_pool import MySQLConnectionPool
  7. def get_add_cert(start_, end_):
  8. num_list = []
  9. # 循环生成8位数的字符串
  10. for num in range(start_, end_ + 1):
  11. # 使用zfill将数字转换为字符串,并确保长度为8位,不足部分用0填充
  12. formatted_num = str(num).zfill(8)
  13. # print(formatted_num)
  14. num_list.append(formatted_num)
  15. return num_list
  16. sql_pool = MySQLConnectionPool(log=logger)
  17. # logger.remove()
  18. # logger.add(lambda record: print(record), colorize=True, format="<green>{time}</green> <level>{message}</level>")
  19. # 查询最后一条数据 并且 +3000 将新任务插入表中
  20. # max_cert = sql_pool.select_one("SELECT cert_id FROM ags_record ORDER BY id DESC LIMIT 1")
  21. start_max_cert = 270000
  22. end_max_cert = 1000000
  23. new_id_list = get_add_cert(start_max_cert, end_max_cert)
  24. new_id_list = [str(ni) for ni in new_id_list]
  25. # logger.debug(f'查询到最新的 id 为:{max_cert[0]}, 开始生成新数据, 并添加到任务表中.........')
  26. # 防止重复添加任务
  27. # state0_id_list = sql_pool.select_all("SELECT cert_id FROM ags_task")
  28. # print(len(state0_id_list))
  29. # state0_id_list = [i[0] for i in state0_id_list]
  30. #
  31. #
  32. # filtered_new_id_list = []
  33. #
  34. # for ni in new_id_list:
  35. # if str(ni) in state0_id_list:
  36. # logger.debug(f"{ni} 已存在, 跳过添加")
  37. # else:
  38. # filtered_new_id_list.append(ni)
  39. # print(f'查询到 {len(filtered_new_id_list)} 条新的任务, 添加到任务表中......')
  40. # print(filtered_new_id_list)
  41. sql_pool.insert_all("INSERT INTO ags_task(cert_id) VALUES (%s)", new_id_list)