# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/7/2 16:02
import time
from datetime import datetime

import utils
import inspect
import schedule
from loguru import logger
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

logger.remove()
logger.add("./logs/mall_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")

CRAWL_CATEGORY = '潮玩'  # "trendy toys" -- category label written with every record


def after_log(retry_state):
    """
    tenacity retry callback.
    :param retry_state: RetryCallState object
    """
    # Use the logger passed as the first positional argument if there is one,
    # otherwise fall back to the module-level logger.
    if retry_state.args:
        log = retry_state.args[0]
    else:
        log = logger
    if retry_state.outcome.failed:
        log.warning(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


def get_toy_mall_list(log, sql_pool, sql_mall_list):
    """Walk the list pages until a short page or the reported total is reached."""
    page = 1
    max_page = 500
    total_count = 0
    while page <= max_page:
        len_item, totalCount = get_toy_mall_single_page(log, page, sql_pool, sql_mall_list)
        if len_item < 20:
            log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
            break
        total_count += len_item
        if total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} totalCount: {totalCount}")
            break
        page += 1
        # time.sleep(random.uniform(0.1, 1))


def get_toy_mall_single_page(log, page, sql_pool, sql_mall_list):
    """Fetch one list page and hand the rows to parse_mall_data."""
    url = "https://api.qiandao.cn/shelf-web/list-home-recommender"
    params = {
        "offset": (page - 1) * 20,
        "limit": 20,
        "cmd": "b2c_homepage_feed",
        "name": "manghe",
        "project": "channel",
        # "type[]": "BLIND_BOX_MACHINE",
        # "type[]": "KUJI",
        # "type[]": "NEW_LUCKY_BAG",
        # "type[]": "B2C",
        "type": "",
        "withLive": "true"
    }
    try:
        resp_json = utils.request_get_data(log, url, params, proxy=False)
        if not resp_json:
            log.error(f"get_toy_mall_single_page got an empty response, page: {page}")
            return 0, 0
        if resp_json.get("code", -1) == 0:
            rows = resp_json.get("data", {}).get("rows", [])
            total_count = resp_json.get("data", {}).get("count", 0)
            try:
                parse_mall_data(log, rows, sql_pool, sql_mall_list)
            except Exception as e:
                log.error(f"parse_mall_data error: {e}")
            return len(rows), total_count
        else:
            log.error(f"{inspect.currentframe().f_code.co_name} -> page: {page} error: {resp_json.get('message')}")
            return 0, 0
    except Exception as e:
        log.error(f"Request failed: {e}")
        return 0, 0


def parse_mall_data(log, rows, sql_pool, sql_mall_list):
    """Extract the fields of interest from the list rows and upsert them."""
    info_list = []
    for row in rows:
        pid = row.get("id")
        if pid in sql_mall_list:
            log.info(f"pid: {pid} already exists, skip .......")
            continue
        p_type = row.get("type")
        orgId = row.get("orgId")
        title = row.get("name")
        isSoldOut = row.get("isSoldOut")
        price = row.get("price", {}).get("unitPriceOfCash") if row.get("price") else None
        soldAmountText = row.get("soldAmountText")  # amount already sold
        nickname = row.get("org", {}).get("nickname", '') if row.get("org") else None
        # sell_time = row.get("sellTime")
        # sell_time = utils.translate_s(log, sell_time) if sell_time or sell_time != '0' else None
        # images = '|'.join(row.get("coverImages", []))
        data_dict = {
            "pid": pid,
            "p_type": p_type,
            "title": title,
            "price": price,
            "sold_amount_text": soldAmountText,
            "org_id": orgId,
            "nickname": nickname,
            # "description": description,
            "is_sold_out": isSoldOut,
            # "sell_time": sell_time,
            # "images": images,
            "crawl_category": CRAWL_CATEGORY
        }
        info_list.append(data_dict)
        sql_mall_list.append(pid)  # remember the pid so later pages skip it

    if info_list:
        query = """
            INSERT INTO qiandao_mall_list_record
                (pid, p_type, title, price, sold_amount_text, org_id, nickname, is_sold_out, crawl_category)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                p_type = VALUES(p_type), title = VALUES(title), price = VALUES(price),
                sold_amount_text = VALUES(sold_amount_text), org_id = VALUES(org_id),
                nickname = VALUES(nickname), is_sold_out = VALUES(is_sold_out),
                crawl_category = VALUES(crawl_category)
        """
        args_list = [(item['pid'], item['p_type'], item['title'], item['price'], item['sold_amount_text'],
                      item['org_id'], item['nickname'], item['is_sold_out'], item['crawl_category'])
                     for item in info_list]
        try:
            sql_pool.insert_many(query=query, args_list=args_list)
        except Exception as e:
            log.error(f"parse_mall_data -> sql_pool.insert_many error: {e}")
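
# ----------------------------------------------------------------------------------------------------------------------
# NOTE: the real request helper lives in utils.py and is not shown here. The sketch
# below only illustrates the interface this file relies on; the signature is
# inferred from the call sites (log, url, params, token=None, proxy=False), and the
# Authorization header name is an assumption, not the actual implementation.
def _sketch_request_get_data(log, url, params, token=None, proxy=False, timeout=10):
    """Hypothetical GET wrapper: returns the parsed JSON dict, or None on failure."""
    import requests  # local import so this sketch does not affect module start-up
    headers = {"User-Agent": "Mozilla/5.0"}
    if token:
        headers["Authorization"] = token  # assumed header name
    # `proxy` is honoured by the real helper in utils.py; omitted here for brevity.
    try:
        resp = requests.get(url, params=params, headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        log.error(f"request_get_data error: {e}")
        return None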

# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_group_id_list_for_pid_one_page(log, token, pid, sql_pool):
    """
    Fetch the group_id list for one pid.
    :param log: logger
    :param token: auth token
    :param pid: product id
    :param sql_pool: MySQL connection pool
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, pid: {pid} ---------------")
    url = "https://api.qiandao.cn/b2c-web/v1/kuji/query/group/next"
    params = {
        # "shelfId": "776833910955877820"
        "shelfId": pid
    }
    try:
        resp_json = utils.request_get_data(log, url, params, token)
        if resp_json.get("code") == 0:
            parse_group_id_list(log, resp_json, pid, sql_pool)
        elif resp_json.get("code", 0) == 2600030:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
            # Error code 2600030: mark the record so it is not queried again.
            sql_pool.update_one(
                query="UPDATE qiandao_mall_list_record SET bag_state = 2 WHERE pid = %s",
                args=(pid,))
            return
        else:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
    except Exception as e:
        log.error(f"Request failed: {e}")
        raise  # re-raise so the @retry decorator can actually retry


def parse_group_id_list(log, resp_json, pid, sql_pool):
    """For each group of the pid, look up the stored cursor and fetch new buy records."""
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    data = resp_json.get("data", {}).get("groupSummary", [])
    for item in data:
        group_id = item.get("groupId")
        try:
            # Newest trading_time already stored for this group; used as an incremental cursor.
            max_trading_time = sql_pool.select_one(
                query="SELECT MAX(trading_time) FROM qiandao_mall_card_bag_reward_buy_record "
                      "WHERE group_id = %s AND crawl_category = %s",
                args=(group_id, CRAWL_CATEGORY))
            max_trading_time = max_trading_time[0] if max_trading_time else None
            get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time):
    """
    Fetch the card-bag reward purchase records (with automatic paging).
    :param log: logger
    :param group_id: group id
    :param pid: pid
    :param sql_pool: sql_pool
    :param max_trading_time: newest trading_time already stored, used to stop early
    """
    url = "https://api.qiandao.cn/box/kuji/query/v3/draw-records"
    limit = "10"  # page size
    seq = "0"  # initial cursor value
    while True:
        params = {
            "groupId": group_id,
            "limit": limit,
            "seq": seq
        }
        resp_json = utils.request_get_data(log, url, params)
        if resp_json.get("code") != 0:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
            break
        # Parse the current page; it reports whether more pages exist and the next cursor.
        has_more, next_seq = parse_card_bag_reward_buyer_data(
            log, resp_json, pid, sql_pool, group_id, max_trading_time)
        if not has_more:
            log.debug("No more pages to fetch.")
            break
        seq = next_seq
        # time.sleep(random.uniform(0.5, 1))
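
# ----------------------------------------------------------------------------------------------------------------------
# NOTE: minimal sketch of the connection-pool interface this script relies on. The
# real class lives in mysql_pool.py; the method names and signatures below are
# inferred from the call sites in this file and are assumptions, not the actual code.
from typing import Any, Iterable, Optional, Protocol, Sequence


class _SQLPoolLike(Protocol):
    """Assumed shape of MySQLConnectionPool, for reference only (not used at runtime)."""

    def check_pool_health(self) -> bool: ...

    def select_one(self, query: str, args: Optional[Sequence[Any]] = None) -> Optional[tuple]: ...

    def select_all(self, query: str, args: Optional[Sequence[Any]] = None) -> list[tuple]: ...

    def update_one(self, query: str, args: Optional[Sequence[Any]] = None) -> None: ...

    # insert_many is called in two forms in this file: explicit SQL plus tuples
    # (query=/args_list=), and table name plus a list of dicts (table=/data_list=).
    def insert_many(self, query: Optional[str] = None, args_list: Optional[Iterable[tuple]] = None,
                    table: Optional[str] = None, data_list: Optional[Iterable[dict]] = None) -> None: ...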

def parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time):
    """
    Parse one page of card-bag reward purchase records and report whether
    another page should be fetched, plus the seq cursor for it.
    :param log: logger
    :param resp_json: response payload
    :param pid: pid
    :param sql_pool: sql_pool
    :param group_id: group_id
    :param max_trading_time: newest trading_time already stored
    :return: (has_more: bool, next_seq: str | None)
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    data = resp_json.get("data", {})
    records = data.get("records", [])
    if not records:
        return False, None
    info_list = []
    stop_paging = False
    for item in records:
        trading_time = item.get("time")  # trade timestamp in milliseconds
        trading_time_str = utils.transform_ms(log, trading_time)
        if not trading_time_str:
            log.debug(f"Invalid trading time: {trading_time}")
            continue  # skip records with an unusable timestamp
        # string -> datetime, for comparison against the stored cursor
        trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
        if max_trading_time and trading_time <= max_trading_time:
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, skipping old data")
            # Records are assumed newest-first, so everything from here on is already
            # stored. Stop paging, but still insert the newer records collected above.
            stop_paging = True
            break
        productName = item.get("productName")
        userName = item.get("userName")
        userId = item.get("userId")
        level = item.get("level")  # 1 means prize A, and so on
        levelType = item.get("levelType")
        seq = item.get("seq")  # draw order
        buy_dict = {
            "pid": pid,
            "group_id": group_id,
            "product_name": productName,
            "user_name": userName,
            "user_id": userId,
            "level": level,
            "level_type": levelType,
            "seq": seq,
            "trading_time": trading_time_str,
            "crawl_category": CRAWL_CATEGORY
        }
        info_list.append(buy_dict)

    if info_list:
        try:
            sql_pool.insert_many(table="qiandao_mall_card_bag_reward_buy_record", data_list=info_list)
            sql_pool.update_one(
                query="UPDATE qiandao_mall_list_record SET bag_state = 1 WHERE pid = %s",
                args=(pid,))
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} call insert_many error: {e}")

    if stop_paging:
        return False, None
    has_more = data.get("hasMore", False)
    next_seq = records[-1].get("seq") if has_more else None  # cursor: seq of the last record
    return has_more, next_seq
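
# ----------------------------------------------------------------------------------------------------------------------
# NOTE: utils.transform_ms is assumed to turn a millisecond epoch timestamp into a
# "YYYY-MM-DD HH:MM:SS" string, returning None when the value is unusable. The real
# helper lives in utils.py; this is only a sketch of that assumed behaviour.
def _sketch_transform_ms(log, ms):
    """Hypothetical helper: millisecond epoch -> 'YYYY-MM-DD HH:MM:SS' string, None on failure."""
    try:
        return datetime.fromtimestamp(int(ms) / 1000).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError, OSError) as e:
        log.error(f"transform_ms error: {e}")
        return None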

@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_mall_main(log=logger):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(f'Starting crawl task {inspect.currentframe().f_code.co_name}....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        # Fetch the list pages
        try:
            sql_mall_list = sql_pool.select_all(
                f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE crawl_category = '{CRAWL_CATEGORY}'")
            sql_mall_list = [item[0] for item in sql_mall_list]
            get_toy_mall_list(log, sql_pool, sql_mall_list)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
        time.sleep(5)

        # Fetch the detail pages
        sql_token = sql_pool.select_one("SELECT token FROM qiandao_token LIMIT 1")
        if not sql_token:
            log.error("No token found in qiandao_token")
            return
        token = sql_token[0]
        sql_pid_one_list = sql_pool.select_all(
            f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'KUJI' AND crawl_category = '{CRAWL_CATEGORY}'")
        sql_pid_one_list = [item[0] for item in sql_pid_one_list]
        # sql_pid_two_list = sql_pool.select_all(
        #     f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'GACHA' AND crawl_category = '{CRAWL_CATEGORY}'")
        # sql_pid_two_list = [item[0] for item in sql_pid_two_list]
        # sql_pid_three_list = sql_pool.select_all(
        #     f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'LIVE_GROUPON' AND crawl_category = '{CRAWL_CATEGORY}'")
        # sql_pid_three_list = [item[0] for item in sql_pid_three_list]

        if not sql_pid_one_list:
            # sql_pid_one_list = ["873770249084734603"]  # test pid
            log.debug("No sql_pid_one_list")
            return
        for pid2 in sql_pid_one_list:
            try:
                get_group_id_list_for_pid_one_page(log, token, pid2, sql_pool)
            except Exception as e:
                log.error(f"{inspect.currentframe().f_code.co_name} for sql_pid_one_list: {pid2}, error: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Crawler {inspect.currentframe().f_code.co_name} finished; waiting for the next scheduled run............')


def schedule_task():
    """Launcher for the crawler's scheduled task."""
    # Run the task once immediately
    # qd_mall_main()

    # Schedule the daily run
    schedule.every().day.at("00:01").do(qd_mall_main)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    schedule_task()
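
# Usage note: for a one-off debug run, call qd_mall_main() directly instead of
# starting the scheduler (this mirrors the commented-out line in schedule_task):
#
#     if __name__ == '__main__':
#         qd_mall_main()      # run once immediately
#         # schedule_task()   # or start the daily 00:01 schedule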