# super_vault_history_spider.py
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2026/1/28 11:12
import inspect
import os

import requests
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool
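"""
SuperVault history spider: pulls the team-up VOD list and the per-product
break reports from the cxx.cardsvault.net app API and stores them in MySQL.
"""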
# Optional file logging (disabled):
# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 day")
  17. HEADERS = {
  18. "User-Agent": "okhttp/4.9.0",
  19. # "Connection": "Keep-Alive",
  20. # "Accept-Encoding": "gzip",
  21. "Authorization": "",
  22. "CXX-APP-API-VERSION": "V2", # 必须添加
  23. # "deviceType": "2",
  24. # "udid": "20f902c10f6163a19bf137d801731d9f",
  25. # "time": str(int(time.time() * 1000)),
  26. "Content-Type": "application/json; charset=UTF-8"
  27. }
  28. def after_log(retry_state):
  29. """
  30. retry 回调
  31. :param retry_state: RetryCallState 对象
  32. """
  33. # 检查 args 是否存在且不为空
  34. if retry_state.args and len(retry_state.args) > 0:
  35. log = retry_state.args[0] # 获取传入的 logger
  36. else:
  37. log = logger # 使用全局 logger
  38. if retry_state.outcome.failed:
  39. log.warning(
  40. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  41. else:
  42. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  43. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  44. def get_proxys(log):
  45. """
  46. 获取代理
  47. :return: 代理
  48. """
  49. tunnel = "x371.kdltps.com:15818"
  50. kdl_username = "t13753103189895"
  51. kdl_password = "o0yefv6z"
  52. try:
  53. proxies = {
  54. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  55. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  56. }
  57. return proxies
  58. except Exception as e:
  59. log.error(f"Error getting proxy: {e}")
  60. raise e
  61. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  62. def get_video_url(log, pid):
  63. """
  64. 获取视频地址
  65. :param log: logger对象
  66. :param pid: 视频地址
  67. :return: 视频地址
  68. """
  69. log.debug(f"正在获取视频地址: {pid}")
  70. url = "https://cxx.cardsvault.net/app/teamup/detail"
  71. params = {
  72. # "id": "1730"
  73. "id": str(pid)
  74. }
  75. response = requests.get(url, headers=HEADERS, params=params, timeout=22)
  76. response.raise_for_status()
  77. result = response.json()
  78. liveInfo = result.get("data", {}).get("liveInfo", {})
  79. live_id = liveInfo.get("id") if liveInfo else None
  80. vodUrl = liveInfo.get("vod_info", {}).get("vodUrl")
  81. return live_id, vodUrl
  82. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  83. def get_vod_single_page(log, page_num=1, token=""):
  84. """
  85. 获取单页数据
  86. :param log: logger对象
  87. :param page_num: 页码
  88. :param token: token
  89. :return: 数据
  90. """
  91. url = "https://cxx.cardsvault.net/app/teamup/vod/list"
  92. data = {
  93. "pageSize": 20,
  94. "pageNum": page_num
  95. }
  96. HEADERS["Authorization"] = token
  97. response = requests.post(url, headers=HEADERS, json=data, timeout=22)
  98. response.raise_for_status()
  99. result = response.json()
  100. # print(result)
  101. if result.get("status") == 200:
  102. data = result.get("data", {})
  103. total = data.get("total", 0)
  104. current_page = data.get("pageNum", 1)
  105. items = data.get("data", [])
  106. log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
  107. log.debug(f"当前页数据数量: {len(items)}")
  108. return {
  109. "total": total,
  110. "current_page": current_page,
  111. "items": items,
  112. }
  113. else:
  114. log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
  115. return None
  116. def parse_list_items(log, items):
  117. """
  118. 解析列表项
  119. :param log: logger对象
  120. :param items: 列表项
  121. :return: 解析后的列表项
  122. """
  123. parsed_items = []
  124. log.debug(f"正在解析列表项.................")
  125. for item in items:
  126. pid = item.get("id")
  127. serial = item.get("serial") # 编号
  128. title = item.get("title")
  129. type_name = item.get("typeName") # 随机卡种
  130. isPre = item.get("isPre")
  131. count = item.get("count")
  132. totalPrice = item.get("totalPrice")
  133. totalPrice = totalPrice / 100 if totalPrice else 0
  134. signPrice = item.get("signPrice")
  135. signPrice = signPrice / 100 if signPrice else 0
  136. sellTime = item.get("sellTime")
  137. sellDays = item.get("sellDays")
  138. status = item.get("status") # 9:完成 8:待发货
  139. groupNum = item.get("groupNum")
  140. description = item.get("description")
  141. createTime = item.get("createTime")
  142. completionTime = item.get("completionTime") # 完成时间
  143. cover_url = item.get("cover", {}).get("url") # 封面图
  144. anchor_id = item.get("anchor", {}).get("id")
  145. anchor_userName = item.get("anchor", {}).get("userName")
  146. soldCount = item.get("soldCount")
  147. detailUrl = item.get("detailUrl")
  148. goodsUrl = item.get("goodsUrl")
  149. standardName = item.get("standardName") # 规格
  150. liveTaskTime = item.get("liveTaskTime") # 直播时间
  151. try:
  152. live_id, vodUrl = get_video_url(log, pid)
  153. except Exception as e:
  154. log.error(f"Error getting video URL: {e}")
  155. live_id, vodUrl = None, None
  156. parsed_item = {
  157. "pid": pid,
  158. "title": title,
  159. "serial": serial,
  160. "type_name": type_name,
  161. "is_pre": isPre,
  162. "count": count,
  163. "total_price": totalPrice,
  164. "sign_price": signPrice,
  165. "sell_time": sellTime,
  166. "sell_days": sellDays,
  167. "status": status,
  168. "group_num": groupNum,
  169. "description": description,
  170. "create_time": createTime,
  171. "completion_time": completionTime,
  172. "cover_url": cover_url,
  173. "anchor_id": anchor_id,
  174. "anchor_username": anchor_userName,
  175. "sold_count": soldCount,
  176. "detail_url": detailUrl,
  177. "goods_url": goodsUrl,
  178. "standard_name": standardName,
  179. "live_task_time": liveTaskTime,
  180. "live_id": live_id,
  181. "vod_url": vodUrl
  182. }
  183. # print(parsed_item)
  184. parsed_items.append(parsed_item)
  185. return parsed_items
  186. def get_vod_list(log, sql_pool, token):
  187. """
  188. 获取列表数据
  189. :param log: logger对象
  190. :param sql_pool: 数据库连接池
  191. :param token: token
  192. """
  193. page_num = 1
  194. total_pages = 999
  195. items_per_page = 20 # pageSize
  196. while page_num <= total_pages:
  197. log.debug(f"正在获取第 {page_num} 页的数据.................")
  198. page_result = get_vod_single_page(log, page_num, token)
  199. if not page_result:
  200. log.error(f"获取第 {page_num} 页失败 !!!")
  201. break
  202. # 第一次请求时更新真实的总页数
  203. if page_num == 1:
  204. total_count = page_result["total"]
  205. total_pages = (total_count + items_per_page - 1) // items_per_page
  206. log.info(f"总共 {total_pages} 页")
  207. # 每页获取后立即解析
  208. items = parse_list_items(log, page_result["items"])
  209. sql_pool.insert_many(table="super_vault_product_record", data_list=items, ignore=True)
  210. page_num += 1
# ----------------------------------------------------------------------------------------------------------------------
  212. def get_report_single_page(log, page_num, detail_id, token):
  213. """
  214. 获取单页数据
  215. :param log: logger对象
  216. :param page_num: 页码
  217. :param detail_id: 商品id
  218. :param token: token
  219. :return: 数据
  220. """
  221. log.debug(f"正在获取第 {page_num} 页的 <拆卡报告> 数据.................")
  222. # token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJDSEFPWElOWElORyNBUFAiLCJhdWQiOiJDSEFPWElOWElORyIsIm5iZiI6MTc2OTU4MTI5NiwiZGF0YSI6Ijk1MjMiLCJpc3MiOiI3ViNweHlQZSIsImV4cCI6MTc3MDc4MTI5NiwiaWF0IjoxNzY5NTgxMjk2LCJqdGkiOiIyYjkwNzZhMS0wYjU1LTQ0ZjItOGZlZC0yMWZiZmI0ZjUyYWIifQ.iDzTZLDslCP0y2nc2Jp4TGEsNbQiCRKcUeRsIyG3iOg"
  223. url = "https://cxx.cardsvault.net/app/teamup/report/list"
  224. data = {
  225. "pageSize": 20,
  226. "my": 0,
  227. "pageNum": page_num,
  228. # "tid": 1780
  229. "tid": detail_id
  230. }
  231. HEADERS["Authorization"] = token
  232. response = requests.post(url, headers=HEADERS, json=data, timeout=22)
  233. # print(response.text)
  234. response.raise_for_status()
  235. result = response.json()
  236. if result.get("status") == 200:
  237. data = result.get("data", {})
  238. total = data.get("total", 0)
  239. current_page = data.get("pageNum", 1)
  240. items = data.get("data", [])
  241. log.info(f"当前查询的是 ->->-> 第 {current_page} 页,共 {total} 条记录")
  242. log.debug(f"当前页数据数量: {len(items)}")
  243. return {
  244. "total": total,
  245. "current_page": current_page,
  246. "items": items
  247. }
  248. else:
  249. log.error(f"API 返回错误: {result.get('msg', '未知错误')}")
  250. return None
  251. def parse_report_items(log, detail_id, items):
  252. """
  253. 解析列表项
  254. :param log: logger对象
  255. :param detail_id: 商品id
  256. :param items: 列表项
  257. :return: 解析后的列表项
  258. """
  259. parsed_items = []
  260. log.debug(f"正在解析 <拆卡报告> 列表项.................")
  261. for item in items:
  262. userName = item.get("userName")
  263. level = item.get("level")
  264. teamNameCn = item.get("teamNameCn")
  265. teamNameEn = item.get("teamNameEn")
  266. count = item.get("count")
  267. picture_url = item.get("picture", {}).get("url")
  268. alias = item.get("alias") # 别名
  269. createTime = item.get("createTime")
  270. data_dict = {
  271. "pid": detail_id,
  272. "user_name": userName,
  273. "level": level,
  274. "team_name_cn": teamNameCn,
  275. "team_name_en": teamNameEn,
  276. "count": count,
  277. "picture_url": picture_url,
  278. "alias": alias,
  279. "create_time": createTime
  280. }
  281. parsed_items.append(data_dict)
  282. return parsed_items
  283. def get_report_list(log, detail_id, token, sql_pool):
  284. """
  285. 获取列表数据
  286. :param log: logger对象
  287. :param detail_id: 商品id
  288. :param token: token
  289. :param sql_pool: 数据库连接池
  290. """
  291. page_num = 1
  292. total_pages = 9
  293. items_per_page = 20 # pageSize
  294. while page_num <= total_pages:
  295. log.debug(f"正在获取第 {page_num} 页的数据.................")
  296. page_result = get_report_single_page(log, page_num, detail_id, token)
  297. if not page_result:
  298. log.error(f"获取第 {page_num} 页失败 !!!")
  299. break
  300. # 第一次请求时更新真实的总页数
  301. if page_num == 1:
  302. total_count = page_result["total"]
  303. if total_count == 0:
  304. log.info("No new records found.")
  305. # 更改状态为2
  306. sql_pool.update_one_or_dict(
  307. table="super_vault_product_record",
  308. data={"report_state": 2},
  309. condition={"pid": detail_id}
  310. )
  311. break
  312. total_pages = (total_count + items_per_page - 1) // items_per_page
  313. log.info(f"总共 {total_pages} 页")
  314. items = parse_report_items(log, detail_id, page_result["items"])
  315. sql_pool.insert_many(table="super_vault_report_record", data_list=items, ignore=True)
  316. page_num += 1
  317. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  318. def cxx_his_main(log):
  319. """
  320. 主函数
  321. :param log: logger对象
  322. """
  323. log.info(
  324. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  325. # 配置 MySQL 连接池
  326. sql_pool = MySQLConnectionPool(log=log)
  327. if not sql_pool.check_pool_health():
  328. log.error("数据库连接池异常")
  329. raise RuntimeError("数据库连接池异常")
  330. try:
  331. token = sql_pool.select_one("SELECT token FROM super_vault_token")
  332. # 获取所有 pid
  333. try:
  334. get_vod_list(log, sql_pool, token[0])
  335. except Exception as e:
  336. log.error(f"Error fetching last_product_id: {e}")
  337. # time.sleep(5)
  338. # 获取所有 report_state = 0 的 pid
  339. sql_detail_id_list = sql_pool.select_all("SELECT pid FROM super_vault_product_record WHERE report_state = 0")
  340. if sql_detail_id_list:
  341. sql_detail_id_list = [item[0] for item in sql_detail_id_list]
  342. for detail_id in sql_detail_id_list:
  343. try:
  344. get_report_list(log, detail_id, token[0], sql_pool)
  345. sql_pool.update_one_or_dict(
  346. table="super_vault_product_record",
  347. data={"report_state": 1},
  348. condition={"pid": detail_id}
  349. )
  350. except Exception as e:
  351. log.error(f"Error fetching last_product_id: {e}")
  352. # 更改状态为3
  353. sql_pool.update_one_or_dict(
  354. table="super_vault_product_record",
  355. data={"report_state": 3},
  356. condition={"pid": detail_id}
  357. )
  358. else:
  359. log.info("No new records found.")
  360. except Exception as e:
  361. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  362. finally:
  363. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  364. if __name__ == '__main__':
  365. # get_vod_list(logger, None, '')
  366. # get_vod_single_page(logger, 1)
  367. cxx_his_main(logger)
  368. # get_report_single_page(logger,1, '','')