# request_live_detail.py
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/2/13 11:26
import base64
import inspect
import json
import os
import random
import time
from datetime import datetime
from typing import Dict
from urllib.parse import quote

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
  18. base_live_url = "https://service.kaogujia.com"
  19. def after_log(retry_state):
  20. """
  21. retry 回调
  22. :param retry_state: RetryCallState 对象
  23. """
  24. log = retry_state.args[0] # 获取传入的 logger
  25. if retry_state.outcome.failed:
  26. log.warning(
  27. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  28. else:
  29. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  30. def decrypt_data(log, par_url, encrypted_data) -> Dict[str, str]:
  31. """
  32. 解密数据
  33. :param log:
  34. :param par_url:
  35. :param encrypted_data:
  36. :return:
  37. """
  38. log.info("开始解密数据 ->->->->->->->->->->->->->->->->->->->->->")
  39. if not isinstance(par_url, str):
  40. return {}
  41. # 对应原js中的str函数
  42. def transform_str(input_str):
  43. encoded = quote(input_str)
  44. return base64.b64encode(encoded.encode()).decode()
  45. str_result = transform_str(par_url) * 3
  46. org_key = str_result[:16]
  47. org_iv = str_result[12:28]
  48. # 使用Crypto库解析key和iv
  49. ikey = org_key.encode('utf-8')
  50. iiv = org_iv.encode('utf-8')
  51. # 解密
  52. cipher = AES.new(ikey, AES.MODE_CBC, iiv)
  53. decrypted_text = unpad(cipher.decrypt(base64.b64decode(encrypted_data)), AES.block_size).decode('utf-8')
  54. decrypted_text = json.loads(decrypted_text)
  55. # print(decrypted_text)
  56. return decrypted_text
  57. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  58. def get_sales_one_page(log, sql_info: tuple, page, headers):
  59. """
  60. 获取 单页 商品售卖信息
  61. :param headers:
  62. :param log:
  63. :param sql_info:
  64. :param page:
  65. :return dec_data: 解密后的数据
  66. """
  67. # uid = 'ZB8m8eWLDjG'
  68. # room_id = 'kL5wMRBEVWboo'
  69. # date_code = '20250206'
  70. uid = sql_info[1]
  71. room_id = sql_info[2]
  72. date_code = sql_info[3]
  73. # url = "https://service.kaogujia.com/api/live/skulist"
  74. par_url = '/api/live/skulist'
  75. url = f"{base_live_url}{par_url}"
  76. params = {
  77. "limit": "10",
  78. "page": page,
  79. "sort_field": "sales",
  80. "sort": "0"
  81. }
  82. data = {
  83. "room_id": room_id,
  84. "uid": uid,
  85. "date_code": date_code
  86. }
  87. response = requests.post(url, headers=headers, params=params, json=data, timeout=10)
  88. # print(response.text)
  89. response.raise_for_status()
  90. resp_json = response.json()
  91. if resp_json:
  92. if resp_json.get("code") == 200:
  93. enc_data = resp_json.get("data")
  94. dec_data = decrypt_data(log, par_url, enc_data)
  95. # print(dec_data)
  96. return dec_data
  97. else:
  98. log.warning(f"{inspect.currentframe().f_code.co_name}: {resp_json.get('message')}")
  99. else:
  100. log.warning(f"{inspect.currentframe().f_code.co_name} get resp_json")
  101. def parse_sales_list(log, all_items: list) -> list:
  102. """
  103. 解析商品售卖列表
  104. :param log:
  105. :param all_items:
  106. :return data_list: data_list -> list
  107. """
  108. try:
  109. data_list = []
  110. for item in all_items:
  111. promotion_id = item.get("promotion_id")
  112. product_id = item.get("product_id")
  113. product_title = item.get("title")
  114. product_cover = item.get("cover")
  115. product_sales = item.get("sales")
  116. # product_gmv = item.get("gmv")
  117. product_putaway_time = item.get("putaway")
  118. product_putaway_time = datetime.fromtimestamp(product_putaway_time).strftime(
  119. '%Y-%m-%d %H:%M:%S') if product_putaway_time else ""
  120. product_sold_out_time = item.get("sold_out")
  121. product_sold_out_time = datetime.fromtimestamp(product_sold_out_time).strftime(
  122. '%Y-%m-%d %H:%M:%S') if product_sold_out_time else ""
  123. dd_dict = {
  124. "promotion_id": promotion_id,
  125. "product_id": product_id,
  126. "product_title": product_title,
  127. "product_cover": product_cover,
  128. "product_sales": product_sales,
  129. # "product_gmv": product_gmv,
  130. "product_putaway_time": product_putaway_time,
  131. "product_sold_out_time": product_sold_out_time
  132. }
  133. data_list.append(dd_dict)
  134. return data_list
  135. except Exception as e:
  136. log.error(f"{inspect.currentframe().f_code.co_name} Error: {e}")
  137. return []
  138. def get_sales_list(log, sql_info: tuple, headers):
  139. """
  140. 获取商品售卖列表
  141. :param headers:
  142. :param log:
  143. :param sql_info:
  144. :return parse_data_list: 解析后的列表
  145. """
  146. page = 1
  147. all_items = []
  148. while True:
  149. try:
  150. log.debug(f"{inspect.currentframe().f_code.co_name}: 正在获取第 {page} 页数据")
  151. dec_data = get_sales_one_page(log, sql_info, page, headers)
  152. time.sleep(random.uniform(1.5, 2.5))
  153. items = dec_data.get('items', [])
  154. if not items:
  155. log.debug(f"{inspect.currentframe().f_code.co_name}: 没有更多数据")
  156. break # 如果没有更多数据,退出循环
  157. all_items.extend(items)
  158. pagination = dec_data.get('pagination', {})
  159. total_count = pagination.get('total_count', 0)
  160. if len(all_items) >= total_count:
  161. log.debug(f"{inspect.currentframe().f_code.co_name}: 已获取所有数据")
  162. break # 如果已获取所有数据,退出循环
  163. page += 1
  164. except Exception as e:
  165. log.error(f"{inspect.currentframe().f_code.co_name}, Error fetching page {page}: {e}")
  166. time.sleep(random.uniform(1.5, 2.5))
  167. break # 发生错误时退出循环
  168. parse_data_list = parse_sales_list(log, all_items)
  169. return parse_data_list
  170. def parse_live_detail(log, dec_data, sql_info: tuple, headers) -> list:
  171. """
  172. 解析直播详情
  173. :param headers:
  174. :param log:
  175. :param dec_data:
  176. :param sql_info:
  177. :return: info_list
  178. """
  179. try:
  180. log.info("开始解析 live_detail 数据......................")
  181. is_live = dec_data.get("is_live")
  182. if is_live == 0:
  183. live_create_time = dec_data.get("create_time") # 开播时间
  184. live_create_time = datetime.fromtimestamp(live_create_time).strftime(
  185. '%Y-%m-%d %H:%M:%S') if live_create_time else ""
  186. # live_update_time = dec_data.get("update_time")
  187. live_finish_time = dec_data.get("finish_time") # 下播时间
  188. live_finish_time = datetime.fromtimestamp(live_finish_time).strftime(
  189. '%Y-%m-%d %H:%M:%S') if live_finish_time else ""
  190. duration = dec_data.get("duration") # 直播时长(单位:秒)
  191. # 流量数据解析
  192. watch_users = dec_data.get("flow").get("watch_users") # 观看人次
  193. avg_users = dec_data.get("flow").get("avg_users") # 平均在线人数
  194. peak_users = dec_data.get("flow").get("peak_users") # 人气峰值
  195. through = dec_data.get("flow").get("through") # 穿透率
  196. exposed_num = dec_data.get("flow").get("exposed_num") # 曝光量
  197. stay_duration = dec_data.get("stay_duration") # 平均停留时长(单位:秒)
  198. new_fans_count = dec_data.get("flow").get("new_fans_count") # 新增粉丝数
  199. inc_fans_clubs = dec_data.get("flow").get("inc_fans_clubs") # 新增粉丝团
  200. turn_ratio = dec_data.get("flow").get("turn_ratio") # 转粉率
  201. interaction_ratio = dec_data.get("flow").get("ratio") # 互动率
  202. # 成交数据解析
  203. gmv = dec_data.get("volume").get("gmv") # 直播销售额
  204. sales = dec_data.get("volume").get("sales") # 直播销量
  205. atv = dec_data.get("volume").get("atv") # 平均件单价
  206. explain_duration = dec_data.get("volume").get("explain_duration") # 讲解时长(单位:秒)
  207. sku_count = dec_data.get("volume").get("sku_count") # 推广商品数
  208. uv = dec_data.get("volume").get("uv") # UV价值
  209. cvr = dec_data.get("volume").get("cvr") # 转化率
  210. rpm = dec_data.get("volume").get("rpm") # RPM
  211. response_sales_list = get_sales_list(log, sql_info, headers)
  212. info_list = []
  213. for item in response_sales_list:
  214. live_detail_info = (
  215. sql_info[2], live_create_time, live_finish_time, duration, watch_users, avg_users, peak_users,
  216. through, exposed_num, stay_duration, new_fans_count, inc_fans_clubs, turn_ratio, interaction_ratio,
  217. gmv, sales, atv, explain_duration, sku_count, uv, cvr, rpm, item["promotion_id"],
  218. item["product_id"], item["product_title"], item["product_cover"], item["product_sales"],
  219. item["product_putaway_time"], item["product_sold_out_time"]
  220. )
  221. # print(live_detail_info)
  222. info_list.append(live_detail_info)
  223. return info_list
  224. # try:
  225. # save_live_detail(sql_pool, info_list)
  226. # update_state(sql_pool, sql_info[1], 1)
  227. # except Exception as e:
  228. # log.warning(f"{inspect.currentframe().f_code.co_name} 保存数据时出错: {e}")
  229. # update_state(sql_pool, sql_info[1], 2)
  230. elif is_live == 1:
  231. log.info("直播间开播中, 等待后续抓取...............")
  232. else:
  233. log.info("直播间状态is_live其他情况...............")
  234. except Exception as e:
  235. log.warning(f"{inspect.currentframe().f_code.co_name} error: {e}")
  236. return []
  237. def get_live_detail(log, sql_info: tuple, headers):
  238. """
  239. 获取直播详情数据
  240. :param headers:
  241. :param log: logger对象
  242. :param sql_info: 元组 --> ("ZB8m8eWLDjG", "kL5wMRBEVWboo", "20250206") (uid, room_id, date_code)
  243. :return: ret_info_list
  244. """
  245. # uid = 'ZB8m8eWLDjG'
  246. # room_id = 'kL5wMRBEVWboo'
  247. # date_code = '20250206'
  248. uid = sql_info[1]
  249. room_id = sql_info[2]
  250. date_code = sql_info[3]
  251. par_url = f"/api/live/detail/{uid}/{date_code}/{room_id}"
  252. url = f'{base_live_url}{par_url}'
  253. log.info("开始抓取 live_detail 数据......................")
  254. response = requests.get(url, headers=headers, timeout=10)
  255. # print(response.text)
  256. response.raise_for_status()
  257. resp_json = response.json()
  258. if resp_json:
  259. if resp_json.get("code") == 200:
  260. enc_data = resp_json.get("data")
  261. dec_data = decrypt_data(log, par_url, enc_data)
  262. # print(dec_data)
  263. ret_info_list = parse_live_detail(log, dec_data, sql_info, headers)
  264. return ret_info_list
  265. else:
  266. log.warning(f"{inspect.currentframe().f_code.co_name}: {resp_json.get('message')}")
  267. else:
  268. log.warning(f"{inspect.currentframe().f_code.co_name} get resp_json")
  269. if __name__ == '__main__':
  270. KGJ_HEADERS = {
  271. "accept": "*/*",
  272. "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
  273. "authorization": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJhdWQiOiIxMDAwIiwiaXNzIjoia2FvZ3VqaWEuY29tIiwianRpIjoiNDI4OWQ1ZTdhODY4NDBjMmFiMTBiZGE3OTY1YTRhZDYiLCJzaWQiOjU2OTY1ODQsImlhdCI6MTc0MDAzODQ4NCwiZXhwIjoxNzQwNjQzMjg0LCJid2UiOjEsInR5cCI6MSwicF9id2UiOjB9.uGe1TroAEJ6VohgtOgNwf_V3pbtNUOv8ZA9R9r99TAF-Gblw8YcMp9kddrKs1CKrhe8amhVd3EYHiC6stI0YWw",
  274. "content-type": "application/json",
  275. "origin": "https://www.kaogujia.com",
  276. "priority": "u=1, i",
  277. "referer": "https://www.kaogujia.com/",
  278. "sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
  279. "sec-ch-ua-mobile": "?0",
  280. "sec-ch-ua-platform": "\"Windows\"",
  281. "sec-fetch-dest": "empty",
  282. "sec-fetch-mode": "cors",
  283. "sec-fetch-site": "same-site",
  284. "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
  285. "version_code": "3.1"
  286. }
  287. s_info = (1, "ZB8m8eWLDjG", "kL5wMRBEVWboo", "20250206")
  288. get_live_detail(logger, s_info, KGJ_HEADERS)
  289. # get_sales_one_page(logger, None, None,1)
  290. # get_sales_list(logger, None)