# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/6/30 19:35
"""Crawler for qiandao mall card-draw (gacha) shelves.

Fetches the shelf list, per-shelf details and per-shelf draw records from the
qiandao HTTP API and stores them in the MySQL tables
``qiandao_mall_draw_card_list_record`` and ``qiandao_mall_draw_card_buy_record``.
"""
import inspect
import random
import time
from datetime import datetime

import schedule  # not referenced in this module; import retained as-is
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

import utils
from mysql_pool import MySQLConnectionPool

# Page size requested from the list endpoint; a shorter page ends pagination.
PAGE_SIZE = 20
# Number of draw-record ids sent in one ids-draw-records request.
DRAW_ID_BATCH_SIZE = 10
# Format used to parse the strings returned by utils.transform_ms.
DRAW_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

logger.remove()
logger.add("./logs/chouka_add_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")


def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of each attempt.

    The logger is taken from the first positional argument of the wrapped
    call, then from a ``log=`` keyword argument, and finally falls back to
    the module-level loguru logger.

    :param retry_state: tenacity RetryCallState of the wrapped call
    """
    if retry_state.args:
        log = retry_state.args[0]
    else:
        # qd_draw_card_main is invoked as qd_draw_card_main(log=logger),
        # so the logger may only be present as a keyword argument.
        log = (retry_state.kwargs or {}).get("log") or logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


def _to_datetime(value):
    """Parse ``value`` with DRAW_TIME_FORMAT if it is a string; return anything else unchanged."""
    if isinstance(value, str):
        return datetime.strptime(value, DRAW_TIME_FORMAT)
    return value


def get_draw_card_list(log, sql_pool):
    """
    Walk the gacha shelf list page by page and store shelves not yet recorded.

    Pagination stops when a page holds fewer than PAGE_SIZE rows, when the
    number of rows seen reaches the count reported by the API, or after
    500 pages.

    :param log: logger object
    :param sql_pool: MySQL connection pool
    """
    page = 1
    max_page = 500
    total_count = 0

    known_rows = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record") or []
    sql_pid_list = [row[0] for row in known_rows]

    while page <= max_page:
        len_item, totalCount = draw_card_one_page(log, page, sql_pool, sql_pid_list)
        if len_item < PAGE_SIZE:
            log.debug(
                f"--------------- page {page}, len_item: {len_item}, totalCount: {totalCount}, break !!! ---------------")
            break

        total_count += len_item
        if total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} >= totalCount: {totalCount}, break...........")
            break

        page += 1
        # Random pause between pages.
        time.sleep(random.uniform(0.1, 1))

    sql_pid_list.clear()


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def draw_card_one_page(log, page, sql_pool, sql_pid_list):
    """
    Fetch one page of the gacha shelf list and hand its rows to the parser.

    A response without ``data``/``rows`` raises KeyError, which lets the
    retry decorator re-issue the request.

    :param log: logger object
    :param page: 1-based page number
    :param sql_pool: MySQL connection pool
    :param sql_pid_list: list of pids already stored; extended in place
    :return: (number of rows on this page, total count reported by the API)
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, page: {page} ---------------")
    url = "https://api.qiandao.com/shelf-web/list-home-recommender"
    params = {
        "offset": str((page - 1) * PAGE_SIZE),
        "limit": str(PAGE_SIZE),
        "cmd": "b2c_homepage_feed",
        "name": "cgmarket",
        "project": "channel",
        "type": "LIVE_GROUPON",
        "typeIds": [1443323],
        "withLive": "true",
        "tagIds": [1701855],
        "quickCouponTemplateId": "0"
    }
    resp_json = utils.request_get_data(log, url, params)
    rows = resp_json["data"]["rows"]
    parse_draw_card_data(log, rows, sql_pool, sql_pid_list)

    len_rows = len(rows)
    total_count = resp_json["data"]["count"]
    return len_rows, total_count


def parse_draw_card_data(log, resp_json, sql_pool, sql_pid_list):
    """
    Convert list rows into records and upsert the ones not seen before.

    :param log: logger object
    :param resp_json: iterable of row dicts from the list endpoint
    :param sql_pool: MySQL connection pool
    :param sql_pid_list: list of pids already stored; new pids are appended
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    # Built once per page so each membership test is O(1) instead of
    # scanning the whole list for every row.
    seen_pids = set(sql_pid_list)
    info_list = []
    for item in resp_json:
        pid = item.get("id")
        if pid in seen_pids:
            log.debug(f"{inspect.currentframe().f_code.co_name}, pid: {pid} already exists")
            continue

        # ``or {}`` also covers keys that are present but null.
        price_info = item.get("price") or {}
        org_info = item.get("org") or {}
        info_dict = {
            "pid": pid,
            "p_type": item.get("type"),
            "title": item.get("name"),
            "price": price_info.get("unitPriceOfCash"),
            "sold_amount_text": item.get("soldAmountText"),
            "org_id": item.get("orgId"),
            "nickname": org_info.get("nickname", ''),
            "is_sold_out": item.get("isSoldOut")
        }
        seen_pids.add(pid)
        sql_pid_list.append(pid)
        info_list.append(info_dict)

    if not info_list:
        return

    query = """
        INSERT INTO qiandao_mall_draw_card_list_record
            (pid, p_type, title, price, sold_amount_text, org_id, nickname, is_sold_out)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            p_type = VALUES(p_type),
            title = VALUES(title),
            price = VALUES(price),
            sold_amount_text = VALUES(sold_amount_text),
            org_id = VALUES(org_id),
            nickname = VALUES(nickname),
            is_sold_out = VALUES(is_sold_out)
    """
    args_list = [(item['pid'], item['p_type'], item['title'], item['price'],
                  item['sold_amount_text'], item['org_id'], item['nickname'],
                  item['is_sold_out'])
                 for item in info_list]
    sql_pool.insert_many(query=query, args_list=args_list)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_draw_card_details(log, sql_pool, pid):
    """
    Fetch the detail page of one gacha shelf and store the parsed fields.

    Example of a stored list record this function enriches:
    {'pid': '867827029033121899', 'p_type': 'GACHA', 'org_id': '710150753377039896',
     'title': '0.01/包 灵焰包第2弹 可合成 累计抽卡领好礼', 'price': 1.4,
     'sold_amount_text': '已售2000+', 'nickname': '哇咔咔', 'is_sold_out': False}

    :param log: logger object
    :param sql_pool: MySQL connection pool
    :param pid: shelf id to query; also the key of the row that is updated
    """
    url = "https://api.qiandao.com/b2c-web/v1/gacha/query/detail"
    params = {
        # Query the requested shelf; a fixed id here would write one shelf's
        # details onto every pid.
        "shelfId": pid
    }
    resp_json = utils.request_get_data(log, url, params)
    if resp_json.get("code") == 0:
        parse_draw_card_details_data(log, resp_json, pid, sql_pool)
    else:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")


def parse_draw_card_details_data(log, resp_json, pid, sql_pool):
    """
    Extract detail fields from a detail response and update the shelf row.

    :param log: logger object
    :param resp_json: decoded detail response
    :param pid: shelf id identifying the row to update
    :param sql_pool: MySQL connection pool
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    data = resp_json.get("data") or {}
    if not data:
        # Nothing to store; skip rather than overwrite the row with NULLs.
        log.warning(f"{inspect.currentframe().f_code.co_name} no data for pid: {pid}")
        return

    intro_images = (data.get('images') or {}).get("introImages") or []
    detail_dict = {
        "specification": data.get("packageConfigRamark"),  # specification text
        "images": '|'.join(intro_images),  # multiple image links joined by '|'
        "category_id": data.get("categoryId"),
        "unit_price_of_cash": data.get("unitPriceOfCash"),
        "vip_price": data.get("vipPrice"),
    }
    sql_pool.update_one_or_dict(table="qiandao_mall_draw_card_list_record", data=detail_dict,
                                condition={"pid": pid})


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_draw_ids(log, pid, sql_pool,max_trading_time):
    """
    Fetch the draw-record id digest of a shelf and load the records in batches.

    A failure in one batch is logged and does not stop the remaining batches.

    :param log: logger object
    :param pid: shelf id
    :param sql_pool: MySQL connection pool
    :param max_trading_time: newest draw time already stored for this pid, or None
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, pid: {pid} ---------------")
    url = "https://api.qiandao.cn/b2c-web/v1/box/gacha/query/digest-draw-records"
    params = {
        "shelfId": pid
    }
    resp_json = utils.request_get_data(log, url, params)
    if resp_json.get("code") != 0:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
        return

    rows = (resp_json.get("data") or {}).get("rows") or []
    draw_id_list = [item.get("id") for item in rows]
    # Request the full records DRAW_ID_BATCH_SIZE ids at a time.
    for i in range(0, len(draw_id_list), DRAW_ID_BATCH_SIZE):
        log.debug(
            f"{inspect.currentframe().f_code.co_name}, get_draw_ids, i: {i}, len draw_id_list: {len(draw_id_list)}")
        try:
            get_draw_buy_data(log, pid, sql_pool, draw_id_list[i:i + DRAW_ID_BATCH_SIZE],max_trading_time)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} call draw_id_list error: {e}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_draw_buy_data(log, pid, sql_pool, ids: list,max_trading_time):
    """
    Fetch full draw records for a batch of ids and store them.

    Parsing/storing errors are logged and swallowed here, so only request
    failures reach the retry decorator.

    :param log: logger object
    :param pid: shelf id
    :param sql_pool: MySQL connection pool
    :param ids: draw-record ids to fetch
    :param max_trading_time: newest draw time already stored for this pid, or None
    """
    url = "https://api.qiandao.cn/b2c-web/v1/box/gacha/query/ids-draw-records"
    data = {
        "ids": ids,
        "shelfId": pid
    }
    resp_json = utils.request_post_data(log, url, data)
    if resp_json.get("code") == 0:
        try:
            parse_draw_buy_data(log, resp_json, pid, sql_pool,max_trading_time)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} call parse_draw_buy_data error: {e}")
    else:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")


def is_draw_id_exists(log, sql_pool, draw_id):
    """
    Check whether a draw_id is already present in the buy-record table.

    :param log: logger object
    :param sql_pool: MySQL connection pool
    :param draw_id: draw_id to look up
    :return: True if a row exists; False if not or if the query failed
    """
    try:
        query = """
            SELECT EXISTS (
                SELECT 1 FROM qiandao_mall_draw_card_buy_record WHERE draw_id = %s
            ) AS found
        """
        result = sql_pool.select_one(query=query, args=(draw_id,))
        return bool(result[0]) if result else False
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
        return False


def parse_draw_buy_data(log, resp_json, pid, sql_pool,max_trading_time):
    """
    Convert draw records into rows and insert those newer than max_trading_time.

    :param log: logger object
    :param resp_json: decoded ids-draw-records response
    :param pid: shelf id the records belong to
    :param sql_pool: MySQL connection pool
    :param max_trading_time: newest draw time already stored (datetime, or a
        string in DRAW_TIME_FORMAT), or None to accept every record
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    # The stored maximum may come back from the database as a string;
    # comparing that with a datetime would raise TypeError.
    max_trading_time = _to_datetime(max_trading_time)

    rows = (resp_json.get('data') or {}).get('rows') or []
    info_list = []
    for item in rows:
        product_info = item.get("productInfo") or {}
        user_info = item.get("userInfo") or {}

        trading_time_str = utils.transform_ms(log, item.get("time"))
        if not trading_time_str:
            log.debug(f"Invalid trading time: {trading_time_str}")
            continue  # skip records without a usable time

        trading_time = datetime.strptime(trading_time_str, DRAW_TIME_FORMAT)
        if max_trading_time and trading_time <= max_trading_time:
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
            # NOTE(review): stopping here assumes rows arrive newest first - confirm.
            break

        info_list.append({
            "pid": pid,
            "draw_id": item.get("id"),
            # A missing id is stored as None, not as an empty dict.
            "spu_id": product_info.get("id"),
            "spu_name": product_info.get("spuName"),
            "spu_image": product_info.get("spuImage"),
            "rarity": item.get("rarity"),
            "buyer_id": user_info.get("id"),
            "buyer_name": user_info.get("name"),
            "draw_time": trading_time_str
        })

    if info_list:
        try:
            sql_pool.insert_many(table="qiandao_mall_draw_card_buy_record", data_list=info_list)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} call insert_many error: {e}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_draw_card_main(log):
    """
    Entry point: collect new draw records for every stored shelf.

    Raises RuntimeError when the connection pool is unhealthy, which makes
    the retry decorator try again after its wait; all other errors are
    logged and swallowed.

    :param log: logger object
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        pid_rows = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record")
        sql_pid2_list = [row[0] for row in pid_rows]
        if not sql_pid2_list:
            log.debug("No sql_pid2_list")
            return

        for pid2 in sql_pid2_list:
            try:
                # Newest draw time already stored for this pid.
                max_row = sql_pool.select_one(
                    query="SELECT MAX(draw_time) FROM qiandao_mall_draw_card_buy_record WHERE pid = %s",
                    args=(pid2,))
                max_trading_time = max_row[0] if max_row else None

                get_draw_ids(log, pid2, sql_pool,max_trading_time)
            except Exception as e:
                log.error(f"{inspect.currentframe().f_code.co_name} for sql_pid2_list: {pid2}, error: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    qd_draw_card_main(log=logger)