# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2026/1/28 11:12
"""SuperVault crawler.

Pages through the team-up VOD list, stores every product in
``super_vault_product_record``, then downloads the report list of each
product whose ``report_state`` is 0 into ``super_vault_report_record``.
"""
import inspect

import requests
from loguru import logger
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 day")

HEADERS = {
    "User-Agent": "okhttp/4.9.0",
    # "Connection": "Keep-Alive",
    # "Accept-Encoding": "gzip",
    "Authorization": "",
    "CXX-APP-API-VERSION": "V2",  # must be present
    # "deviceType": "2",
    # "udid": "20f902c10f6163a19bf137d801731d9f",
    # "time": str(int(time.time() * 1000)),
    "Content-Type": "application/json; charset=UTF-8"
}

# Page size sent to both list endpoints; also used to derive the page count.
PAGE_SIZE = 20
# Timeout (seconds) applied to every HTTP request.
REQUEST_TIMEOUT = 22

# Values written to super_vault_product_record.report_state by this module.
# Rows with report_state = 0 are the ones still waiting to be processed.
REPORT_STATE_DONE = 1    # every report page was fetched and stored
REPORT_STATE_EMPTY = 2   # the API reported zero report records
REPORT_STATE_FAILED = 3  # fetching the reports failed


def after_log(retry_state):
    """
    tenacity ``after`` callback that logs the outcome of an attempt.

    :param retry_state: RetryCallState of the wrapped call
    """
    # Every retried function takes its logger as first positional argument;
    # fall back to the global logger when it was called without one.
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")


def _auth_headers(token=None):
    """
    Build the request headers for one call without touching ``HEADERS``.

    :param token: value for the Authorization header; ``None`` keeps the
                  value currently stored in ``HEADERS``
    :return: a new headers dict
    """
    headers = dict(HEADERS)
    if token is not None:
        headers["Authorization"] = token
    return headers


def _scale_price(raw_price):
    """
    Divide a raw price by 100, mapping missing/zero values to 0.

    NOTE(review): presumably the API reports prices in 1/100 of the currency
    unit - confirm against the API.
    """
    return raw_price / 100 if raw_price else 0


def _parse_page_response(log, result):
    """
    Turn the JSON body of a list endpoint into a page summary.

    :param log: logger object
    :param result: decoded JSON body of the response
    :return: dict with ``total``, ``current_page`` and ``items``, or ``None``
             when the body's ``status`` is not 200
    """
    if result.get("status") != 200:
        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
        return None

    # ``or`` also covers keys that are present but null.
    data = result.get("data") or {}
    total = data.get("total", 0)
    current_page = data.get("pageNum", 1)
    items = data.get("data") or []

    log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
    log.debug(f"当前页数据数量: {len(items)}")

    return {
        "total": total,
        "current_page": current_page,
        "items": items,
    }


def _set_report_state(sql_pool, pid, state):
    """Write ``report_state`` for one product row."""
    sql_pool.update_one_or_dict(
        table="super_vault_product_record",
        data={"report_state": state},
        condition={"pid": pid}
    )


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the proxy mapping for ``requests``.

    :param log: logger object (read by the retry callback)
    :return: dict with ``http`` and ``https`` proxy URLs
    """
    # NOTE(review): proxy credentials are hard-coded in source; they should be
    # moved to configuration or environment variables.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"

    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_video_url(log, pid, token=None):
    """
    Fetch the live id and VOD url of one product.

    :param log: logger object
    :param pid: product id sent as the ``id`` query parameter
    :param token: Authorization header value; ``None`` sends the value
                  stored in ``HEADERS``
    :return: tuple ``(live_id, vod_url)``; either element is ``None`` when
             the response does not contain it
    """
    log.debug(f"正在获取视频地址: {pid}")
    url = "https://cxx.cardsvault.net/app/teamup/detail"
    params = {"id": str(pid)}

    response = requests.get(url, headers=_auth_headers(token), params=params,
                            timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    result = response.json()

    # Any level may be missing or null; treat both as "no live info" instead
    # of raising (which would only trigger pointless retries).
    live_info = (result.get("data") or {}).get("liveInfo") or {}
    live_id = live_info.get("id")
    vod_url = (live_info.get("vod_info") or {}).get("vodUrl")

    return live_id, vod_url


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_vod_single_page(log, page_num=1, token=""):
    """
    Fetch one page of the VOD list.

    :param log: logger object
    :param page_num: page number to request
    :param token: Authorization header value
    :return: dict with ``total``, ``current_page`` and ``items``, or ``None``
             when the API body reports a non-200 status
    """
    url = "https://cxx.cardsvault.net/app/teamup/vod/list"
    payload = {
        "pageSize": PAGE_SIZE,
        "pageNum": page_num
    }

    response = requests.post(url, headers=_auth_headers(token), json=payload,
                             timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    return _parse_page_response(log, response.json())


def parse_list_items(log, items, token=None):
    """
    Convert raw VOD list items into rows for ``super_vault_product_record``.

    For every item the detail endpoint is queried for the live id and VOD
    url; a failure there is logged and stored as ``None`` values.

    :param log: logger object
    :param items: raw items of one list page
    :param token: Authorization header value forwarded to ``get_video_url``
    :return: list of row dicts
    """
    parsed_items = []
    log.debug(f"正在解析列表项.................")

    for item in items:
        pid = item.get("id")
        # Nested objects may be null, not just absent.
        cover = item.get("cover") or {}
        anchor = item.get("anchor") or {}

        try:
            live_id, vod_url = get_video_url(log, pid, token)
        except Exception as e:
            # Best effort: a missing video must not drop the whole item.
            log.error(f"Error getting video URL: {e}")
            live_id, vod_url = None, None

        parsed_items.append({
            "pid": pid,
            "title": item.get("title"),
            "serial": item.get("serial"),  # serial number
            "type_name": item.get("typeName"),
            "is_pre": item.get("isPre"),
            "count": item.get("count"),
            "total_price": _scale_price(item.get("totalPrice")),
            "sign_price": _scale_price(item.get("signPrice")),
            "sell_time": item.get("sellTime"),
            "sell_days": item.get("sellDays"),
            "status": item.get("status"),  # 9: finished, 8: awaiting shipment
            "group_num": item.get("groupNum"),
            "description": item.get("description"),
            "create_time": item.get("createTime"),
            "completion_time": item.get("completionTime"),
            "cover_url": cover.get("url"),
            "anchor_id": anchor.get("id"),
            "anchor_username": anchor.get("userName"),
            "sold_count": item.get("soldCount"),
            "detail_url": item.get("detailUrl"),
            "goods_url": item.get("goodsUrl"),
            "standard_name": item.get("standardName"),  # specification
            "live_task_time": item.get("liveTaskTime"),  # live broadcast time
            "live_id": live_id,
            "vod_url": vod_url
        })

    return parsed_items


def get_vod_list(log, sql_pool, token):
    """
    Page through the VOD list and store every page in the database.

    Stops at the first page whose fetch returns ``None``.

    :param log: logger object
    :param sql_pool: database connection pool
    :param token: Authorization header value
    """
    page_num = 1
    total_pages = 999  # placeholder until the first response gives the total

    while page_num <= total_pages:
        log.debug(f"正在获取第 {page_num} 页的数据.................")
        page_result = get_vod_single_page(log, page_num, token)

        if not page_result:
            log.error(f"获取第 {page_num} 页失败 !!!")
            break

        # The first response tells us the real number of pages.
        if page_num == 1:
            total_count = page_result["total"]
            total_pages = (total_count + PAGE_SIZE - 1) // PAGE_SIZE
            log.info(f"总共 {total_pages} 页")

        # Parse and persist each page as soon as it arrives.
        items = parse_list_items(log, page_result["items"], token)
        if items:
            sql_pool.insert_many(table="super_vault_product_record", data_list=items, ignore=True)

        page_num += 1


# ----------------------------------------------------------------------------------------------------------------------


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_report_single_page(log, page_num, detail_id, token):
    """
    Fetch one page of the report list of a product.

    :param log: logger object
    :param page_num: page number to request
    :param detail_id: product id sent as ``tid``
    :param token: Authorization header value
    :return: dict with ``total``, ``current_page`` and ``items``, or ``None``
             when the API body reports a non-200 status
    """
    log.debug(f"正在获取第 {page_num} 页的 <拆卡报告> 数据.................")
    url = "https://cxx.cardsvault.net/app/teamup/report/list"
    payload = {
        "pageSize": PAGE_SIZE,
        "my": 0,
        "pageNum": page_num,
        "tid": detail_id
    }

    response = requests.post(url, headers=_auth_headers(token), json=payload,
                             timeout=REQUEST_TIMEOUT)
    response.raise_for_status()

    return _parse_page_response(log, response.json())


def parse_report_items(log, detail_id, items):
    """
    Convert raw report items into rows for ``super_vault_report_record``.

    :param log: logger object
    :param detail_id: product id stored as ``pid`` on every row
    :param items: raw items of one report page
    :return: list of row dicts
    """
    log.debug(f"正在解析 <拆卡报告> 列表项.................")

    return [
        {
            "pid": detail_id,
            "user_name": item.get("userName"),
            "level": item.get("level"),
            "team_name_cn": item.get("teamNameCn"),
            "team_name_en": item.get("teamNameEn"),
            "count": item.get("count"),
            # ``picture`` may be null, not just absent.
            "picture_url": (item.get("picture") or {}).get("url"),
            "alias": item.get("alias"),
            "create_time": item.get("createTime")
        }
        for item in items
    ]


def get_report_list(log, detail_id, token, sql_pool):
    """
    Page through the report list of one product and store it.

    When the API reports zero records the product row is set to
    ``REPORT_STATE_EMPTY`` here; every other state is only returned so the
    caller decides what to persist.

    :param log: logger object
    :param detail_id: product id
    :param token: Authorization header value
    :param sql_pool: database connection pool
    :return: ``REPORT_STATE_DONE`` when all pages were stored,
             ``REPORT_STATE_EMPTY`` when there are no reports,
             ``REPORT_STATE_FAILED`` when a page could not be fetched
    """
    page_num = 1
    total_pages = 9  # placeholder until the first response gives the total

    while page_num <= total_pages:
        log.debug(f"正在获取第 {page_num} 页的数据.................")
        page_result = get_report_single_page(log, page_num, detail_id, token)

        if not page_result:
            log.error(f"获取第 {page_num} 页失败 !!!")
            # Tell the caller, so a partial download is not marked as done.
            return REPORT_STATE_FAILED

        # The first response tells us the real number of pages.
        if page_num == 1:
            total_count = page_result["total"]
            if total_count == 0:
                log.info("No new records found.")
                _set_report_state(sql_pool, detail_id, REPORT_STATE_EMPTY)
                return REPORT_STATE_EMPTY

            total_pages = (total_count + PAGE_SIZE - 1) // PAGE_SIZE
            log.info(f"总共 {total_pages} 页")

        items = parse_report_items(log, detail_id, page_result["items"])
        if items:
            sql_pool.insert_many(table="super_vault_report_record", data_list=items, ignore=True)

        page_num += 1

    return REPORT_STATE_DONE


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def cxx_his_main(log):
    """
    Run one crawl: refresh the product list, then fetch pending reports.

    Only the pool health check raises out of this function (and is therefore
    retried); every other error is logged and ends the run.

    :param log: logger object
    :raises RuntimeError: when the database pool health check fails
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token_row = sql_pool.select_one("SELECT token FROM super_vault_token")
        if not token_row:
            # Without a token every request would fail and every pending
            # product would wrongly be flagged as failed.
            log.error("No token found in super_vault_token, skipping this run.")
            return
        token = token_row[0]

        # Refresh the product list; a failure here must not block the reports.
        try:
            get_vod_list(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching vod list: {e}")

        # Products whose reports have not been processed yet.
        pending_rows = sql_pool.select_all(
            "SELECT pid FROM super_vault_product_record WHERE report_state = 0")
        if not pending_rows:
            log.info("No new records found.")
            return

        for detail_id in [row[0] for row in pending_rows]:
            try:
                state = get_report_list(log, detail_id, token, sql_pool)
                # The empty state was already written by get_report_list.
                if state != REPORT_STATE_EMPTY:
                    _set_report_state(sql_pool, detail_id, state)
            except Exception as e:
                log.error(f"Error fetching report list for pid {detail_id}: {e}")
                _set_report_state(sql_pool, detail_id, REPORT_STATE_FAILED)

    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    # get_vod_list(logger, None, '')
    # get_vod_single_page(logger, 1)
    cxx_his_main(logger)
    # get_report_single_page(logger,1, '','')