super_vault_daily_spider.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2026/1/28 11:12
  5. import inspect
  6. import time
  7. import requests
  8. from loguru import logger
  9. from mysql_pool import MySQLConnectionPool
  10. from tenacity import retry, stop_after_attempt, wait_fixed
  11. """
  12. SuperVault
  13. """
  14. # logger.remove()
  15. # logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  16. # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  17. # level="DEBUG", retention="7 day")
  18. HEADERS = {
  19. "User-Agent": "okhttp/4.9.0",
  20. # "Connection": "Keep-Alive",
  21. # "Accept-Encoding": "gzip",
  22. "Authorization": "",
  23. "CXX-APP-API-VERSION": "V2", # 必须添加
  24. # "deviceType": "2",
  25. # "udid": "20f902c10f6163a19bf137d801731d9f",
  26. # "time": str(int(time.time() * 1000)),
  27. "Content-Type": "application/json; charset=UTF-8"
  28. }
  29. def after_log(retry_state):
  30. """
  31. retry 回调
  32. :param retry_state: RetryCallState 对象
  33. """
  34. # 检查 args 是否存在且不为空
  35. if retry_state.args and len(retry_state.args) > 0:
  36. log = retry_state.args[0] # 获取传入的 logger
  37. else:
  38. log = logger # 使用全局 logger
  39. if retry_state.outcome.failed:
  40. log.warning(
  41. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  42. else:
  43. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  44. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  45. def get_proxys(log):
  46. """
  47. 获取代理
  48. :return: 代理
  49. """
  50. tunnel = "x371.kdltps.com:15818"
  51. kdl_username = "t13753103189895"
  52. kdl_password = "o0yefv6z"
  53. try:
  54. proxies = {
  55. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  56. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  57. }
  58. return proxies
  59. except Exception as e:
  60. log.error(f"Error getting proxy: {e}")
  61. raise e
  62. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  63. def get_video_url(log, pid):
  64. """
  65. 获取视频地址
  66. :param log: logger对象
  67. :param pid: 视频地址
  68. :return: 视频地址
  69. """
  70. log.debug(f"正在获取视频地址: {pid}")
  71. url = "https://cxx.cardsvault.net/app/teamup/detail"
  72. params = {
  73. # "id": "1730"
  74. "id": str(pid)
  75. }
  76. response = requests.get(url, headers=HEADERS, params=params, timeout=22)
  77. response.raise_for_status()
  78. result = response.json()
  79. liveInfo = result.get("data", {}).get("liveInfo", {})
  80. live_id = liveInfo.get("id") if liveInfo else None
  81. vodUrl = liveInfo.get("vod_info", {}).get("vodUrl")
  82. return live_id, vodUrl
  83. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  84. def get_vod_single_page(log, page_num=1, token=""):
  85. """
  86. 获取单页数据
  87. :param log: logger对象
  88. :param page_num: 页码
  89. :param token: token
  90. :return: 数据
  91. """
  92. url = "https://cxx.cardsvault.net/app/teamup/vod/list"
  93. data = {
  94. "pageSize": 20,
  95. "pageNum": page_num
  96. }
  97. HEADERS["Authorization"] = token
  98. response = requests.post(url, headers=HEADERS, json=data, timeout=22)
  99. response.raise_for_status()
  100. result = response.json()
  101. # print(result)
  102. if result.get("status") == 200:
  103. data = result.get("data", {})
  104. total = data.get("total", 0)
  105. current_page = data.get("pageNum", 1)
  106. items = data.get("data", [])
  107. log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
  108. log.debug(f"当前页数据数量: {len(items)}")
  109. return {
  110. "total": total,
  111. "current_page": current_page,
  112. "items": items,
  113. }
  114. else:
  115. log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
  116. return None
  117. def parse_list_items(log, items):
  118. """
  119. 解析列表项
  120. :param log: logger对象
  121. :param items: 列表项
  122. :return: 解析后的列表项
  123. """
  124. parsed_items = []
  125. log.debug(f"正在解析列表项.................")
  126. for item in items:
  127. pid = item.get("id")
  128. serial = item.get("serial") # 编号
  129. title = item.get("title")
  130. type_name = item.get("typeName") # 随机卡种
  131. isPre = item.get("isPre")
  132. count = item.get("count")
  133. totalPrice = item.get("totalPrice")
  134. totalPrice = totalPrice / 100 if totalPrice else 0
  135. signPrice = item.get("signPrice")
  136. signPrice = signPrice / 100 if signPrice else 0
  137. sellTime = item.get("sellTime")
  138. sellDays = item.get("sellDays")
  139. status = item.get("status") # 9:完成 8:待发货
  140. groupNum = item.get("groupNum")
  141. description = item.get("description")
  142. createTime = item.get("createTime")
  143. completionTime = item.get("completionTime") # 完成时间
  144. cover_url = item.get("cover", {}).get("url") # 封面图
  145. anchor_id = item.get("anchor", {}).get("id")
  146. anchor_userName = item.get("anchor", {}).get("userName")
  147. soldCount = item.get("soldCount")
  148. detailUrl = item.get("detailUrl")
  149. goodsUrl = item.get("goodsUrl")
  150. standardName = item.get("standardName") # 规格
  151. liveTaskTime = item.get("liveTaskTime") # 直播时间
  152. try:
  153. live_id, vodUrl = get_video_url(log, pid)
  154. except Exception as e:
  155. log.error(f"Error getting video URL: {e}")
  156. live_id, vodUrl = None, None
  157. parsed_item = {
  158. "pid": pid,
  159. "title": title,
  160. "serial": serial,
  161. "type_name": type_name,
  162. "is_pre": isPre,
  163. "count": count,
  164. "total_price": totalPrice,
  165. "sign_price": signPrice,
  166. "sell_time": sellTime,
  167. "sell_days": sellDays,
  168. "status": status,
  169. "group_num": groupNum,
  170. "description": description,
  171. "create_time": createTime,
  172. "completion_time": completionTime,
  173. "cover_url": cover_url,
  174. "anchor_id": anchor_id,
  175. "anchor_username": anchor_userName,
  176. "sold_count": soldCount,
  177. "detail_url": detailUrl,
  178. "goods_url": goodsUrl,
  179. "standard_name": standardName,
  180. "live_task_time": liveTaskTime,
  181. "live_id": live_id,
  182. "vod_url": vodUrl
  183. }
  184. # print(parsed_item)
  185. parsed_items.append(parsed_item)
  186. return parsed_items
  187. def get_vod_list(log, sql_pool, token):
  188. """
  189. 获取列表数据
  190. :param log: logger对象
  191. :param sql_pool: 数据库连接池
  192. :param token: token
  193. """
  194. page_num = 1
  195. max_pages = 2
  196. while page_num <= max_pages:
  197. log.debug(f"正在获取第 {page_num} 页的数据.................")
  198. page_result = get_vod_single_page(log, page_num, token)
  199. if not page_result:
  200. log.error(f"获取第 {page_num} 页失败 !!!")
  201. break
  202. # 每页获取后立即解析
  203. items = parse_list_items(log, page_result["items"])
  204. sql_pool.insert_many(table="super_vault_product_record", data_list=items, ignore=True)
  205. page_num += 1
  206. # ----------------------------------------------------------------------------------------------------------------------
  207. def get_report_single_page(log, page_num, detail_id, token):
  208. """
  209. 获取单页数据
  210. :param log: logger对象
  211. :param page_num: 页码
  212. :param detail_id: 商品id
  213. :param token: token
  214. :return: 数据
  215. """
  216. log.debug(f"正在获取第 {page_num} 页的 <拆卡报告> 数据.................")
  217. # token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJDSEFPWElOWElORyNBUFAiLCJhdWQiOiJDSEFPWElOWElORyIsIm5iZiI6MTc2OTU4MTI5NiwiZGF0YSI6Ijk1MjMiLCJpc3MiOiI3ViNweHlQZSIsImV4cCI6MTc3MDc4MTI5NiwiaWF0IjoxNzY5NTgxMjk2LCJqdGkiOiIyYjkwNzZhMS0wYjU1LTQ0ZjItOGZlZC0yMWZiZmI0ZjUyYWIifQ.iDzTZLDslCP0y2nc2Jp4TGEsNbQiCRKcUeRsIyG3iOg"
  218. url = "https://cxx.cardsvault.net/app/teamup/report/list"
  219. data = {
  220. "pageSize": 20,
  221. "my": 0,
  222. "pageNum": page_num,
  223. # "tid": 1780
  224. "tid": detail_id
  225. }
  226. HEADERS["Authorization"] = token
  227. response = requests.post(url, headers=HEADERS, json=data, timeout=22)
  228. # print(response.text)
  229. response.raise_for_status()
  230. result = response.json()
  231. if result.get("status") == 200:
  232. data = result.get("data", {})
  233. total = data.get("total", 0)
  234. current_page = data.get("pageNum", 1)
  235. items = data.get("data", [])
  236. log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
  237. log.debug(f"当前页数据数量: {len(items)}")
  238. return {
  239. "total": total,
  240. "current_page": current_page,
  241. "items": items
  242. }
  243. else:
  244. log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
  245. return None
  246. def parse_report_items(log, detail_id, items):
  247. """
  248. 解析列表项
  249. :param log: logger对象
  250. :param detail_id: 商品id
  251. :param items: 列表项
  252. :return: 解析后的列表项
  253. """
  254. parsed_items = []
  255. log.debug(f"正在解析 <拆卡报告> 列表项.................")
  256. for item in items:
  257. userName = item.get("userName")
  258. level = item.get("level")
  259. teamNameCn = item.get("teamNameCn")
  260. teamNameEn = item.get("teamNameEn")
  261. count = item.get("count")
  262. picture_url = item.get("picture", {}).get("url")
  263. alias = item.get("alias") # 别名
  264. createTime = item.get("createTime")
  265. data_dict = {
  266. "pid": detail_id,
  267. "user_name": userName,
  268. "level": level,
  269. "team_name_cn": teamNameCn,
  270. "team_name_en": teamNameEn,
  271. "count": count,
  272. "picture_url": picture_url,
  273. "alias": alias,
  274. "create_time": createTime
  275. }
  276. parsed_items.append(data_dict)
  277. return parsed_items
  278. def get_report_list(log, detail_id, token, sql_pool):
  279. """
  280. 获取列表数据
  281. :param log: logger对象
  282. :param detail_id: 商品id
  283. :param token: token
  284. :param sql_pool: 数据库连接池
  285. """
  286. page_num = 1
  287. total_pages = 99
  288. items_per_page = 20 # pageSize
  289. while page_num <= total_pages:
  290. log.debug(f"正在获取第 {page_num} 页的数据.................")
  291. page_result = get_report_single_page(log, page_num, detail_id, token)
  292. if not page_result:
  293. log.error(f"获取第 {page_num} 页失败 !!!")
  294. break
  295. # 第一次请求时更新真实的总页数
  296. if page_num == 1:
  297. total_count = page_result["total"]
  298. if total_count == 0:
  299. log.info("No new records found.")
  300. # 更改状态为2
  301. sql_pool.update_one_or_dict(
  302. table="super_vault_product_record",
  303. data={"report_state": 2},
  304. condition={"pid": detail_id}
  305. )
  306. break
  307. total_pages = (total_count + items_per_page - 1) // items_per_page
  308. log.info(f"总共 {total_pages} 页")
  309. items = parse_report_items(log, detail_id, page_result["items"])
  310. sql_pool.insert_many(table="super_vault_report_record", data_list=items, ignore=True)
  311. sql_pool.update_one_or_dict(
  312. table="super_vault_product_record",
  313. data={"report_state": 1},
  314. condition={"pid": detail_id}
  315. )
  316. page_num += 1
  317. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  318. def cxx_daily_main(log):
  319. """
  320. 主函数
  321. :param log: logger对象
  322. """
  323. log.info(
  324. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  325. # 配置 MySQL 连接池
  326. sql_pool = MySQLConnectionPool(log=log)
  327. if not sql_pool.check_pool_health():
  328. log.error("数据库连接池异常")
  329. raise RuntimeError("数据库连接池异常")
  330. try:
  331. token = sql_pool.select_one("SELECT token FROM super_vault_token")
  332. # 获取所有 pid
  333. # try:
  334. # get_vod_list(log, sql_pool, token[0])
  335. # except Exception as e:
  336. # log.error(f"Error fetching last_product_id: {e}")
  337. #
  338. # time.sleep(5)
  339. # 获取所有 report_state = 0 的 pid
  340. sql_detail_id_list = sql_pool.select_all("SELECT pid FROM super_vault_product_record WHERE report_state != 1")
  341. if sql_detail_id_list:
  342. sql_detail_id_list = [item[0] for item in sql_detail_id_list]
  343. for detail_id in sql_detail_id_list:
  344. try:
  345. get_report_list(log, detail_id, token[0], sql_pool)
  346. except Exception as e:
  347. log.error(f"Error fetching last_product_id: {e}")
  348. # 更改状态为3
  349. sql_pool.update_one_or_dict(
  350. table="super_vault_product_record",
  351. data={"report_state": 3},
  352. condition={"pid": detail_id}
  353. )
  354. else:
  355. log.info("No new records found.")
  356. except Exception as e:
  357. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  358. finally:
  359. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  360. if __name__ == '__main__':
  361. # get_vod_list(logger, None, '')
  362. # get_vod_single_page(logger, 1)
  363. # get_report_single_page(logger,1, '','')
  364. cxx_daily_main(logger)
  365. # schedule_task()