# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/7/1 13:13
- import utils
- import inspect
- from loguru import logger
- from datetime import datetime
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
# Drop the handlers registered so far, then route all output to one log file.
logger.remove()
logger.add(
    "./logs/kbs_add_{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    rotation="00:00",  # start a new file at midnight
    retention="7 day",  # old files are cleaned up after this period
    encoding="utf-8",
)
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of the attempt that just ran.

    The logger is resolved in this order:
      1. the first positional argument of the retried call,
      2. the ``log`` keyword argument of the retried call,
      3. the module-level ``logger``.

    :param retry_state: RetryCallState object describing the current attempt
    """
    call_args = getattr(retry_state, "args", None) or ()
    call_kwargs = getattr(retry_state, "kwargs", None) or {}

    if call_args:
        # By convention in this file the logger is the first positional argument.
        log = call_args[0]
    elif call_kwargs.get("log") is not None:
        # The retried function was called as f(log=...); honour that logger too.
        log = call_kwargs["log"]
    else:
        log = logger  # fall back to the global logger

    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_group_id_list_for_pid_one_page(log, token, pid, sql_pool):
    """
    Fetch the group list of one pid and hand it to ``parse_group_id_list``.

    The pid is sent as ``shelfId``. On response code 0 the payload is parsed.
    On code 2600030 the error is logged and ``bag_state`` is set to 2 for this
    pid. Any other code is only logged.

    :param log: logger
    :param token: token passed through to ``utils.request_get_data``
    :param pid: pid whose groups are requested
    :param sql_pool: database pool used for queries and updates
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"--------------- {fn_name}, pid:{pid} ---------------")

    resp_json = utils.request_get_data(
        log,
        "https://api.qiandao.cn/b2c-web/v1/kuji/query/group/next",
        {"shelfId": pid},
        token,
    )

    code = resp_json.get("code")
    if code == 0:
        parse_group_id_list(log, resp_json, pid, sql_pool)
        return

    # Every non-zero code is logged; only 2600030 also updates the record.
    log.error(f"{fn_name} error: {resp_json.get('message')}")
    if code == 2600030:
        sql_pool.update_one(
            query="UPDATE qiandao_mall_card_bag_reward_list_record SET bag_state = 2 WHERE pid = %s",
            args=(pid,),
        )
def parse_group_id_list(log, resp_json, pid, sql_pool):
    """
    Walk the ``groupSummary`` entries of a group-list response and collect the
    purchase records of every group.

    For each group the latest stored ``trading_time`` is looked up first and
    passed on, so the downstream fetch can stop at data that is already stored.
    A failure in one group is logged and does not stop the remaining groups.

    :param log: logger
    :param resp_json: decoded response; groups are read from data.groupSummary
    :param pid: pid the groups belong to
    :param sql_pool: database pool used for queries and inserts
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"--------------- {fn_name} ---------------")

    # "data" / "groupSummary" may be present but null; treat both as empty.
    payload = resp_json.get("data") or {}
    groups = payload.get("groupSummary") or []

    for item in groups:
        group_id = item.get("groupId")
        if group_id is None:
            # Nothing can be queried or requested without a group id.
            log.debug(f"{fn_name} skip group without groupId, pid: {pid}")
            continue
        try:
            # Latest trading time already stored for this group.
            row = sql_pool.select_one(
                query="SELECT MAX(trading_time) FROM qiandao_mall_card_bag_reward_buy_record WHERE group_id = %s",
                args=(group_id,),
            )
            max_trading_time = row[0] if row else None
            get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time)
        except Exception as e:
            log.error(f"{fn_name} error: {e}")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time):
    """
    Fetch the purchase (draw) records of one group, following pages automatically.

    Paging is cursor based: each request sends ``seq`` and the next cursor is
    the ``seq`` returned by ``parse_card_bag_reward_buyer_data``. Paging stops
    on a non-zero response code, when the parser reports no further pages, or
    when the cursor fails to advance.

    NOTE: the retry decorator wraps the whole function, so a retry starts again
    from the first page (``seq = "0"``).

    :param log: logger
    :param group_id: group id
    :param pid: pid
    :param sql_pool: sql_pool
    :param max_trading_time: latest trading time already stored for the group
    """
    fn_name = inspect.currentframe().f_code.co_name
    url = "https://api.qiandao.cn/box/kuji/query/v3/draw-records"
    limit = "10"  # page size
    seq = "0"  # initial cursor

    while True:
        params = {
            "groupId": group_id,
            "limit": limit,
            "seq": seq,
        }
        resp_json = utils.request_get_data(log, url, params)
        if resp_json.get("code") != 0:
            log.error(f"{fn_name} error: {resp_json.get('message')}")
            break

        # Parse and store the current page.
        result = parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time)
        if not result:
            log.debug("No more data to process.")
            break

        has_more, next_seq = result
        if not has_more:
            log.debug("No more pages to fetch.")
            break

        # A missing or unchanged cursor would request the same page forever.
        if next_seq is None or str(next_seq) == str(seq):
            log.warning(
                f"{fn_name} cursor did not advance (seq: {seq}, next_seq: {next_seq}), "
                f"group_id: {group_id}, stop paging")
            break
        seq = next_seq
def parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time):
    """
    Parse one page of purchase records, store the new ones, and report whether
    another page should be fetched.

    Records are scanned in response order. A record whose time cannot be
    converted is skipped. The first record that is not newer than
    ``max_trading_time`` ends the scan; the records collected before it are
    still inserted, and no further page is requested.
    NOTE(review): this relies on the API returning records newest first — confirm.

    :param log: logger
    :param resp_json: decoded response; records are read from data.records
    :param pid: pid
    :param sql_pool: sql_pool
    :param group_id: group_id
    :param max_trading_time: latest stored trading time for the group, or None
        (assumed to be a datetime as returned by the DB driver — TODO confirm)
    :return: (has_more, next_seq); (False, None) when paging should stop
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"--------------- {fn_name} ---------------")

    # "data" / "records" may be present but null; treat both as empty.
    data = resp_json.get("data") or {}
    records = data.get("records") or []
    if not records:
        return False, None

    info_list = []
    reached_old_data = False
    for item in records:
        raw_time = item.get("time")  # trading time as delivered by the API
        trading_time_str = utils.transform_ms(log, raw_time)
        if not trading_time_str:
            # Log the raw value: the converted one is empty here.
            log.debug(f"Invalid trading time: {raw_time}")
            continue

        # str -> datetime so it can be compared with the stored maximum.
        trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
        if max_trading_time and trading_time <= max_trading_time:
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
            # Stop scanning, but keep the newer records collected so far.
            reached_old_data = True
            break

        info_list.append({
            "pid": pid,
            "group_id": group_id,
            "product_name": item.get("productName"),
            "user_name": item.get("userName"),
            "user_id": item.get("userId"),
            "level": item.get("level"),  # 1 means prize A, and so on
            "level_type": item.get("levelType"),
            "seq": item.get("seq"),  # draw order
            "trading_time": trading_time_str,
        })

    if info_list:
        try:
            sql_pool.insert_many(table="qiandao_mall_card_bag_reward_buy_record", data_list=info_list)
            sql_pool.update_one(
                query="UPDATE qiandao_mall_card_bag_reward_list_record SET bag_state = 1 WHERE pid = %s",
                args=(pid,))
        except Exception as e:
            log.error(f"{fn_name} call insert_many error: {e}")

    if reached_old_data:
        # Everything beyond this point is already stored.
        return False, None

    has_more = data.get("hasMore", False)
    next_seq = records[-1].get("seq") if has_more else None  # cursor = seq of the last record
    return has_more, next_seq
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_card_bag_reward_main(log):
    """
    Entry point of the crawler.

    Creates the MySQL pool, reads the token and the distinct pids, then runs
    the group collection for each pid. An unhealthy pool raises RuntimeError
    before the guarded section, so that error reaches the retry decorator.
    Errors inside the guarded section are logged instead.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {fn_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    pool_ok = sql_pool.check_pool_health()
    if not pool_ok:
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token = sql_pool.select_one("SELECT token FROM qiandao_token LIMIT 1")[0]
        pid_rows = sql_pool.select_all("SELECT DISTINCT pid FROM qiandao_mall_card_bag_reward_list_record")
        pids = [row[0] for row in pid_rows]

        if pids:
            for pid2 in pids:
                # One failing pid must not stop the remaining ones.
                try:
                    get_group_id_list_for_pid_one_page(log, token, pid2, sql_pool)
                except Exception as e:
                    log.error(f"{fn_name} for sql_pid2_list: {pid2}, error: {e}")
        else:
            log.debug("No sql_pid2_list")
    except Exception as e:
        log.error(f'{fn_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {fn_name} 运行结束,等待下一轮的采集任务............')
# Script entry point: start the crawler with the module-level loguru logger.
if __name__ == "__main__":
    qd_card_bag_reward_main(log=logger)
|