卡包赏补充.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/7/1 13:13
  5. import utils
  6. import inspect
  7. from loguru import logger
  8. from datetime import datetime
  9. from mysql_pool import MySQLConnectionPool
  10. from tenacity import retry, stop_after_attempt, wait_fixed
  11. logger.remove()
  12. logger.add("./logs/kbs_add_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  13. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  14. level="DEBUG", retention="7 day")
  15. def after_log(retry_state):
  16. """
  17. retry 回调
  18. :param retry_state: RetryCallState 对象
  19. """
  20. # 检查 args 是否存在且不为空
  21. if retry_state.args and len(retry_state.args) > 0:
  22. log = retry_state.args[0] # 获取传入的 logger
  23. else:
  24. log = logger # 使用全局 logger
  25. if retry_state.outcome.failed:
  26. log.warning(
  27. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  28. else:
  29. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  30. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  31. def get_group_id_list_for_pid_one_page(log, token, pid, sql_pool):
  32. """
  33. 获取 每个pid的 group_id列表
  34. :param log:
  35. :param token:
  36. :param pid:
  37. :param sql_pool:
  38. """
  39. log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, pid:{pid} ---------------")
  40. url = "https://api.qiandao.cn/b2c-web/v1/kuji/query/group/next"
  41. params = {
  42. # "shelfId": "776833910955877820"
  43. "shelfId": pid
  44. }
  45. resp_json = utils.request_get_data(log, url, params, token)
  46. # print(resp_json)
  47. if resp_json.get("code") == 0:
  48. parse_group_id_list(log, resp_json, pid, sql_pool)
  49. elif resp_json.get("code", 0) == 2600030:
  50. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  51. sql_pool.update_one(
  52. query="UPDATE qiandao_mall_card_bag_reward_list_record SET bag_state = 2 WHERE pid = %s",
  53. args=(pid,))
  54. return
  55. else:
  56. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  57. def parse_group_id_list(log, resp_json, pid, sql_pool):
  58. log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
  59. data = resp_json.get("data", {}).get("groupSummary", [])
  60. for item in data:
  61. group_id = item.get("groupId")
  62. try:
  63. # 获取 该组 购买数据的最大时间。
  64. max_trading_time = sql_pool.select_one(
  65. query="SELECT MAX(trading_time) FROM qiandao_mall_card_bag_reward_buy_record WHERE group_id = %s",
  66. args=(group_id,))
  67. max_trading_time = max_trading_time[0] if max_trading_time else None
  68. get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time)
  69. except Exception as e:
  70. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  71. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  72. def get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time):
  73. """
  74. 获取 卡包赏 购买数据(支持自动翻页)
  75. :param log: 日志记录器
  76. :param group_id: 组ID
  77. :param pid: pid
  78. :param sql_pool: sql_pool
  79. :param max_trading_time: max_trading_time
  80. """
  81. url = "https://api.qiandao.cn/box/kuji/query/v3/draw-records"
  82. limit = "10" # 每页数量
  83. seq = "0" # 初始seq值
  84. while True:
  85. params = {
  86. "groupId": group_id,
  87. "limit": limit,
  88. "seq": seq
  89. }
  90. resp_json = utils.request_get_data(log, url, params)
  91. if resp_json.get("code") != 0:
  92. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  93. break
  94. # 解析并处理当前页数据
  95. result = parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time)
  96. if not result:
  97. log.debug("No more data to process.")
  98. break
  99. has_more, next_seq = result
  100. if not has_more:
  101. log.debug("No more pages to fetch.")
  102. break
  103. seq = next_seq
  104. # time.sleep(random.uniform(0.5, 1))
  105. def parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time):
  106. """
  107. 解析 卡包赏 购买数据,并返回是否有更多数据及下一页的seq
  108. :param log: 日志记录器
  109. :param resp_json: 响应数据
  110. :param pid: pid
  111. :param sql_pool: sql_pool
  112. :param group_id: group_id
  113. :param max_trading_time: max_trading_time
  114. :return: (has_more: bool, next_seq: str)
  115. """
  116. log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
  117. data = resp_json.get("data", {})
  118. records = data.get("records", [])
  119. if not records:
  120. return False, None
  121. info_list = []
  122. for item in records:
  123. # print(item)
  124. trading_time = item.get("time") # 交易时间
  125. trading_time_str = utils.transform_ms(log, trading_time)
  126. if not trading_time_str:
  127. log.debug(f"Invalid trading time: {trading_time_str}")
  128. continue # 跳过无效时间
  129. # 字符串 -> datetime
  130. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  131. if max_trading_time and trading_time <= max_trading_time:
  132. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  133. return False, None
  134. productName = item.get("productName")
  135. userName = item.get("userName")
  136. userId = item.get("userId")
  137. level = item.get("level") # 1代表A赏, 以此类推
  138. levelType = item.get("levelType")
  139. seq = item.get("seq") # 抽卡顺序
  140. buy_dict = {
  141. "pid": pid,
  142. "group_id": group_id,
  143. "product_name": productName,
  144. "user_name": userName,
  145. "user_id": userId,
  146. "level": level,
  147. "level_type": levelType,
  148. "seq": seq,
  149. "trading_time": trading_time_str
  150. }
  151. # print(buy_dict)
  152. info_list.append(buy_dict)
  153. if info_list:
  154. try:
  155. sql_pool.insert_many(table="qiandao_mall_card_bag_reward_buy_record", data_list=info_list)
  156. sql_pool.update_one(
  157. query="UPDATE qiandao_mall_card_bag_reward_list_record SET bag_state = 1 WHERE pid = %s",
  158. args=(pid,))
  159. except Exception as e:
  160. log.error(f"{inspect.currentframe().f_code.co_name} call insert_many error: {e}")
  161. has_more = data.get("hasMore", False)
  162. next_seq = records[-1].get("seq") if has_more else None # 取最后一条的 seq
  163. return has_more, next_seq
  164. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  165. def qd_card_bag_reward_main(log):
  166. """
  167. 主函数
  168. :param log: logger对象
  169. """
  170. log.info(
  171. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  172. # 配置 MySQL 连接池
  173. sql_pool = MySQLConnectionPool(log=log)
  174. if not sql_pool.check_pool_health():
  175. log.error("数据库连接池异常")
  176. raise RuntimeError("数据库连接池异常")
  177. try:
  178. sql_token = sql_pool.select_one("SELECT token FROM qiandao_token LIMIT 1")
  179. token = sql_token[0]
  180. sql_pid2_list = sql_pool.select_all("SELECT DISTINCT pid FROM qiandao_mall_card_bag_reward_list_record")
  181. # "SELECT DISTINCT pid FROM qiandao_mall_card_bag_reward_list_record WHERE is_sold_out = 1"
  182. sql_pid2_list = [item[0] for item in sql_pid2_list]
  183. # 测试
  184. if not sql_pid2_list:
  185. # sql_pid2_list = ["873770249084734603"]
  186. log.debug("No sql_pid2_list")
  187. return
  188. for pid2 in sql_pid2_list:
  189. try:
  190. get_group_id_list_for_pid_one_page(log, token, pid2, sql_pool)
  191. except Exception as e:
  192. log.error(f"{inspect.currentframe().f_code.co_name} for sql_pid2_list: {pid2}, error: {e}")
  193. except Exception as e:
  194. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  195. finally:
  196. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  197. if __name__ == '__main__':
  198. qd_card_bag_reward_main(log=logger)