# -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2025/6/3 16:56 from loguru import logger from mysq_pool import MySQLConnectionPool def get_add_cert(start_, end_): num_list = [] # 循环生成8位数的字符串 for num in range(start_, end_ + 1): # 使用zfill将数字转换为字符串,并确保长度为8位,不足部分用0填充 formatted_num = str(num).zfill(8) # print(formatted_num) num_list.append(formatted_num) return num_list sql_pool = MySQLConnectionPool(log=logger) # logger.remove() # logger.add(lambda record: print(record), colorize=True, format="{time} {message}") # 查询最后一条数据 并且 +3000 将新任务插入表中 # max_cert = sql_pool.select_one("SELECT cert_id FROM ags_record ORDER BY id DESC LIMIT 1") start_max_cert = 270000 end_max_cert = 1000000 new_id_list = get_add_cert(start_max_cert, end_max_cert) new_id_list = [str(ni) for ni in new_id_list] # logger.debug(f'查询到最新的 id 为:{max_cert[0]}, 开始生成新数据, 并添加到任务表中.........') # 防止重复添加任务 # state0_id_list = sql_pool.select_all("SELECT cert_id FROM ags_task") # print(len(state0_id_list)) # state0_id_list = [i[0] for i in state0_id_list] # # # filtered_new_id_list = [] # # for ni in new_id_list: # if str(ni) in state0_id_list: # logger.debug(f"{ni} 已存在, 跳过添加") # else: # filtered_new_id_list.append(ni) # print(f'查询到 {len(filtered_new_id_list)} 条新的任务, 添加到任务表中......') # print(filtered_new_id_list) sql_pool.insert_all("INSERT INTO ags_task(cert_id) VALUES (%s)", new_id_list)