# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2026/1/28 11:12
import inspect
import requests
from loguru import logger
from datetime import datetime
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

"""
SuperVault
"""

# Headers required by the cardsvault app API.
HEADERS = {
    "User-Agent": "okhttp/4.9.0",
    "Authorization": "",
    "CXX-APP-API-VERSION": "V2",  # required by the API, requests fail without it
    "Content-Type": "application/json; charset=UTF-8"
}


def after_log(retry_state):
    """tenacity ``after`` callback: log the outcome of each attempt.

    The decorated functions in this module take a logger as their first
    positional argument; when none is available fall back to the
    module-level loguru logger.

    :param retry_state: tenacity RetryCallState for the finished attempt
    """
    # First positional arg of the wrapped call is the caller's logger.
    log = retry_state.args[0] if retry_state.args else logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_vod_single_page(log, page_num=1):
    """Fetch one page of the team-up sale list.

    Retried up to 5 times (1s apart) on any exception by tenacity.

    :param log: logger object
    :param page_num: 1-based page number to fetch
    :return: dict with keys ``total``/``current_page``/``items``, or
             ``None`` when the API reports a non-200 business status
    :raises requests.HTTPError: on a non-2xx HTTP response
    """
    url = "https://cxx.cardsvault.net/app/teamup/list"
    payload = {
        "pageSize": 20,
        "pageNum": page_num
    }
    response = requests.post(url, headers=HEADERS, json=payload, timeout=22)
    response.raise_for_status()
    result = response.json()
    if result.get("status") == 200:
        data = result.get("data", {})
        total = data.get("total", 0)
        current_page = data.get("pageNum", 1)
        items = data.get("data", [])
        log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
        log.debug(f"当前页数据数量: {len(items)}")
        return {
            "total": total,
            "current_page": current_page,
            "items": items,
        }
    else:
        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
        return None


def parse_list_items(log, items):
    """Parse raw API list items into flat dicts ready for DB insertion.

    Prices come back in cents and are converted to yuan; nested
    ``cover``/``anchor`` objects may be JSON-null and are treated as empty.

    :param log: logger object
    :param items: list of raw item dicts from the API
    :return: list of parsed item dicts
    """
    parsed_items = []
    log.debug(f"正在解析列表项.................")
    for item in items:
        # Prices are in cents; normalize to currency units (0 when missing).
        total_price = item.get("totalPrice")
        total_price = total_price / 100 if total_price else 0
        sign_price = item.get("signPrice")
        sign_price = sign_price / 100 if sign_price else 0
        # `or {}` guards against an explicit JSON null for these objects.
        cover = item.get("cover") or {}
        anchor = item.get("anchor") or {}
        parsed_item = {
            "pid": item.get("id"),
            "title": item.get("title"),
            "serial": item.get("serial"),            # item number
            "type_name": item.get("typeName"),       # random card type
            "is_pre": item.get("isPre"),
            "count": item.get("count"),
            "total_price": total_price,
            "sign_price": sign_price,
            "sell_time": item.get("sellTime"),
            "sell_days": item.get("sellDays"),
            "status": item.get("status"),            # 9: done, 8: awaiting shipment
            "status_name": item.get("statusName"),
            "description": item.get("description"),
            "create_time": item.get("createTime"),
            "cover_url": cover.get("url"),           # cover image
            "anchor_id": anchor.get("id"),
            "anchor_username": anchor.get("userName"),
            "sold_count": item.get("soldCount"),
            "detail_url": item.get("detailUrl"),
            "goods_url": item.get("goodsUrl"),
            "standard_name": item.get("standardName"),  # specification
            "crawl_date": datetime.now().strftime("%Y-%m-%d")
        }
        parsed_items.append(parsed_item)
    return parsed_items


def get_vod_list(log, sql_pool):
    """Crawl every page of the sale list and persist items to MySQL.

    The real page count is only known after the first response, so we
    start with a placeholder and recompute it from ``total`` on page 1.

    :param log: logger object
    :param sql_pool: database connection pool
    """
    page_num = 1
    total_pages = 9  # placeholder; replaced after the first request
    items_per_page = 20  # must match pageSize sent by get_vod_single_page
    while page_num <= total_pages:
        log.debug(f"正在获取第 {page_num} 页的数据.................")
        page_result = get_vod_single_page(log, page_num)
        if not page_result:
            log.error(f"获取第 {page_num} 页失败 !!!")
            break
        # Update the real total page count from the first response.
        if page_num == 1:
            total_count = page_result["total"]
            total_pages = (total_count + items_per_page - 1) // items_per_page
            log.info(f"总共 {total_pages} 页")
        # Parse and insert each page immediately (idempotent via ignore=True).
        items = parse_list_items(log, page_result["items"])
        sql_pool.insert_many(table="super_vault_on_sale_record", data_list=items, ignore=True)
        page_num += 1


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def cxx_sale_main(log):
    """Entry point: run one full crawl cycle.

    The tenacity decorator (100 attempts, 1h apart) doubles as a crude
    hourly scheduler when the DB health check raises.

    :param log: logger object
    :raises RuntimeError: when the MySQL connection pool is unhealthy
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool and verify it before crawling.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        try:
            get_vod_list(log, sql_pool)
        except Exception as e:
            log.error(f"Error fetching vod list: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    cxx_sale_main(logger)