| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2026/1/28 11:12
- import inspect
- import requests
- from loguru import logger
- from datetime import datetime
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
- """
- SuperVault
- """
- # logger.remove()
- # logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="7 day")
# Request headers used for the API calls made by this module.
HEADERS = {
    "User-Agent": "okhttp/4.9.0",
    "Authorization": "",
    "CXX-APP-API-VERSION": "V2",  # required by the API
    "Content-Type": "application/json; charset=UTF-8",
    # Headers left disabled:
    # "Connection": "Keep-Alive",
    # "Accept-Encoding": "gzip",
    # "deviceType": "2",
    # "udid": "20f902c10f6163a19bf137d801731d9f",
    # "time": str(int(time.time() * 1000)),
}
def after_log(retry_state):
    """
    Callback passed as ``after=`` to the retry decorators; logs the attempt outcome.

    :param retry_state: RetryCallState object
    """
    # Use the logger handed to the wrapped function as its first positional
    # argument when there is one; otherwise fall back to the global logger.
    positional = retry_state.args
    active_log = positional[0] if positional else logger

    func_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number

    if not retry_state.outcome.failed:
        active_log.info(f"Function '{func_name}', Attempt {attempt} succeeded")
        return
    active_log.warning(f"Function '{func_name}', Attempt {attempt} Times")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_vod_single_page(log, page_num=1):
    """
    Fetch a single page of the team-up listing.

    POSTs to the list endpoint with a fixed page size of 20. Any exception
    (network failure, non-2xx HTTP status, undecodable body) propagates to the
    retry decorator, which makes up to 5 attempts with a 1 second wait.

    :param log: logger object
    :param page_num: page number to request (defaults to 1)
    :return: dict with keys "total", "current_page" and "items", or None when
             the response body reports a status other than 200
    """
    url = "https://cxx.cardsvault.net/app/teamup/list"
    payload = {
        "pageSize": 20,
        "pageNum": page_num,
    }
    response = requests.post(url, headers=HEADERS, json=payload, timeout=22)
    response.raise_for_status()
    result = response.json()

    if result.get("status") != 200:
        log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
        return None

    # A key that is present but null bypasses the .get() default, so fall back
    # explicitly instead of crashing (and retrying) on None.
    page = result.get("data") or {}
    total = page.get("total") or 0
    current_page = page.get("pageNum", 1)
    items = page.get("data") or []

    log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
    log.debug(f"当前页数据数量: {len(items)}")
    return {
        "total": total,
        "current_page": current_page,
        "items": items,
    }
def parse_list_items(log, items):
    """
    Convert raw list entries from the API into flat row dicts.

    Both price fields are divided by 100 (presumably minor units -> major
    units; confirm against the API); a missing or zero price becomes 0.
    The nested "cover" and "anchor" objects are flattened and may be absent
    or null. All rows of one call share the same crawl_date (YYYY-MM-DD).

    :param log: logger object
    :param items: list of item dicts; None is treated as an empty list
    :return: list of parsed row dicts, in input order
    """
    log.debug("正在解析列表项.................")
    # Computed once so every row of the batch carries the same date.
    crawl_date = datetime.now().strftime("%Y-%m-%d")

    parsed_items = []
    for item in items or []:
        # `.get(key, {})` does not help when the key exists with a null value.
        cover = item.get("cover") or {}
        anchor = item.get("anchor") or {}
        total_price = item.get("totalPrice")
        sign_price = item.get("signPrice")

        parsed_items.append({
            "pid": item.get("id"),
            "title": item.get("title"),
            "serial": item.get("serial"),  # serial number
            "type_name": item.get("typeName"),  # random card type
            "is_pre": item.get("isPre"),
            "count": item.get("count"),
            "total_price": total_price / 100 if total_price else 0,
            "sign_price": sign_price / 100 if sign_price else 0,
            "sell_time": item.get("sellTime"),
            "sell_days": item.get("sellDays"),
            "status": item.get("status"),  # 9: finished, 8: awaiting shipment
            "status_name": item.get("statusName"),
            "description": item.get("description"),
            "create_time": item.get("createTime"),
            "cover_url": cover.get("url"),  # cover image
            "anchor_id": anchor.get("id"),
            "anchor_username": anchor.get("userName"),
            "sold_count": item.get("soldCount"),
            "detail_url": item.get("detailUrl"),
            "goods_url": item.get("goodsUrl"),
            "standard_name": item.get("standardName"),  # specification
            "crawl_date": crawl_date,
        })
    return parsed_items
def get_vod_list(log, sql_pool):
    """
    Walk through all listing pages and store the parsed rows.

    Each page is fetched, parsed and inserted before the next page is
    requested. The loop stops early when a page fetch returns a falsy result.

    :param log: logger object
    :param sql_pool: database connection pool providing ``insert_many``
    """
    page_num = 1
    total_pages = 9  # provisional; replaced by the real count after page 1
    items_per_page = 20  # pageSize

    while page_num <= total_pages:
        log.debug(f"正在获取第 {page_num} 页的数据.................")
        page_result = get_vod_single_page(log, page_num)
        if not page_result:
            log.error(f"获取第 {page_num} 页失败 !!!")
            break

        # The first response carries the real record count; derive the page
        # count from it with a ceiling division.
        if page_num == 1:
            total_count = page_result["total"]
            total_pages = (total_count + items_per_page - 1) // items_per_page
            log.info(f"总共 {total_pages} 页")

        # Parse and persist each page right away.
        items = parse_list_items(log, page_result["items"])
        if items:
            # Skip the database round trip when the page produced no rows.
            sql_pool.insert_many(table="super_vault_on_sale_record", data_list=items, ignore=True)
        page_num += 1
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def cxx_sale_main(log):
    """
    Run one crawl: set up the MySQL pool, check it, then fetch and store the list.

    Exceptions raised while crawling are logged and swallowed, so they do not
    trigger a retry. Failures before the crawl starts (such as the RuntimeError
    raised on a failed pool health check) propagate to the retry decorator,
    which is configured for up to 100 attempts with a 3600 second wait.

    :param log: logger object
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    # A single handler replaces the former nested try blocks: the inner one
    # caught every Exception, so the outer handler could never run.
    try:
        get_vod_list(log, sql_pool)
    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')
if __name__ == "__main__":
    # Script entry point: run the crawl with the global loguru logger.
    # Disabled manual/debug entry points:
    #   get_vod_list(logger, None)
    #   get_vod_single_page(logger, 1)
    #   schedule_task()
    cxx_sale_main(logger)
|