# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/2/8 13:11
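"""Weekly kaogujia.com scraper for the "卡牌" (trading card) keyword.

Flow implemented below: search the product list, then for each product fetch
its overview/trend data, the sales-channel distribution percentages, the
linked live streams, and finally the per-room live details, persisting each
step into MySQL via MySQLConnectionPool. Scheduled to run every Wednesday at
01:01 via the `schedule` library.
"""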
import json
import random
import time
import base64
import schedule
import urllib3
import requests
from typing import Dict
from loguru import logger
from urllib.parse import quote
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from datetime import datetime, timedelta
from mysq_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed
from request_live_detail import get_live_detail

urllib3.disable_warnings()

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="14 days")

MAX_PAGE = 100
# HEADERS = settings.KGJ_HEADERS


def after_log(retry_state):
    """
    tenacity retry callback.
    :param retry_state: RetryCallState object
    """
    log = retry_state.args[0]  # the logger passed as the first argument
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel proxy configuration.
    :return: proxies dict for requests
    """
    # tunnel = "h991.kdltps.com:15818"
    # kdl_username = "t12136177769785"
    # kdl_password = "ety9bdi8"
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e
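
# Usage sketch (an assumption: get_proxys is defined here but not wired into
# the requests calls below, which hit the API directly). To route a call
# through the tunnel, pass the dict in, e.g.:
#     requests.post(url, headers=headers, json=data,
#                   proxies=get_proxys(logger), timeout=30)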


def decimal_to_percent(decimal_value):
    if not decimal_value:
        return "0%"
    # Convert the fraction to a percentage
    percentage = decimal_value * 100
    # The 'g' format drops any unnecessary trailing zeros
    return f"{percentage:g}%"


def decimal_to_percent_with_dist(decimal_value):
    if not decimal_value:
        return "0%"
    # Convert the fraction to a percentage
    percentage = decimal_value * 100
    # Keep at most two decimal places and strip unnecessary zeros
    formatted_percentage = f"{percentage:.2f}".rstrip('0').rstrip('.')
    return f"{formatted_percentage}%"


def convert_seconds_to_hours_minutes(total_seconds: int):
    # Whole hours
    hours = total_seconds // 3600
    remaining_seconds_after_hours = total_seconds % 3600
    # Whole minutes
    minutes = remaining_seconds_after_hours // 60
    result = f"{hours}小时{minutes}分" if hours > 0 else f"{minutes}分" if minutes > 0 else "0分"
    return result
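
# Quick reference for the three helpers above (outputs derived from the code,
# shown as doctest-style comments):
#   decimal_to_percent(0.125)              -> "12.5%"
#   decimal_to_percent_with_dist(0.3333)   -> "33.33%"
#   convert_seconds_to_hours_minutes(3900) -> "1小时5分"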


def get_date(offset):
    """
    Return the date at the given offset, formatted as YYYYMMDD.
    Args:
        offset (int): day offset; 0 = today, -1 = yesterday, -6 = six days ago, -7 = seven days ago
    Returns:
        str: date string in YYYYMMDD format
    """
    today = datetime.today()
    target_date = today + timedelta(days=offset)
    return target_date.strftime('%Y%m%d')


def decrypt_data(log, par_url, encrypted_data) -> Dict[str, str]:
    """
    Decrypt an API response payload.
    :param log: logger
    :param par_url: request path the payload came from (used to derive the key/IV)
    :param encrypted_data: base64-encoded AES-CBC ciphertext
    :return: decrypted JSON as a dict
    """
    log.info("Decrypting response data ->->->->->->->->->->->->->->->->->->->->->")
    if not isinstance(par_url, str):
        return {}

    # Equivalent of the str() helper in the site's original JS
    def transform_str(input_str):
        encoded = quote(input_str)
        return base64.b64encode(encoded.encode()).decode()

    str_result = transform_str(par_url) * 3
    org_key = str_result[:16]
    org_iv = str_result[12:28]
    # Derive the AES key and IV
    ikey = org_key.encode('utf-8')
    iiv = org_iv.encode('utf-8')
    # Decrypt
    cipher = AES.new(ikey, AES.MODE_CBC, iiv)
    decrypted_text = unpad(cipher.decrypt(base64.b64decode(encrypted_data)), AES.block_size).decode('utf-8')
    decrypted_text = json.loads(decrypted_text)
    # print(decrypted_text)
    return decrypted_text
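

# Minimal local sanity check for the key/IV derivation above (a hypothetical
# helper, not part of the scraper): encrypt a sample payload with the same
# derived key/IV and confirm decrypt_data() round-trips it.
def _selftest_decrypt_data(log, par_url="/api/sku/search"):
    from Crypto.Util.Padding import pad
    derived = base64.b64encode(quote(par_url).encode()).decode() * 3
    key, iv = derived[:16].encode(), derived[12:28].encode()
    plaintext = json.dumps({"items": []}).encode()
    ciphertext = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(plaintext, AES.block_size))
    assert decrypt_data(log, par_url, base64.b64encode(ciphertext).decode()) == {"items": []}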


def save_product_list(sql_pool, info_list):
    sql = """
    INSERT INTO kgj_kapai_product_list_record (product_id, title, price, price_str, cos_ratio, sales, live_sales, video_sales, other_sales, live_ratio, video_ratio, other_ratio, shop_id, shop_name, shop_cover, keyword)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)


def save_live_detail(sql_pool, info_list):
    """
    Save live-detail rows.
    :param sql_pool: database connection pool
    :param info_list: list of row tuples
    """
    sql = """
    INSERT INTO kgj_kapai_live_detail_record (room_id, live_create_time, live_finish_time, duration, watch_users, avg_users, peak_users, through, exposed_num, stay_duration, new_fans_count, inc_fans_clubs, turn_ratio, interaction_ratio, gmv, sales, atv, explain_duration, sku_count, uv, cvr, rpm, promotion_id, product_id, product_title, product_cover, product_sales, product_putaway_time, product_sold_out_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)


def update_linked_live_state(sql_pool, sql_id):
    """
    Mark a kgj_kapai_linked_live_record row as having its live detail fetched.
    :param sql_pool: database connection pool
    :param sql_id: primary key of the row
    """
    sql = "UPDATE kgj_kapai_linked_live_record SET live_detail_state = 1 WHERE id = %s"
    sql_pool.update_one(sql, (sql_id,))
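
# Note: insert_all(sql, rows), update_one(sql, params), select_one(sql) and
# select_all(sql) used throughout are provided by the local mysq_pool module;
# their exact semantics (batched inserts, commits, result tuple shapes) are
# assumed from that module and are not redefined here.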


def parse_product_list(log, resp_json: dict, sql_pool, keyword, stop_paging):
    log.info("Parsing product_list data......................")
    items = resp_json.get("items", [])
    info_list = []
    for item in items:
        sales = item.get("stat", {}).get("sales")
        # Stop once sales drops to 0 or '0'
        if sales == 0 or sales == '0':
            stop_paging[0] = True  # flag the caller to stop paging
            log.info("Reached a page with zero sales, stopping pagination for this product list..............")
            break  # stop parsing the rest of this page
        product_id = item.get("product_id")
        # 20250523 removed de-duplication by product_id
        # if product_id in sql_product_id_list:
        #     log.debug(f"{product_id} already exists, skipping..............")
        #     continue
        title = item.get("title")
        # If the title contains any of ["卡夹", "卡砖", "卡膜", "鼠标垫"], reclassify the keyword as "周边" (accessories)
        if any(kw in title for kw in ["卡夹", "卡砖", "卡膜", "鼠标垫"]):
            log.debug(f"{title} matches an accessory keyword, treating as 周边..............")
            keyword = "周边"
        price = item.get("price")
        price_str = item.get("price_str")
        cos_ratio = item.get("cos_ratio")
        live_sales = item.get("stat", {}).get("live_sales")
        video_sales = item.get("stat", {}).get("video_sales")
        other_sales = item.get("stat", {}).get("other_sales")
        live_ratio = item.get("market_type", {}).get("live_ratio")
        live_ratio = decimal_to_percent(live_ratio) if live_ratio else "0%"
        video_ratio = item.get("market_type", {}).get("video_ratio")
        video_ratio = decimal_to_percent(video_ratio) if video_ratio else "0%"
        other_ratio = item.get("market_type", {}).get("other_ratio")
        other_ratio = decimal_to_percent(other_ratio) if other_ratio else "0%"
        shop_id = item.get("shop_id")
        shop_name = item.get("shop_name")
        shop_cover = item.get("shop_cover")
        info = (product_id, title, price, price_str, cos_ratio, sales, live_sales, video_sales, other_sales, live_ratio,
                video_ratio, other_ratio, shop_id, shop_name, shop_cover, keyword)
        info_list.append(info)
    if info_list:
        log.info(f"Parsed {len(info_list)} rows......................")
        save_product_list(sql_pool, info_list)
    else:
        log.info("No rows parsed......................")
        # save_product_list(sql_pool, info_list)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_kgj_product_one_page(log, page, keyword, sql_pool, stop_paging, headers):
    url = "https://service.kaogujia.com/api/sku/search"
    params = {
        "limit": "50",
        "page": str(page),
        "sort_field": "sales",
        "sort": "0"
    }
    data = {
        "period": 7,
        "keyword": keyword
    }
    response = requests.post(url, headers=headers, params=params, json=data)
    # print(response.text)
    # print(response)
    response.raise_for_status()
    resp_json = response.json()
    if resp_json:
        if resp_json.get("code") == 200:
            enc_data = resp_json.get("data")
            par_url = '/api/sku/search'
            dec_data = decrypt_data(log, par_url, enc_data)
            # print(dec_data)
            parse_product_list(log, dec_data, sql_pool, keyword, stop_paging)
        else:
            log.warning(f"Error get_kgj_product_one_page: {resp_json.get('message')}")
    else:
        log.warning("Error get_kgj_product_one_page: empty response JSON")


def get_kgj_product_list(log, keyword, sql_pool, headers):
    stop_paging = [False]  # one-element list so the flag can be shared across calls
    for page in range(1, MAX_PAGE + 1):
        try:
            log.info(f"Getting kgj product list page {page}, keyword:{keyword}")
            get_kgj_product_one_page(log, page, keyword, sql_pool, stop_paging, headers)
        except Exception as e:
            log.error(f"Error getting kgj product list: {e}")
            time.sleep(random.randint(4, 6))
            continue
        time.sleep(random.randint(4, 6))
        if stop_paging[0]:  # check the stop-paging flag
            log.info("Stopping pagination because sales hit 0 or '0'")
            break


def save_product_overview(sql_pool, info_list):
    sql = """
    INSERT INTO kgj_kapai_product_overview_record (product_id, date_all, sales_str, live_sales_str, video_sales_str, other_sales_str, users_str, lives_str, videos_str, live_ratio, video_ratio, other_ratio)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)


def update_state(sql_pool, product_id, state_str, state_int):
    """
    Update a state column on kgj_kapai_product_list_record.
    :param sql_pool: database connection pool
    :param product_id: product ID
    :param state_str: name of the state column
    :param state_int: state value
    """
    sql = f"UPDATE kgj_kapai_product_list_record SET {state_str} = %s WHERE product_id = %s"
    sql_pool.update_one(sql, (state_int, product_id))


def get_product_overview_percent(log, product_id, headers):
    log.info(f"Getting kgj product overview percent, product_id:{product_id}")
    url = "https://service.kaogujia.com/api/sku/overview/dist"
    data = {
        "product_id": product_id,
        "from_dt": get_date(-7),
        "to_dt": get_date(-1)
    }
    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
    resp_json = response.json()
    if resp_json:
        if resp_json.get("code") == 200:
            enc_data = resp_json.get("data")
            par_url = '/api/sku/overview/dist'
            dec_data = decrypt_data(log, par_url, enc_data)
            # print(dec_data)
            sales_list = dec_data.get("sales")
            live_ratio = "0%"
            video_ratio = "0%"
            other_ratio = "0%"
            for sales in sales_list:
                if sales.get("name") == "直播带货":
                    live_ratio = sales.get("percent")
                    live_ratio = decimal_to_percent_with_dist(live_ratio) if live_ratio else "0%"
                if sales.get("name") == "视频带货":
                    video_ratio = sales.get("percent")
                    video_ratio = decimal_to_percent_with_dist(video_ratio) if video_ratio else "0%"
                if sales.get("name") == "商品卡":
                    other_ratio = sales.get("percent")
                    other_ratio = decimal_to_percent_with_dist(other_ratio) if other_ratio else "0%"
            return live_ratio, video_ratio, other_ratio
        else:
            log.warning(f"Error get_product_overview_percent: {resp_json.get('message')}")
            # update_state(sql_pool, product_id, "product_state", 2)
            return "0%", "0%", "0%"
    else:
        log.warning("Error get_product_overview_percent: empty response JSON")
        # update_state(sql_pool, product_id, "product_state", 2)
        return "0%", "0%", "0%"


def parse_product_overview(log, sql_pool, resp_json: dict, product_id, headers):
    log.info("Parsing product_overview data......................")
    trend_list = resp_json.get("trend_list", [])
    if not trend_list:
        log.debug("parse_product_overview trend_list is empty")
        update_state(sql_pool, product_id, "product_state", 2)
    else:
        # Fetch the sales-channel percentages from the product overview page
        live_ratio, video_ratio, other_ratio = get_product_overview_percent(log, product_id, headers)
        info_list = []
        for trend in trend_list:
            date_all = trend.get("date_all")
            sales_str = trend.get("sales_str")
            live_sales_str = trend.get("live_sales_str")
            video_sales_str = trend.get("video_sales_str")
            other_sales_str = trend.get("other_sales_str")
            users_str = trend.get("users_str")
            lives_str = trend.get("lives_str")
            videos_str = trend.get("videos_str")
            info = (
                product_id, date_all, sales_str, live_sales_str, video_sales_str, other_sales_str, users_str, lives_str,
                videos_str, live_ratio, video_ratio, other_ratio)
            info_list.append(info)
        save_product_overview(sql_pool, info_list)
        # sql = "UPDATE kgj_product_list_record SET product_state = 2 WHERE product_id = %s"
        update_state(sql_pool, product_id, "product_state", 1)


def get_kgj_product_overview(log, sql_pool, product_id, headers):
    url = f"https://service.kaogujia.com/api/sku/trend/{product_id}"
    params = {
        "begin": get_date(-7),
        # "begin": "20250115",
        # "end": "20250121"
        "end": get_date(-1)
    }
    response = requests.get(url, headers=headers, params=params)
    # print(response.text)
    response.raise_for_status()
    resp_json = response.json()
    if resp_json:
        if resp_json.get("code") == 200:
            enc_data = resp_json.get("data")
            par_url = f'/api/sku/trend/{product_id}'
            dec_data = decrypt_data(log, par_url, enc_data)
            # print(dec_data)
            parse_product_overview(log, sql_pool, dec_data, product_id, headers)
        else:
            log.warning(f"Error get_kgj_product_overview: {resp_json.get('message')}")
            update_state(sql_pool, product_id, "product_state", 2)
    else:
        log.warning("Error get_kgj_product_overview: empty response JSON")
        update_state(sql_pool, product_id, "product_state", 2)


def save_linked_live(sql_pool, info_list):
    sql = """
    INSERT INTO kgj_kapai_linked_live_record (product_id, title, duration_str, pub_time_str, uid, nick_name, fans_count, price, price_str, sales, gmv, pi, room_id, date_code)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    sql_pool.insert_all(sql, info_list)


def parse_linked_live(log, dec_data, sql_pool, product_id):
    log.info("Parsing linked_live data......................")
    items = dec_data.get("items", [])
    if not items:
        log.debug("parse_linked_live items is empty")
        update_state(sql_pool, product_id, "live_state", 2)
    else:
        info_list = []
        for item in items:
            title = item.get("title")
            duration = item.get("duration")
            duration_str = convert_seconds_to_hours_minutes(duration)
            pub_time = item.get("pub_time")
            pub_time_str = datetime.fromtimestamp(pub_time).strftime('%Y-%m-%d %H:%M:%S') if pub_time else ""
            uid = item.get("uid")
            nick_name = item.get("nick_name")
            fans_count = item.get("fans_count")
            price = item.get("price")
            price_str = item.get("price_str")
            sales = item.get("sales")
            gmv = item.get("gmv")
            pi = item.get("pi")
            room_id = item.get("room_id")
            date_code = item.get("date_code")
            info = (
                product_id, title, duration_str, pub_time_str, uid, nick_name, fans_count, price, price_str, sales, gmv,
                pi, room_id, date_code)
            info_list.append(info)
        save_linked_live(sql_pool, info_list)
        update_state(sql_pool, product_id, "live_state", 1)


def get_linked_live(log, sql_pool, product_id, headers):
    url = "https://service.kaogujia.com/api/sku/live/list"
    params = {
        "limit": "10",
        "page": "1",
        "sort_field": "gmv",
        "sort": "0"
    }
    data = {
        "keyword": "",
        "min_time": get_date(-6),
        "max_time": get_date(0),
        "product_id": product_id
    }
    response = requests.post(url, headers=headers, params=params, json=data)
    # print(response.text)
    response.raise_for_status()
    resp_json = response.json()
    if resp_json:
        if resp_json.get("code") == 200:
            enc_data = resp_json.get("data")
            par_url = '/api/sku/live/list'
            dec_data = decrypt_data(log, par_url, enc_data)
            # print(dec_data)
            parse_linked_live(log, dec_data, sql_pool, product_id)
        else:
            log.warning(f"Error get_linked_live: {resp_json.get('message')}")
            update_state(sql_pool, product_id, "live_state", 2)
    else:
        log.warning("Error get_linked_live: empty response JSON")
        update_state(sql_pool, product_id, "live_state", 2)


@retry(stop=stop_after_attempt(500), wait=wait_fixed(600), after=after_log)
def kgj_kapai_main(log):
    log.info("Starting the kgj_kapai_main crawl task............................................................")
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("Database connection failed")
        raise Exception("Database connection failed")
    kgj_token = sql_pool.select_one("SELECT token FROM kgj_token")
    # Headers for the 195 account
    KGJ_HEADERS = {
        "accept": "*/*",
        "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
        "authorization": kgj_token[0],
        "content-type": "application/json",
        "origin": "https://www.kaogujia.com",
        "priority": "u=1, i",
        "referer": "https://www.kaogujia.com/",
        "sec-ch-ua": "\"Not(A:Brand\";v=\"99\", \"Google Chrome\";v=\"133\", \"Chromium\";v=\"133\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
        "version_code": "3.1"
    }
    try:
        keyword = "卡牌"
        log.info("Fetching product_list data............................................................")
        # sql_product_id_list = sql_pool.select_all("SELECT DISTINCT product_id FROM kgj_kapai_product_list_record")
        # sql_product_id_list = [item[0] for item in sql_product_id_list]
        try:
            get_kgj_product_list(log, keyword, sql_pool, KGJ_HEADERS)
        except Exception as e:
            log.error(f"Error main -> getting kgj product list: {e}")
        # sql_product_id_list.clear()
        log.info("product_list data fetched............................................................")
        time.sleep(5)

        log.info("Fetching product_overview data............................................................")
        product_id_list_for_product = sql_pool.select_all(
            "SELECT product_id FROM kgj_kapai_product_list_record WHERE product_state = 0")
        product_id_list = [item[0] for item in product_id_list_for_product]
        for product_id in product_id_list:
            try:
                log.info(f"Fetching product_overview data for product_id: {product_id}............................")
                get_kgj_product_overview(log, sql_pool, product_id, KGJ_HEADERS)
            except Exception as e:
                log.error(f"Error main -> getting kgj product overview: {e}")
                time.sleep(random.randint(4, 6))
                continue
            time.sleep(random.randint(4, 6))
        log.info("product_overview data fetched............................................................")
        time.sleep(5)

        log.info("Fetching linked_live data............................................................")
        product_id_list_for_live = sql_pool.select_all(
            "SELECT product_id FROM kgj_kapai_product_list_record WHERE live_state = 0")
        product_id_list = [item[0] for item in product_id_list_for_live]
        for product_id in product_id_list:
            try:
                log.info(f"Fetching linked_live data for product_id: {product_id}............................")
                get_linked_live(log, sql_pool, product_id, KGJ_HEADERS)
            except Exception as e:
                log.error(f"Error main -> getting kgj linked_live: {e}")
                time.sleep(random.randint(4, 6))
                continue
            time.sleep(random.randint(4, 6))
        log.info("linked_live data fetched............................................................")
        time.sleep(5)

        log.info("Fetching live_detail data............................................................")
        sql_room_id_list = sql_pool.select_all(
            "SELECT id, uid, room_id, date_code FROM kgj_kapai_linked_live_record WHERE live_detail_state = 0 and uid is not null and room_id is not null and date_code is not null")
        sql_room_id_list = [item for item in sql_room_id_list]
        if sql_room_id_list:
            for sql_info in sql_room_id_list:
                try:
                    log.info(f"Fetching live_detail data for room_id: {sql_info[2]}............................")
                    live_detail_info_list = get_live_detail(log, sql_info, KGJ_HEADERS)
                    save_live_detail(sql_pool, live_detail_info_list)
                    update_linked_live_state(sql_pool, sql_info[0])
                except Exception as e:
                    log.error(f"Error main -> getting kgj live_detail: {e}")
                    # update_linked_live_state(sql_pool, sql_info[0])
                    time.sleep(random.randint(4, 6))
                    continue
                time.sleep(random.randint(4, 6))
        log.info("live_detail data fetched............................................................")
    except Exception as e:
        log.error(e)
    finally:
        log.info("Crawler kgj_kapai_main finished; waiting for the next scheduled run.............")


def schedule_task():
    """
    Set up the scheduled task.
    """
    # Run the task once immediately
    # kgj_kapai_main(logger)
    # Weekly schedule: the kaogujia "卡牌" category runs on Wednesday because its
    # crawl takes longer, staggered away from the other categories
    schedule.every().wednesday.at("01:01").do(kgj_kapai_main, logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    schedule_task()