||
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2026/1/28 11:12
- import inspect
- import requests
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
- """
- SuperVault
- """
- # logger.remove()
- # logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="7 day")
# Default HTTP headers sent with every request this module makes to the
# cxx.cardsvault.net app API.
# NOTE: the page-fetching functions in this module assign
# HEADERS["Authorization"] in place before each POST, so this dict is shared
# mutable state rather than a true constant.
HEADERS = {
    "User-Agent": "okhttp/4.9.0",
    # "Connection": "Keep-Alive",
    # "Accept-Encoding": "gzip",
    "Authorization": "",
    "CXX-APP-API-VERSION": "V2",  # required
    # "deviceType": "2",
    # "udid": "20f902c10f6163a19bf137d801731d9f",
    # "time": str(int(time.time() * 1000)),
    "Content-Type": "application/json; charset=UTF-8"
}
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of the attempt that just ran.

    The logger is taken from the wrapped call itself: its first positional
    argument, or failing that its ``log`` keyword argument. A candidate is only
    accepted when it exposes ``warning`` and ``info``; otherwise the
    module-level ``logger`` is used.

    :param retry_state: RetryCallState object describing the attempt
    """
    positional = getattr(retry_state, "args", None) or ()
    keywords = getattr(retry_state, "kwargs", None) or {}

    # Check the first positional argument first, then the ``log`` keyword, so
    # both ``func(log, ...)`` and ``func(..., log=log)`` call styles work.
    candidates = [*positional[:1], keywords.get("log")]
    log = next(
        (c for c in candidates if hasattr(c, "warning") and hasattr(c, "info")),
        None,
    )
    if log is None:
        log = logger  # fall back to the global logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the ``proxies`` mapping (``http`` and ``https`` keys) for the tunnel proxy.

    The tunnel address and credentials can be overridden with the environment
    variables ``KDL_TUNNEL``, ``KDL_USERNAME`` and ``KDL_PASSWORD``; when a
    variable is unset the previous built-in value is used, so existing
    deployments behave as before.

    :param log: logger object (unused in the body; kept so the signature and
                the retry callback's first-argument lookup stay unchanged)
    :return: dict with ``http`` and ``https`` proxy URLs
    """
    import os
    from urllib.parse import quote

    # SECURITY(review): the fallback values below are real credentials
    # committed to source. Prefer supplying them via the environment and
    # rotating/removing these defaults.
    tunnel = os.environ.get("KDL_TUNNEL", "x371.kdltps.com:15818")
    kdl_username = os.environ.get("KDL_USERNAME", "t13753103189895")
    kdl_password = os.environ.get("KDL_PASSWORD", "o0yefv6z")

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot corrupt the proxy URL. The built-in defaults are alphanumeric and
    # therefore unchanged by quoting.
    proxy_url = f"http://{quote(kdl_username, safe='')}:{quote(kdl_password, safe='')}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_video_url(log, pid):
    """
    Fetch the live id and VOD URL for one product from the team-up detail endpoint.

    The request is sent with the module-level HEADERS exactly as they currently
    are, so the Authorization value is whatever was last stored there.

    :param log: logger object
    :param pid: product id, sent as the ``id`` query parameter
    :return: tuple ``(live_id, vod_url)``; an element is None when the response
             carries no ``liveInfo`` / ``vod_info`` / ``vodUrl``
    """
    log.debug(f"正在获取视频地址: {pid}")
    url = "https://cxx.cardsvault.net/app/teamup/detail"
    params = {"id": str(pid)}

    response = requests.get(url, headers=HEADERS, params=params, timeout=22)
    response.raise_for_status()
    result = response.json()

    # ``dict.get(key, {})`` only covers a *missing* key; a JSON null comes back
    # as None and would raise AttributeError on the next ``.get`` (and then be
    # pointlessly retried). ``or {}`` covers both missing and null.
    data = result.get("data") or {}
    live_info = data.get("liveInfo") or {}
    vod_info = live_info.get("vod_info") or {}

    return live_info.get("id"), vod_info.get("vodUrl")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_vod_single_page(log, page_num=1, token=""):
    """
    Fetch one page of the team-up VOD list.

    :param log: logger object
    :param page_num: page number to request
    :param token: value sent in the Authorization header
    :return: dict with keys ``total``, ``current_page`` and ``items``, or None
             when the response ``status`` field is not 200
    """
    url = "https://cxx.cardsvault.net/app/teamup/vod/list"
    payload = {
        "pageSize": 20,
        "pageNum": page_num
    }
    # Deliberately kept as an in-place update of the shared HEADERS dict:
    # get_video_url sends HEADERS as-is and relies on the token stored here.
    HEADERS["Authorization"] = token
    response = requests.post(url, headers=HEADERS, json=payload, timeout=22)
    response.raise_for_status()
    result = response.json()

    if result.get("status") != 200:
        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
        return None

    # ``or`` fallbacks cover JSON null as well as missing keys, so a null
    # ``data`` / ``total`` / item list is treated as empty instead of raising.
    page = result.get("data") or {}
    total = page.get("total") or 0
    current_page = page.get("pageNum", 1)
    items = page.get("data") or []
    log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
    log.debug(f"当前页数据数量: {len(items)}")
    return {
        "total": total,
        "current_page": current_page,
        "items": items,
    }
def parse_list_items(log, items):
    """
    Convert raw VOD list items into flat row dicts, adding live id and VOD URL.

    For every item get_video_url is called with the item's ``id``; if that call
    raises, the error is logged and ``live_id`` / ``vod_url`` are stored as None.

    :param log: logger object
    :param items: iterable of item dicts as returned by the list endpoint
    :return: list of row dicts (snake_case keys), one per input item
    """
    parsed_items = []
    log.debug("正在解析列表项.................")
    for item in items:
        pid = item.get("id")

        # Prices are divided by 100; missing or zero values are stored as 0.
        # NOTE(review): presumably the API reports prices in hundredths of the
        # currency unit - confirm.
        total_price = item.get("totalPrice")
        total_price = total_price / 100 if total_price else 0
        sign_price = item.get("signPrice")
        sign_price = sign_price / 100 if sign_price else 0

        # ``or {}`` also covers a JSON null, which ``item.get(key, {})`` would
        # pass through as None and crash the whole page on ``.get``.
        cover = item.get("cover") or {}
        anchor = item.get("anchor") or {}

        try:
            live_id, vod_url = get_video_url(log, pid)
        except Exception as e:
            # Best effort: a failed detail lookup must not drop the list row.
            log.error(f"Error getting video URL: {e}")
            live_id, vod_url = None, None

        parsed_items.append({
            "pid": pid,
            "title": item.get("title"),
            "serial": item.get("serial"),  # serial number
            "type_name": item.get("typeName"),  # random card type
            "is_pre": item.get("isPre"),
            "count": item.get("count"),
            "total_price": total_price,
            "sign_price": sign_price,
            "sell_time": item.get("sellTime"),
            "sell_days": item.get("sellDays"),
            "status": item.get("status"),  # 9: completed, 8: awaiting shipment
            "group_num": item.get("groupNum"),
            "description": item.get("description"),
            "create_time": item.get("createTime"),
            "completion_time": item.get("completionTime"),  # completion time
            "cover_url": cover.get("url"),  # cover image
            "anchor_id": anchor.get("id"),
            "anchor_username": anchor.get("userName"),
            "sold_count": item.get("soldCount"),
            "detail_url": item.get("detailUrl"),
            "goods_url": item.get("goodsUrl"),
            "standard_name": item.get("standardName"),  # specification
            "live_task_time": item.get("liveTaskTime"),  # live-stream time
            "live_id": live_id,
            "vod_url": vod_url
        })
    return parsed_items
def get_vod_list(log, sql_pool, token):
    """
    Walk every page of the VOD list and store each page's rows as it arrives.

    Pages are fetched with get_vod_single_page, converted with
    parse_list_items and written to ``super_vault_product_record`` via
    ``sql_pool.insert_many(..., ignore=True)``. Paging stops after the last
    page or as soon as a page fetch returns a falsy result.

    :param log: logger object
    :param sql_pool: database connection pool
    :param token: token passed through to get_vod_single_page
    """
    page_size = 20  # pageSize
    last_page = 999  # placeholder until the first page reports the real total
    current = 1
    while True:
        if current > last_page:
            return

        log.debug(f"正在获取第 {current} 页的数据.................")
        fetched = get_vod_single_page(log, current, token)
        if not fetched:
            log.error(f"获取第 {current} 页失败 !!!")
            return

        if current == 1:
            # The first response carries the record total; derive the real
            # page count from it (rounding up).
            record_total = fetched["total"]
            last_page = (record_total + page_size - 1) // page_size
            log.info(f"总共 {last_page} 页")

        # Parse and persist each page immediately after fetching it.
        rows = parse_list_items(log, fetched["items"])
        sql_pool.insert_many(table="super_vault_product_record", data_list=rows, ignore=True)
        current += 1
- # ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_report_single_page(log, page_num, detail_id, token):
    """
    Fetch one page of the card-break report (拆卡报告) list for a product.

    Retried like get_vod_single_page: up to 5 attempts, 1 second apart, when
    the request or response parsing raises.

    :param log: logger object
    :param page_num: page number to request
    :param detail_id: product id, sent as ``tid``
    :param token: value sent in the Authorization header
    :return: dict with keys ``total``, ``current_page`` and ``items``, or None
             when the response ``status`` field is not 200
    """
    log.debug(f"正在获取第 {page_num} 页的 <拆卡报告> 数据.................")
    url = "https://cxx.cardsvault.net/app/teamup/report/list"
    payload = {
        "pageSize": 20,
        "my": 0,
        "pageNum": page_num,
        "tid": detail_id
    }
    # In-place update of the shared HEADERS dict, same as get_vod_single_page.
    HEADERS["Authorization"] = token
    response = requests.post(url, headers=HEADERS, json=payload, timeout=22)
    response.raise_for_status()
    result = response.json()

    if result.get("status") != 200:
        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
        return None

    # ``or`` fallbacks cover JSON null as well as missing keys, so a null
    # ``data`` / ``total`` / item list is treated as empty instead of raising.
    page = result.get("data") or {}
    total = page.get("total") or 0
    current_page = page.get("pageNum", 1)
    items = page.get("data") or []
    log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
    log.debug(f"当前页数据数量: {len(items)}")
    return {
        "total": total,
        "current_page": current_page,
        "items": items
    }
def parse_report_items(log, detail_id, items):
    """
    Convert raw card-break report items into flat row dicts.

    :param log: logger object
    :param detail_id: product id, stored as ``pid`` on every row
    :param items: iterable of report item dicts
    :return: list of row dicts (snake_case keys), one per input item
    """
    log.debug("正在解析 <拆卡报告> 列表项.................")
    parsed_items = []
    for item in items:
        # ``or {}`` also covers a JSON null, which ``item.get("picture", {})``
        # would pass through as None and crash on ``.get``.
        picture = item.get("picture") or {}
        parsed_items.append({
            "pid": detail_id,
            "user_name": item.get("userName"),
            "level": item.get("level"),
            "team_name_cn": item.get("teamNameCn"),
            "team_name_en": item.get("teamNameEn"),
            "count": item.get("count"),
            "picture_url": picture.get("url"),
            "alias": item.get("alias"),  # alias
            "create_time": item.get("createTime")
        })
    return parsed_items
def get_report_list(log, detail_id, token, sql_pool):
    """
    Fetch every page of the card-break report list for one product and store it.

    Rows are written to ``super_vault_report_record`` page by page. When the
    first page reports zero records, the product row is set to
    ``report_state = 2`` and nothing else is fetched.

    :param log: logger object
    :param detail_id: product id
    :param token: token passed through to get_report_single_page
    :param sql_pool: database connection pool
    :return: True when all pages were fetched and stored; False when the API
             reported no records (``report_state`` has then been set to 2)
    :raises RuntimeError: when a page request returns no result (API error),
             so the caller does not record a failed fetch as completed
    """
    page_num = 1
    total_pages = 9  # placeholder until the first page reports the real total
    items_per_page = 20  # pageSize
    while page_num <= total_pages:
        log.debug(f"正在获取第 {page_num} 页的数据.................")
        page_result = get_report_single_page(log, page_num, detail_id, token)
        if not page_result:
            log.error(f"获取第 {page_num} 页失败 !!!")
            raise RuntimeError(f"report page {page_num} could not be fetched for pid {detail_id}")

        # The first response carries the record total; use it to replace the
        # placeholder page count.
        if page_num == 1:
            total_count = page_result["total"] or 0
            if total_count == 0:
                log.info("No new records found.")
                # Mark the product as having no report (state 2).
                sql_pool.update_one_or_dict(
                    table="super_vault_product_record",
                    data={"report_state": 2},
                    condition={"pid": detail_id}
                )
                return False
            total_pages = (total_count + items_per_page - 1) // items_per_page
            log.info(f"总共 {total_pages} 页")

        items = parse_report_items(log, detail_id, page_result["items"])
        sql_pool.insert_many(table="super_vault_report_record", data_list=items, ignore=True)
        page_num += 1
    return True


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def cxx_his_main(log):
    """
    Entry point of the crawl: refresh the product list, then fetch pending reports.

    Steps:
      1. Create the MySQL pool and check its health (raises on failure, which
         lets the retry decorator try again: up to 100 attempts, 3600 s apart).
      2. Read the API token from ``super_vault_token``.
      3. Crawl the VOD list into ``super_vault_product_record``.
      4. For every product with ``report_state = 0`` fetch its report list and
         set ``report_state`` to 1 (stored), 2 (no records, set by
         get_report_list) or 3 (error).

    :param log: logger object
    :raises RuntimeError: when the database pool health check fails
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token_row = sql_pool.select_one("SELECT token FROM super_vault_token")
        # Without a token every request below would fail, and each pending
        # product would end up flagged as errored; stop this run instead.
        if not token_row or not token_row[0]:
            log.error("No token found in super_vault_token, skipping this run.")
            return
        token = token_row[0]

        # Refresh the product list (collects all pids).
        try:
            get_vod_list(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching vod list: {e}")

        # Collect every pid whose report has not been fetched yet.
        pending_rows = sql_pool.select_all("SELECT pid FROM super_vault_product_record WHERE report_state = 0")
        if not pending_rows:
            log.info("No new records found.")
            return

        for detail_id in [row[0] for row in pending_rows]:
            try:
                has_records = get_report_list(log, detail_id, token, sql_pool)
                # Only mark as stored (state 1) when rows were actually
                # fetched; the no-record case already set state 2 and must not
                # be overwritten.
                if has_records:
                    sql_pool.update_one_or_dict(
                        table="super_vault_product_record",
                        data={"report_state": 1},
                        condition={"pid": detail_id}
                    )
            except Exception as e:
                log.error(f"Error fetching report list for pid {detail_id}: {e}")
                # Mark the product as failed (state 3).
                sql_pool.update_one_or_dict(
                    table="super_vault_product_record",
                    data={"report_state": 3},
                    condition={"pid": detail_id}
                )
    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')
if __name__ == '__main__':
    # Script entry point: run the full crawl with the global loguru logger.
    # The commented-out calls are manual entry points for debugging a single
    # step in isolation.
    # get_vod_list(logger, None, '')
    # get_vod_single_page(logger, 1)
    cxx_his_main(logger)
    # get_report_single_page(logger,1, '','')
|