| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/2/13 11:26
- import base64
- import inspect
- import json
- import random
- import time
- import requests
- from typing import Dict
- from loguru import logger
- from datetime import datetime
- from Crypto.Cipher import AES
- from urllib.parse import quote
- from Crypto.Util.Padding import unpad
- from tenacity import retry, stop_after_attempt, wait_fixed
- base_live_url = "https://service.kaogujia.com"  # Base URL of the service; the API paths built in this file are appended to it.
def after_log(retry_state):
    """Log the outcome of one attempt; intended as a tenacity ``after`` callback.

    The logger is taken from the retried call itself: its first positional
    argument, or its ``log`` keyword argument when the function was invoked
    with keywords only. When neither is present nothing is logged.

    :param retry_state: RetryCallState object of the attempt; ``args``,
        ``kwargs``, ``fn``, ``outcome`` and ``attempt_number`` are read.
    """
    call_args = getattr(retry_state, "args", None) or ()
    call_kwargs = getattr(retry_state, "kwargs", None) or {}
    # Indexing args[0] unconditionally raised IndexError for keyword-only
    # calls, and an exception raised here would replace the real failure.
    log = call_args[0] if call_args else call_kwargs.get("log")
    if log is None:
        return

    fn_name = getattr(retry_state.fn, "__name__", repr(retry_state.fn))
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
def decrypt_data(log, par_url, encrypted_data) -> dict:
    """Decrypt an AES-CBC encrypted, base64-encoded JSON payload.

    The key and IV are both derived from the request path (see
    ``_derive_key_iv``), so ``par_url`` must be the same path that was used
    to request ``encrypted_data``.

    :param log: logger object (``info`` and ``warning`` are used)
    :param par_url: request path the payload belongs to, e.g. '/api/live/skulist'
    :param encrypted_data: base64 text of the ciphertext
    :return: the decoded JSON value; ``{}`` when ``par_url`` is not a string
        or the payload is missing/empty
    :raises ValueError: on bad base64, wrong key/IV length, bad padding or
        invalid JSON
    """
    log.info("开始解密数据 ->->->->->->->->->->->->->->->->->->->->->")
    if not isinstance(par_url, str):
        return {}
    if not encrypted_data:
        # A missing payload used to surface as an obscure TypeError from
        # b64decode(None); treat it like the other "nothing to decrypt" case.
        log.warning("decrypt_data: empty encrypted payload, nothing to decrypt")
        return {}

    key, iv = _derive_key_iv(par_url)
    cipher = AES.new(key, AES.MODE_CBC, iv)
    padded = cipher.decrypt(base64.b64decode(encrypted_data))
    plaintext = unpad(padded, AES.block_size).decode('utf-8')
    return json.loads(plaintext)


def _derive_key_iv(par_url: str) -> tuple:
    """Derive the AES key and IV bytes from the request path.

    Mirrors the ``str`` function of the original JS: URL-quote the path,
    base64-encode it and repeat the result three times. The key is characters
    0-15 of that string and the IV is characters 12-27.
    """
    encoded = base64.b64encode(quote(par_url).encode()).decode()
    material = encoded * 3
    return material[:16].encode('utf-8'), material[12:28].encode('utf-8')
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sales_one_page(log, sql_info: tuple, page, headers):
    """Fetch and decrypt one page of the product-sales list of a live room.

    Sends a POST to '/api/live/skulist' asking for 10 items per page sorted
    by sales. Any exception (HTTP error status, timeout, decrypt failure)
    propagates to the ``retry`` decorator, which allows up to 5 attempts
    spaced 1 second apart.

    :param log: logger object; must be passed positionally as the first
        argument because the retry callback reads it from there
    :param sql_info: row tuple; index 1 is uid, index 2 is room_id,
        index 3 is date_code
    :param page: page number to request
    :param headers: HTTP request headers
    :return: the decrypted data; ``{}`` when the response body is empty or
        its ``code`` is not 200 (these cases are logged, not retried)
    """
    func_name = inspect.currentframe().f_code.co_name
    uid, room_id, date_code = sql_info[1], sql_info[2], sql_info[3]

    par_url = '/api/live/skulist'
    url = f"{base_live_url}{par_url}"
    params = {
        "limit": "10",
        "page": page,
        "sort_field": "sales",
        "sort": "0",
    }
    data = {
        "room_id": room_id,
        "uid": uid,
        "date_code": date_code,
    }

    response = requests.post(url, headers=headers, params=params, json=data, timeout=10)
    response.raise_for_status()
    resp_json = response.json()

    # Return an empty dict rather than an implicit None so that callers can
    # keep calling .get() on the result without crashing.
    if not resp_json:
        log.warning(f"{func_name} get resp_json")
        return {}
    if resp_json.get("code") != 200:
        log.warning(f"{func_name}: {resp_json.get('message')}")
        return {}

    return decrypt_data(log, par_url, resp_json.get("data"))
def parse_sales_list(log, all_items: list) -> list:
    """Flatten raw product-sale items into dicts with stable key names.

    Each output dict has the keys ``promotion_id``, ``product_id``,
    ``product_title``, ``product_cover``, ``product_sales``,
    ``product_putaway_time`` and ``product_sold_out_time``. Missing source
    fields become ``None``; missing or zero timestamps become ``""``.

    A malformed item is logged and skipped. It no longer discards the items
    that were parsed successfully.

    :param log: logger object (``error`` is used)
    :param all_items: raw item dicts taken from the decrypted sales pages
    :return data_list: list of parsed dicts; ``[]`` for empty/None input
    """
    func_name = inspect.currentframe().f_code.co_name
    try:
        raw_items = list(all_items or [])
    except TypeError as e:
        # Non-iterable input: keep the "never raises, returns a list" contract.
        log.error(f"{func_name} Error: {e}")
        return []

    data_list = []
    for item in raw_items:
        try:
            data_list.append({
                "promotion_id": item.get("promotion_id"),
                "product_id": item.get("product_id"),
                "product_title": item.get("title"),
                "product_cover": item.get("cover"),
                "product_sales": item.get("sales"),
                "product_putaway_time": _format_sale_timestamp(item.get("putaway")),
                "product_sold_out_time": _format_sale_timestamp(item.get("sold_out")),
            })
        except (AttributeError, TypeError, ValueError, OverflowError, OSError) as e:
            log.error(f"{func_name} Error: {e}")
    return data_list


def _format_sale_timestamp(timestamp) -> str:
    """Format a POSIX timestamp as local 'YYYY-MM-DD HH:MM:SS'; '' when falsy."""
    if not timestamp:
        return ""
    return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
def get_sales_list(log, sql_info: tuple, headers):
    """Collect every page of the product-sales list and parse it.

    Pages are requested one after another starting at page 1. Paging stops
    when a page has no items, when the number of collected items reaches
    ``pagination.total_count`` (treated as 0 when absent), or when fetching
    a page raises. In the last case the items collected so far are still
    parsed and returned.

    :param log: logger object
    :param sql_info: row tuple forwarded to ``get_sales_one_page``
    :param headers: HTTP request headers
    :return parse_data_list: list of parsed product dicts
    """
    func_name = inspect.currentframe().f_code.co_name
    page = 1
    all_items = []
    while True:
        try:
            log.debug(f"{func_name}: 正在获取第 {page} 页数据")
            dec_data = get_sales_one_page(log, sql_info, page, headers) or {}
            # Random 4-6 s pause after every request.
            time.sleep(random.randint(4, 6))

            items = dec_data.get('items') or []
            if not items:
                log.debug(f"{func_name}: 没有更多数据")
                break  # no more data
            all_items.extend(items)

            # "or" fallbacks cover keys that are present but null, which
            # previously ended the loop through an exception.
            pagination = dec_data.get('pagination') or {}
            total_count = pagination.get('total_count') or 0
            if len(all_items) >= total_count:
                log.debug(f"{func_name}: 已获取所有数据")
                break  # everything fetched
            page += 1
        except Exception as e:
            log.error(f"{func_name}, Error fetching page {page}: {e}")
            time.sleep(random.randint(4, 6))
            break  # stop paging on error, keep what was collected

    return parse_sales_list(log, all_items)
def parse_live_detail(log, dec_data, sql_info: tuple, headers) -> list:
    """Build one result row per sold product for a finished live session.

    Only sessions with ``is_live == 0`` are processed. For those, the
    room-level metrics are read from ``dec_data`` and combined with every
    product returned by ``get_sales_list``. A session that is still live
    (``is_live == 1``), any other state, and any parsing error all yield an
    empty list, so the function always returns a list.

    Row layout (29 fields): room_id, create time, finish time, duration,
    watch_users, avg_users, peak_users, through, exposed_num, stay_duration,
    new_fans_count, inc_fans_clubs, turn_ratio, interaction ratio, gmv,
    sales, atv, explain_duration, sku_count, uv, cvr, rpm, promotion_id,
    product_id, product_title, product_cover, product_sales,
    product_putaway_time, product_sold_out_time.

    :param log: logger object
    :param dec_data: decrypted live-detail dict
    :param sql_info: row tuple; index 2 (room_id) is used here and the whole
        tuple is forwarded to ``get_sales_list``
    :param headers: HTTP request headers
    :return: info_list, a list of tuples (possibly empty)
    """
    func_name = inspect.currentframe().f_code.co_name

    def fmt_time(timestamp):
        # Local 'YYYY-MM-DD HH:MM:SS' string, or '' for a missing/zero timestamp.
        return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') if timestamp else ""

    try:
        log.info("开始解析 live_detail 数据......................")
        is_live = dec_data.get("is_live")
        if is_live == 1:
            log.info("直播间开播中, 等待后续抓取...............")
            return []
        if is_live != 0:
            log.info("直播间状态is_live其他情况...............")
            return []

        # A missing section raises AttributeError on first use below and is
        # reported through the except branch, as before.
        flow = dec_data.get("flow")
        volume = dec_data.get("volume")

        room_metrics = (
            sql_info[2],
            fmt_time(dec_data.get("create_time")),   # broadcast start time
            fmt_time(dec_data.get("finish_time")),   # broadcast end time
            dec_data.get("duration"),                # live duration (seconds)
            # traffic data
            flow.get("watch_users"),                 # number of views
            flow.get("avg_users"),                   # average online users
            flow.get("peak_users"),                  # peak popularity
            flow.get("through"),                     # penetration rate
            flow.get("exposed_num"),                 # exposure count
            # NOTE(review): read from the top level, unlike its neighbours
            # which come from "flow" -- confirm against the API payload.
            dec_data.get("stay_duration"),           # average stay (seconds)
            flow.get("new_fans_count"),              # new followers
            flow.get("inc_fans_clubs"),              # new fan-club members
            flow.get("turn_ratio"),                  # follower conversion rate
            flow.get("ratio"),                       # interaction rate
            # transaction data
            volume.get("gmv"),                       # live sales revenue
            volume.get("sales"),                     # live units sold
            volume.get("atv"),                       # average price per item
            volume.get("explain_duration"),          # explanation time (seconds)
            volume.get("sku_count"),                 # promoted product count
            volume.get("uv"),                        # UV value
            volume.get("cvr"),                       # conversion rate
            volume.get("rpm"),                       # RPM
        )

        return [
            room_metrics + (
                item["promotion_id"], item["product_id"], item["product_title"],
                item["product_cover"], item["product_sales"],
                item["product_putaway_time"], item["product_sold_out_time"],
            )
            for item in get_sales_list(log, sql_info, headers)
        ]
    except Exception as e:
        log.warning(f"{func_name} error: {e}")
        return []
def get_live_detail(log, sql_info: tuple, headers):
    """Fetch, decrypt and parse the detail data of one live session.

    Sends a GET to '/api/live/detail/{uid}/{date_code}/{room_id}', decrypts
    the ``data`` field with that same path and hands the result to
    ``parse_live_detail``.

    :param log: logger object
    :param sql_info: row tuple whose indexes 1, 2 and 3 hold uid, room_id and
        date_code, e.g. (1, "ZB8m8eWLDjG", "kL5wMRBEVWboo", "20250206");
        index 0 is not read here
    :param headers: HTTP request headers
    :return: ret_info_list, the rows built by ``parse_live_detail``; ``[]``
        when the response body is empty or its ``code`` is not 200
    :raises requests.RequestException: on timeout, connection failure or an
        HTTP error status (this call is not retried)
    """
    func_name = inspect.currentframe().f_code.co_name
    uid, room_id, date_code = sql_info[1], sql_info[2], sql_info[3]

    par_url = f"/api/live/detail/{uid}/{date_code}/{room_id}"
    url = f'{base_live_url}{par_url}'
    log.info("开始抓取 live_detail 数据......................")

    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    resp_json = response.json()

    # Return an empty list rather than an implicit None so the documented
    # return type holds on every path.
    if not resp_json:
        log.warning(f"{func_name} get resp_json")
        return []
    if resp_json.get("code") != 200:
        log.warning(f"{func_name}: {resp_json.get('message')}")
        return []

    dec_data = decrypt_data(log, par_url, resp_json.get("data"))
    return parse_live_detail(log, dec_data, sql_info, headers)
if __name__ == '__main__':
    import os

    # NOTE(security): a bearer token is committed in this file. Supply the
    # credential through the KGJ_AUTHORIZATION environment variable (full
    # header value, including the "Bearer " prefix). The literal below is kept
    # only as a backward-compatible default and should be rotated and removed.
    KGJ_HEADERS = {
        "accept": "*/*",
        "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
        "authorization": os.environ.get(
            "KGJ_AUTHORIZATION",
            "Bearer eyJhbGciOiJIUzUxMiJ9.eyJhdWQiOiIxMDAwIiwiaXNzIjoia2FvZ3VqaWEuY29tIiwianRpIjoiNDI4OWQ1ZTdhODY4NDBjMmFiMTBiZGE3OTY1YTRhZDYiLCJzaWQiOjU2OTY1ODQsImlhdCI6MTc0MDAzODQ4NCwiZXhwIjoxNzQwNjQzMjg0LCJid2UiOjEsInR5cCI6MSwicF9id2UiOjB9.uGe1TroAEJ6VohgtOgNwf_V3pbtNUOv8ZA9R9r99TAF-Gblw8YcMp9kddrKs1CKrhe8amhVd3EYHiC6stI0YWw",
        ),
        "content-type": "application/json",
        "origin": "https://www.kaogujia.com",
        "priority": "u=1, i",
        "referer": "https://www.kaogujia.com/",
        "sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
        "version_code": "3.1",
    }

    # Sample row; indexes 1, 2 and 3 are read as uid, room_id and date_code.
    s_info = (1, "ZB8m8eWLDjG", "kL5wMRBEVWboo", "20250206")
    get_live_detail(logger, s_info, KGJ_HEADERS)
|