|
|
@@ -0,0 +1,605 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+# Author : Charley
|
|
|
+# Python : 3.10.8
|
|
|
+# Date : 2025/2/8 13:11
|
|
|
+import json
|
|
|
+import random
|
|
|
+import time
|
|
|
+import base64
|
|
|
+import schedule
|
|
|
+import urllib3
|
|
|
+import requests
|
|
|
+from typing import Dict
|
|
|
+from loguru import logger
|
|
|
+from urllib.parse import quote
|
|
|
+from Crypto.Cipher import AES
|
|
|
+from Crypto.Util.Padding import unpad
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from mysq_pool import MySQLConnectionPool
|
|
|
+from tenacity import retry, stop_after_attempt, wait_fixed
|
|
|
+
|
|
|
+from request_live_detail import get_live_detail
|
|
|
+
|
|
|
+urllib3.disable_warnings()
|
|
|
+
|
|
|
+logger.remove()
|
|
|
+logger.add("./kapai_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
|
|
|
+ format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
|
|
|
+ level="DEBUG", retention="14 day")
|
|
|
+
|
|
|
+MAX_PAGE = 100
|
|
|
+
|
|
|
+
|
|
|
+# HEADERS = settings.KGJ_HEADERS
|
|
|
+
|
|
|
+
|
|
|
+def after_log(retry_state):
|
|
|
+ """
|
|
|
+ retry 回调
|
|
|
+ :param retry_state: RetryCallState 对象
|
|
|
+ """
|
|
|
+ log = retry_state.args[0] # 获取传入的 logger
|
|
|
+ if retry_state.outcome.failed:
|
|
|
+ log.warning(
|
|
|
+ f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
|
|
|
+ else:
|
|
|
+ log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
|
|
|
+
|
|
|
+
|
|
|
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
|
|
|
+def get_proxys(log):
|
|
|
+ """
|
|
|
+ 获取代理
|
|
|
+ :return: 代理
|
|
|
+ """
|
|
|
+ # tunnel = "h991.kdltps.com:15818"
|
|
|
+ # kdl_username = "t12136177769785"
|
|
|
+ # kdl_password = "ety9bdi8"
|
|
|
+
|
|
|
+ tunnel = "x371.kdltps.com:15818"
|
|
|
+ kdl_username = "t13753103189895"
|
|
|
+ kdl_password = "o0yefv6z"
|
|
|
+ try:
|
|
|
+ proxies = {
|
|
|
+ "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
|
|
|
+ "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
|
|
|
+ }
|
|
|
+ return proxies
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Error getting proxy: {e}")
|
|
|
+ raise e
|
|
|
+
|
|
|
+
|
|
|
+def decimal_to_percent(decimal_value):
|
|
|
+ if not decimal_value:
|
|
|
+ return "0%"
|
|
|
+
|
|
|
+ # 将小数转换为百分比并保留一位小数
|
|
|
+ percentage = decimal_value * 100
|
|
|
+ # 使用格式化去除末尾多余的零
|
|
|
+ return f"{percentage:g}%" # 'g' 格式会自动去掉不必要的零
|
|
|
+
|
|
|
+
|
|
|
+def decimal_to_percent_with_dist(decimal_value):
|
|
|
+ if not decimal_value:
|
|
|
+ return "0%"
|
|
|
+
|
|
|
+ # 将小数转换为百分比
|
|
|
+ percentage = decimal_value * 100
|
|
|
+
|
|
|
+ # 使用格式化字符串,最多保留两位小数,去掉不必要的零
|
|
|
+ formatted_percentage = f"{percentage:.2f}".rstrip('0').rstrip('.')
|
|
|
+
|
|
|
+ return f"{formatted_percentage}%"
|
|
|
+
|
|
|
+
|
|
|
+def convert_seconds_to_hours_minutes(total_seconds: int):
|
|
|
+ # 计算完整的小时数
|
|
|
+ hours = total_seconds // 3600
|
|
|
+ remaining_seconds_after_hours = total_seconds % 3600
|
|
|
+
|
|
|
+ # 计算完整的分钟数
|
|
|
+ minutes = remaining_seconds_after_hours // 60
|
|
|
+ result = f"{hours}小时{minutes}分" if hours > 0 else f"{minutes}分" if minutes > 0 else "0分"
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+def get_date(offset):
|
|
|
+ """
|
|
|
+ 获取指定偏移量的日期,格式为 YYYYMMDD。
|
|
|
+
|
|
|
+ 参数:
|
|
|
+ offset (int): 日期偏移量,0 表示今天,-1 表示昨天,-6 表示6天前,-7 表示7天前
|
|
|
+
|
|
|
+ 返回:
|
|
|
+ str: 格式为 YYYYMMDD 的日期字符串
|
|
|
+ """
|
|
|
+ today = datetime.today()
|
|
|
+ target_date = today + timedelta(days=offset)
|
|
|
+ return target_date.strftime('%Y%m%d')
|
|
|
+
|
|
|
+
|
|
|
+def decrypt_data(log, par_url, encrypted_data) -> Dict[str, str]:
|
|
|
+ """
|
|
|
+ 解密数据
|
|
|
+ :param log:
|
|
|
+ :param par_url:
|
|
|
+ :param encrypted_data:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ log.info("开始解密数据 ->->->->->->->->->->->->->->->->->->->->->")
|
|
|
+ if not isinstance(par_url, str):
|
|
|
+ return {}
|
|
|
+
|
|
|
+ # 对应原js中的str函数
|
|
|
+ def transform_str(input_str):
|
|
|
+ encoded = quote(input_str)
|
|
|
+ return base64.b64encode(encoded.encode()).decode()
|
|
|
+
|
|
|
+ str_result = transform_str(par_url) * 3
|
|
|
+ org_key = str_result[:16]
|
|
|
+ org_iv = str_result[12:28]
|
|
|
+
|
|
|
+ # 使用Crypto库解析key和iv
|
|
|
+ ikey = org_key.encode('utf-8')
|
|
|
+ iiv = org_iv.encode('utf-8')
|
|
|
+
|
|
|
+ # 解密
|
|
|
+ cipher = AES.new(ikey, AES.MODE_CBC, iiv)
|
|
|
+ decrypted_text = unpad(cipher.decrypt(base64.b64decode(encrypted_data)), AES.block_size).decode('utf-8')
|
|
|
+ decrypted_text = json.loads(decrypted_text)
|
|
|
+ # print(decrypted_text)
|
|
|
+
|
|
|
+ return decrypted_text
|
|
|
+
|
|
|
+
|
|
|
+def save_product_list(sql_pool, info_list):
|
|
|
+ sql = """
|
|
|
+ INSERT INTO kgj_kapai_product_list_record (product_id, title, price, price_str, cos_ratio, sales, live_sales, video_sales, other_sales, live_ratio, video_ratio, other_ratio, shop_id, shop_name, shop_cover, keyword)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
+ """
|
|
|
+ sql_pool.insert_all(sql, info_list)
|
|
|
+
|
|
|
+
|
|
|
+def save_live_detail(sql_pool, info_list):
|
|
|
+ """
|
|
|
+ 保存直播详情数据
|
|
|
+ :param sql_pool: 数据库连接池对象
|
|
|
+ :param info_list: info_list 列表
|
|
|
+ """
|
|
|
+ sql = """
|
|
|
+ INSERT INTO kgj_kapai_live_detail_record (room_id, live_create_time, live_finish_time, duration, watch_users, avg_users, peak_users, through, exposed_num, stay_duration, new_fans_count, inc_fans_clubs, turn_ratio, interaction_ratio, gmv, sales, atv, explain_duration, sku_count, uv, cvr, rpm, promotion_id, product_id, product_title, product_cover, product_sales, product_putaway_time, product_sold_out_time)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
+ """
|
|
|
+ sql_pool.insert_all(sql, info_list)
|
|
|
+
|
|
|
+
|
|
|
+def update_linked_live_state(sql_pool, sql_id):
|
|
|
+ """
|
|
|
+ 更新 kgj_linked_live_record 状态
|
|
|
+ :param sql_pool: 数据库连接池对象
|
|
|
+ :param sql_id: sql_id
|
|
|
+ """
|
|
|
+ sql = f"UPDATE kgj_kapai_linked_live_record SET live_detail_state = 1 WHERE id = %s"
|
|
|
+ sql_pool.update_one(sql, (sql_id,))
|
|
|
+
|
|
|
+
|
|
|
+def parse_product_list(log, resp_json: dict, sql_pool, keyword, stop_paging):
|
|
|
+ log.info("开始解析 product_list 数据......................")
|
|
|
+ items = resp_json.get("items", [])
|
|
|
+
|
|
|
+ info_list = []
|
|
|
+ for item in items:
|
|
|
+ sales = item.get("stat", {}).get("sales")
|
|
|
+ # 检查 sales 是否为 0 或 '0'
|
|
|
+ if sales == 0 or sales == '0':
|
|
|
+ stop_paging[0] = True # 设置停止翻页的标志
|
|
|
+ log.info("已到达销量为0的页码,停止当前产品列表的翻页..............")
|
|
|
+ break # 停止解析当前页面的产品列表
|
|
|
+
|
|
|
+ product_id = item.get("product_id")
|
|
|
+
|
|
|
+ # 20250523 删除根据 product_id 去重
|
|
|
+ # if product_id in sql_product_id_list:
|
|
|
+ # log.debug(f"{product_id} 已存在,跳过..............")
|
|
|
+ # continue
|
|
|
+ title = item.get("title")
|
|
|
+ # 如果标题中 包含["卡夹", "卡砖", "卡膜", "鼠标垫"]中的任意一个,则将keyword设置为 "周边"
|
|
|
+ if any(keyword in title for keyword in ["卡夹", "卡砖", "卡膜", "鼠标垫"]):
|
|
|
+ log.debug(f"{title} 包含关键字,为周边产品..............")
|
|
|
+ keyword = "周边"
|
|
|
+
|
|
|
+ price = item.get("price")
|
|
|
+ price_str = item.get("price_str")
|
|
|
+ cos_ratio = item.get("cos_ratio")
|
|
|
+
|
|
|
+ live_sales = item.get("stat", {}).get("live_sales")
|
|
|
+ video_sales = item.get("stat", {}).get("video_sales")
|
|
|
+ other_sales = item.get("stat", {}).get("other_sales")
|
|
|
+
|
|
|
+ live_ratio = item.get("market_type", {}).get("live_ratio")
|
|
|
+ live_ratio = decimal_to_percent(live_ratio) if live_ratio else "0%"
|
|
|
+
|
|
|
+ video_ratio = item.get("market_type", {}).get("video_ratio")
|
|
|
+ video_ratio = decimal_to_percent(video_ratio) if video_ratio else "0%"
|
|
|
+
|
|
|
+ other_ratio = item.get("market_type", {}).get("other_ratio")
|
|
|
+ other_ratio = decimal_to_percent(other_ratio) if other_ratio else "0%"
|
|
|
+
|
|
|
+ shop_id = item.get("shop_id")
|
|
|
+ shop_name = item.get("shop_name")
|
|
|
+ shop_cover = item.get("shop_cover")
|
|
|
+
|
|
|
+ info = (product_id, title, price, price_str, cos_ratio, sales, live_sales, video_sales, other_sales, live_ratio,
|
|
|
+ video_ratio, other_ratio, shop_id, shop_name, shop_cover, keyword)
|
|
|
+ info_list.append(info)
|
|
|
+ if info_list:
|
|
|
+ log.info(f"解析到 {len(info_list)} 条数据......................")
|
|
|
+ save_product_list(sql_pool, info_list)
|
|
|
+ else:
|
|
|
+ log.info("没有解析到数据......................")
|
|
|
+ # save_product_list(sql_pool, info_list)
|
|
|
+
|
|
|
+
|
|
|
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
|
|
|
+def get_kgj_product_one_page(log, page, keyword, sql_pool, stop_paging, headers):
|
|
|
+ url = "https://service.kaogujia.com/api/sku/search"
|
|
|
+ params = {
|
|
|
+ "limit": "50",
|
|
|
+ "page": str(page),
|
|
|
+ "sort_field": "sales",
|
|
|
+ "sort": "0"
|
|
|
+ }
|
|
|
+ data = {
|
|
|
+ "period": 7,
|
|
|
+ "keyword": keyword
|
|
|
+ }
|
|
|
+ response = requests.post(url, headers=headers, params=params, json=data)
|
|
|
+ # print(response.text)
|
|
|
+ # print(response)
|
|
|
+ response.raise_for_status()
|
|
|
+ resp_json = response.json()
|
|
|
+ if resp_json:
|
|
|
+ if resp_json.get("code") == 200:
|
|
|
+ enc_data = resp_json.get("data")
|
|
|
+ par_url = '/api/sku/search'
|
|
|
+ dec_data = decrypt_data(log, par_url, enc_data)
|
|
|
+ # print(dec_data)
|
|
|
+ parse_product_list(log, dec_data, sql_pool, keyword, stop_paging)
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_kgj_product_one_page: {resp_json.get('message')}")
|
|
|
+ else:
|
|
|
+ log.warning(f"Error resp_json")
|
|
|
+
|
|
|
+
|
|
|
+def get_kgj_product_list(log, keyword, sql_pool, headers):
|
|
|
+ stop_paging = [False] # 使用列表来存储标志,以便在函数间传递
|
|
|
+ for page in range(1, MAX_PAGE + 1):
|
|
|
+ try:
|
|
|
+ log.info(f"Getting kgj product list page {page}, keyword:{keyword}")
|
|
|
+ get_kgj_product_one_page(log, page, keyword, sql_pool, stop_paging, headers)
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Error getting kgj product list: {e}")
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ continue
|
|
|
+
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+
|
|
|
+ if stop_paging[0]: # 检查停止翻页的标志
|
|
|
+ log.info("停止翻页,因为 sales 为 0 或 '0'")
|
|
|
+ break
|
|
|
+
|
|
|
+
|
|
|
+def save_product_overview(sql_pool, info_list):
|
|
|
+ sql = """
|
|
|
+ INSERT INTO kgj_kapai_product_overview_record (product_id, date_all, sales_str, live_sales_str, video_sales_str, other_sales_str, users_str, lives_str, videos_str, live_ratio, video_ratio, other_ratio)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
+ """
|
|
|
+ sql_pool.insert_all(sql, info_list)
|
|
|
+
|
|
|
+
|
|
|
+def update_state(sql_pool, product_id, state_str, state_int):
|
|
|
+ """
|
|
|
+ 更新 kgj_product_list_record 状态
|
|
|
+ :param sql_pool: 数据库连接池对象
|
|
|
+ :param product_id: 产品ID
|
|
|
+ :param state_str: 状态字段名称
|
|
|
+ :param state_int: 状态值
|
|
|
+ """
|
|
|
+ sql = f"UPDATE kgj_kapai_product_list_record SET {state_str} = %s WHERE product_id = %s"
|
|
|
+ sql_pool.update_one(sql, (state_int, product_id))
|
|
|
+
|
|
|
+
|
|
|
+def get_product_overview_percent(log, product_id, headers):
|
|
|
+ log.info(f"Getting kgj product overview percent, product_id:{product_id}")
|
|
|
+ url = "https://service.kaogujia.com/api/sku/overview/dist"
|
|
|
+ data = {
|
|
|
+ "product_id": product_id,
|
|
|
+ "from_dt": get_date(-7),
|
|
|
+ "to_dt": get_date(-1)
|
|
|
+ }
|
|
|
+ response = requests.post(url, headers=headers, json=data)
|
|
|
+ response.raise_for_status()
|
|
|
+ resp_json = response.json()
|
|
|
+
|
|
|
+ if resp_json:
|
|
|
+ if resp_json.get("code") == 200:
|
|
|
+ enc_data = resp_json.get("data")
|
|
|
+ par_url = '/api/sku/overview/dist'
|
|
|
+ dec_data = decrypt_data(log, par_url, enc_data)
|
|
|
+ # print(dec_data)
|
|
|
+ sales_list = dec_data.get("sales")
|
|
|
+ live_ratio = "0%"
|
|
|
+ video_ratio = "0%"
|
|
|
+ other_ratio = "0%"
|
|
|
+ for sales in sales_list:
|
|
|
+ if sales.get("name") == "直播带货":
|
|
|
+ live_ratio = sales.get("percent")
|
|
|
+ live_ratio = decimal_to_percent_with_dist(live_ratio) if live_ratio else "0%"
|
|
|
+ if sales.get("name") == "视频带货":
|
|
|
+ video_ratio = sales.get("percent")
|
|
|
+ video_ratio = decimal_to_percent_with_dist(video_ratio) if video_ratio else "0%"
|
|
|
+ if sales.get("name") == "商品卡":
|
|
|
+ other_ratio = sales.get("percent")
|
|
|
+ other_ratio = decimal_to_percent_with_dist(other_ratio) if other_ratio else "1%"
|
|
|
+
|
|
|
+ return live_ratio, video_ratio, other_ratio
|
|
|
+
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_kgj_product_overview: {resp_json.get('message')}")
|
|
|
+ # update_state(sql_pool, product_id, "product_state", 2)
|
|
|
+ return "0%", "0%", "0%"
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_kgj_product_overview resp_json")
|
|
|
+ # update_state(sql_pool, product_id, "product_state", 2)
|
|
|
+ return "0%", "0%", "0%"
|
|
|
+
|
|
|
+
|
|
|
+def parse_product_overview(log, sql_pool, resp_json: dict, product_id, headers):
|
|
|
+ log.info("开始解析 product_overview 数据......................")
|
|
|
+ trend_list = resp_json.get("trend_list", [])
|
|
|
+ if not trend_list:
|
|
|
+ log.debug(f"parse_product_overview trend_list is empty")
|
|
|
+ update_state(sql_pool, product_id, "product_state", 2)
|
|
|
+ else:
|
|
|
+ # 获取商品概览页的占比信息
|
|
|
+ live_ratio, video_ratio, other_ratio = get_product_overview_percent(log, product_id, headers)
|
|
|
+ info_list = []
|
|
|
+ for trend in trend_list:
|
|
|
+ date_all = trend.get("date_all")
|
|
|
+ sales_str = trend.get("sales_str")
|
|
|
+ live_sales_str = trend.get("live_sales_str")
|
|
|
+ video_sales_str = trend.get("video_sales_str")
|
|
|
+ other_sales_str = trend.get("other_sales_str")
|
|
|
+ users_str = trend.get("users_str")
|
|
|
+ lives_str = trend.get("lives_str")
|
|
|
+ videos_str = trend.get("videos_str")
|
|
|
+ info = (
|
|
|
+ product_id, date_all, sales_str, live_sales_str, video_sales_str, other_sales_str, users_str, lives_str,
|
|
|
+ videos_str, live_ratio, video_ratio, other_ratio)
|
|
|
+ info_list.append(info)
|
|
|
+ save_product_overview(sql_pool, info_list)
|
|
|
+ # sql = "UPDATE kgj_product_list_record SET product_state = 2 WHERE product_id = %s"
|
|
|
+ update_state(sql_pool, product_id, "product_state", 1)
|
|
|
+
|
|
|
+
|
|
|
+def get_kgj_product_overview(log, sql_pool, product_id, headers):
|
|
|
+ url = f"https://service.kaogujia.com/api/sku/trend/{product_id}"
|
|
|
+ params = {
|
|
|
+ "begin": get_date(-7),
|
|
|
+ # "begin": "20250115",
|
|
|
+ # "end": "20250121"
|
|
|
+ "end": get_date(-1)
|
|
|
+ }
|
|
|
+ response = requests.get(url, headers=headers, params=params)
|
|
|
+ # print(response.text)
|
|
|
+ response.raise_for_status()
|
|
|
+ resp_json = response.json()
|
|
|
+
|
|
|
+ if resp_json:
|
|
|
+ if resp_json.get("code") == 200:
|
|
|
+ enc_data = resp_json.get("data")
|
|
|
+ par_url = f'/api/sku/trend/{product_id}'
|
|
|
+ dec_data = decrypt_data(log, par_url, enc_data)
|
|
|
+ # print(dec_data)
|
|
|
+ parse_product_overview(log, sql_pool, dec_data, product_id, headers)
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_kgj_product_overview: {resp_json.get('message')}")
|
|
|
+ update_state(sql_pool, product_id, "product_state", 2)
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_kgj_product_overview resp_json")
|
|
|
+ update_state(sql_pool, product_id, "product_state", 2)
|
|
|
+
|
|
|
+
|
|
|
+def save_linded_live(sql_pool, info_list):
|
|
|
+ sql = """
|
|
|
+ INSERT INTO kgj_kapai_linked_live_record (product_id, title, duration_str, pub_time_str, uid, nick_name, fans_count, price, price_str, sales, gmv, pi, room_id, date_code)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
+ """
|
|
|
+ sql_pool.insert_all(sql, info_list)
|
|
|
+
|
|
|
+
|
|
|
+def parse_linked_live(log, dec_data, sql_pool, product_id):
|
|
|
+ log.info("开始解析 linked_live 数据......................")
|
|
|
+ items = dec_data.get("items", [])
|
|
|
+ if not items:
|
|
|
+ log.debug(f"parse_linked_live items is empty")
|
|
|
+ update_state(sql_pool, product_id, "live_state", 2)
|
|
|
+ else:
|
|
|
+ info_list = []
|
|
|
+ for item in items:
|
|
|
+ title = item.get("title")
|
|
|
+ duration = item.get("duration")
|
|
|
+ duration_str = convert_seconds_to_hours_minutes(duration)
|
|
|
+ pub_time = item.get("pub_time")
|
|
|
+ pub_time_str = datetime.fromtimestamp(pub_time).strftime('%Y-%m-%d %H:%M:%S') if pub_time else ""
|
|
|
+ uid = item.get("uid")
|
|
|
+ nick_name = item.get("nick_name")
|
|
|
+ fans_count = item.get("fans_count")
|
|
|
+ price = item.get("price")
|
|
|
+ price_str = item.get("price_str")
|
|
|
+ sales = item.get("sales")
|
|
|
+ gmv = item.get("gmv")
|
|
|
+ pi = item.get("pi")
|
|
|
+ room_id = item.get("room_id")
|
|
|
+ date_code = item.get("date_code")
|
|
|
+
|
|
|
+ info = (
|
|
|
+ product_id, title, duration_str, pub_time_str, uid, nick_name, fans_count, price, price_str, sales, gmv,
|
|
|
+ pi, room_id, date_code)
|
|
|
+ info_list.append(info)
|
|
|
+ save_linded_live(sql_pool, info_list)
|
|
|
+ update_state(sql_pool, product_id, "live_state", 1)
|
|
|
+
|
|
|
+
|
|
|
+def get_linked_live(log, sql_pool, product_id, headers):
|
|
|
+ url = "https://service.kaogujia.com/api/sku/live/list"
|
|
|
+ params = {
|
|
|
+ "limit": "10",
|
|
|
+ "page": "1",
|
|
|
+ "sort_field": "gmv",
|
|
|
+ "sort": "0"
|
|
|
+ }
|
|
|
+ data = {
|
|
|
+ "keyword": "",
|
|
|
+ "min_time": get_date(-6),
|
|
|
+ "max_time": get_date(0),
|
|
|
+ "product_id": product_id
|
|
|
+ }
|
|
|
+ response = requests.post(url, headers=headers, params=params, json=data)
|
|
|
+ # print(response.text)
|
|
|
+ response.raise_for_status()
|
|
|
+ resp_json = response.json()
|
|
|
+ if resp_json:
|
|
|
+ if resp_json.get("code") == 200:
|
|
|
+ enc_data = resp_json.get("data")
|
|
|
+ par_url = '/api/sku/live/list'
|
|
|
+ dec_data = decrypt_data(log, par_url, enc_data)
|
|
|
+ # print(dec_data)
|
|
|
+ parse_linked_live(log, dec_data, sql_pool, product_id)
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_linked_live: {resp_json.get('message')}")
|
|
|
+ update_state(sql_pool, product_id, "live_state", 2)
|
|
|
+ else:
|
|
|
+ log.warning(f"Error get_linked_live resp_json")
|
|
|
+ update_state(sql_pool, product_id, "live_state", 2)
|
|
|
+
|
|
|
+
|
|
|
+@retry(stop=stop_after_attempt(500), wait=wait_fixed(600), after=after_log)
|
|
|
+def kgj_kapai_main(log):
|
|
|
+ log.info("开始运行 kgj_kapai_main 爬虫任务............................................................")
|
|
|
+ sql_pool = MySQLConnectionPool(log=log)
|
|
|
+ if not sql_pool:
|
|
|
+ log.error("数据库连接失败")
|
|
|
+ raise Exception("数据库连接失败")
|
|
|
+
|
|
|
+ kgj_token = sql_pool.select_one("SELECT token FROM kgj_token")
|
|
|
+ # 195的账号
|
|
|
+ KGJ_HEADERS = {
|
|
|
+ "accept": "*/*",
|
|
|
+ "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
|
|
|
+ "authorization": kgj_token[0],
|
|
|
+ "content-type": "application/json",
|
|
|
+ "origin": "https://www.kaogujia.com",
|
|
|
+ "priority": "u=1, i",
|
|
|
+ "referer": "https://www.kaogujia.com/",
|
|
|
+ "sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
|
|
|
+ "sec-ch-ua-mobile": "?0",
|
|
|
+ "sec-ch-ua-platform": "\"Windows\"",
|
|
|
+ "sec-fetch-dest": "empty",
|
|
|
+ "sec-fetch-mode": "cors",
|
|
|
+ "sec-fetch-site": "same-site",
|
|
|
+ "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
|
|
|
+ "version_code": "3.1"
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ keyword = "卡牌"
|
|
|
+ log.info("开始获取 product_list 数据............................................................")
|
|
|
+ # sql_product_id_list = sql_pool.select_all("SELECT DISTINCT product_id FROM kgj_kapai_product_list_record")
|
|
|
+ # sql_product_id_list = [item[0] for item in sql_product_id_list]
|
|
|
+ try:
|
|
|
+ get_kgj_product_list(log, keyword, sql_pool, KGJ_HEADERS)
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Error main -> getting kgj product list: {e}")
|
|
|
+
|
|
|
+ # sql_product_id_list.clear()
|
|
|
+ log.info("product_list 数据获取完成............................................................")
|
|
|
+
|
|
|
+ time.sleep(5)
|
|
|
+
|
|
|
+ log.info("开始获取 product_overview 数据............................................................")
|
|
|
+ product_id_list_for_product = sql_pool.select_all(
|
|
|
+ "SELECT product_id FROM kgj_kapai_product_list_record WHERE product_state = 0")
|
|
|
+ product_id_list = [item[0] for item in product_id_list_for_product]
|
|
|
+ for product_id in product_id_list:
|
|
|
+ try:
|
|
|
+ log.info(f"开始获取 product_id: {product_id} 的 product_overview 数据............................")
|
|
|
+ get_kgj_product_overview(log, sql_pool, product_id, KGJ_HEADERS)
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Error main -> getting kgj product overview: {e}")
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ continue
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ log.info("product_overview 数据获取完成............................................................")
|
|
|
+
|
|
|
+ time.sleep(5)
|
|
|
+
|
|
|
+ log.info("开始获取 linked_live 数据............................................................")
|
|
|
+ product_id_list_for_live = sql_pool.select_all(
|
|
|
+ "SELECT product_id FROM kgj_kapai_product_list_record WHERE live_state = 0")
|
|
|
+ product_id_list = [item[0] for item in product_id_list_for_live]
|
|
|
+ for product_id in product_id_list:
|
|
|
+ try:
|
|
|
+ log.info(f"开始获取 product_id: {product_id} 的 linked_live 数据............................")
|
|
|
+ get_linked_live(log, sql_pool, product_id, KGJ_HEADERS)
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Error main -> getting kgj linked_live: {e}")
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ continue
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ log.info("linked_live 数据获取完成............................................................")
|
|
|
+
|
|
|
+ time.sleep(5)
|
|
|
+
|
|
|
+ log.info("开始获取 live_detail 数据............................................................")
|
|
|
+ sql_room_id_list = sql_pool.select_all(
|
|
|
+ "SELECT id, uid, room_id, date_code FROM kgj_kapai_linked_live_record WHERE live_detail_state = 0 and uid is not null and room_id is not null and date_code is not null")
|
|
|
+ sql_room_id_list = [item for item in sql_room_id_list]
|
|
|
+ if sql_room_id_list:
|
|
|
+ for sql_info in sql_room_id_list:
|
|
|
+ try:
|
|
|
+ log.info(f"开始获取 room_id: {sql_info[2]} 的 live_detail 数据............................")
|
|
|
+ live_detail_info_list = get_live_detail(log, sql_info, KGJ_HEADERS)
|
|
|
+ save_live_detail(sql_pool, live_detail_info_list)
|
|
|
+ update_linked_live_state(sql_pool, sql_info[0])
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"Error main -> getting kgj live_detail: {e}")
|
|
|
+ # update_linked_live_state(sql_pool, sql_info[0])
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ continue
|
|
|
+ time.sleep(random.randint(4, 6))
|
|
|
+ log.info("live_detail 数据获取完成............................................................")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ log.error(e)
|
|
|
+ finally:
|
|
|
+ log.info("爬虫程序 kgj_kapai_main 运行结束,等待下一轮的采集任务.............")
|
|
|
+
|
|
|
+
|
|
|
+def schedule_task():
|
|
|
+ """
|
|
|
+ 设置定时任务
|
|
|
+ """
|
|
|
+ # 立即运行一次任务
|
|
|
+ # kgj_kapai_main(logger)
|
|
|
+
|
|
|
+ # 设置定时任务 考古加 -> 卡牌 一周一次 卡牌类的周三跑 抓取时间比较久 和其他几个类错开时间
|
|
|
+ schedule.every().wednesday.at("01:01").do(kgj_kapai_main, logger)
|
|
|
+ while True:
|
|
|
+ schedule.run_pending()
|
|
|
+ time.sleep(1)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ schedule_task()
|