# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2026/4/23 13:46
import json
import time
import inspect
from datetime import datetime
from typing import Any, Dict

import requests
import schedule
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type

from mysql_pool import MySQLConnectionPool
from jhs_raw_codec_client import JhsRawCodecClient

# SECURITY(review): a hard-coded JWT used to be committed here as TOKEN.
# Tokens are now read from the `jhs_token` table (see jhs_rpc_main); never
# commit live credentials to source control.

# DEVICE_ID = "127.0.0.1:5557"  # adb connect 127.0.0.1:5557
DEVICE_ID = "25051FDD4S018P"  # physical device serial (adb connect 127.0.0.1:5557)
CLI_TARGET_SEC = 2
TIMEOUT_SEC = 15

BASE_URL = "https://api.jihuanshe.com/api/market/auction-products"
HEADERS = {
    "User-Agent": "Model/google,Pixel5 OS/30 Version/3.36.2",
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip",
    "x-device-id": "6efe93931488e176",
}

# Daily-rotated file log, kept for 7 days.
logger.remove()
logger.add(
    "./logs/{time:YYYYMMDD}.log",
    encoding="utf-8",
    rotation="00:00",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    level="DEBUG",
    retention="7 day",
)


def after_log(retry_state):
    """tenacity ``after`` callback: log the outcome of every retry attempt.

    :param retry_state: tenacity RetryCallState for the just-finished attempt
    """
    # By convention the decorated functions here take a logger as their first
    # argument.  It may arrive positionally (args[0]) or as the keyword
    # ``log`` (fetch_market_page is called with log=...); fall back to the
    # module-level logger when neither is present.
    if retry_state.args:
        log = retry_state.args[0]
    elif "log" in retry_state.kwargs:
        log = retry_state.kwargs["log"]
    else:
        log = logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the Kuaidaili tunnel-proxy mapping for ``requests``.

    :param log: logger (kept for signature compatibility with after_log)
    :return: dict suitable for requests' ``proxies=`` argument
    """
    # SECURITY(review): credentials are hard-coded; move them to env/config.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # Fixed: building a dict of formatted strings cannot raise, so the old
    # try/except/log/raise wrapper was dead code and has been removed.
    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
    return {"http": proxy_url, "https": proxy_url}


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2),
       retry=retry_if_exception_type(json.JSONDecodeError), after=after_log)
def fetch_market_page(
        log,
        page: int,
        token: str,
        client: JhsRawCodecClient,
        session: requests.Session,
        headers: Dict[str, str],
        timeout_sec: int = TIMEOUT_SEC,
) -> Dict[str, Any]:
    """Fetch and decrypt a single market page.

    Reuse pattern:
    - ``client`` and ``session`` are created once by the caller and reused
    - only ``page`` changes between calls

    :param log: logger
    :param page: 1-based page number
    :param token: API auth token
    :param client: codec client that performs enc/dec RPC calls on the device
    :param session: shared HTTP session
    :param headers: request headers
    :param timeout_sec: per-request timeout in seconds
    :return: dict with keys page / enc / http_json / dec / decoded
    :raises json.JSONDecodeError: propagated so tenacity retries the page
    """
    log.info(f"Fetching page {page}......................")

    # 1) Ask the device-side codec to encrypt the request URL.
    url_for_enc = f"{BASE_URL}?sorting=completed&page={page}&token={token}"
    enc = client.call({"op": "enc", "url": url_for_enc})
    raw_data = enc["raw_data"]

    # 2) Perform the real HTTP request with the encrypted payload.
    resp = session.get(
        BASE_URL,
        headers=headers,
        params={"raw_data": raw_data, "token": token},
        timeout=timeout_sec,
    )
    resp.raise_for_status()
    body = resp.json()
    response_raw_data = body["raw_data"]

    # 3) Ask the codec to decrypt the response payload.
    request_url_for_dec = f"{BASE_URL}?raw_data={raw_data}&token={token}"
    dec = client.call(
        {
            "op": "dec",
            "request_url": request_url_for_dec,
            "response_raw_data": response_raw_data,
        }
    )

    # Best effort: decode the plaintext as JSON, otherwise keep the raw string.
    response_body = dec.get("response_body", "")
    parsed: Any = response_body
    if isinstance(response_body, str):
        try:
            parsed = json.loads(response_body)
        except Exception:
            log.error(f"Error parsing response body: {response_body}")

    return {
        "page": page,
        "enc": enc,
        "http_json": body,
        "dec": dec,
        "decoded": parsed,
    }


def _timestamp_to_str(ts) -> str | None:
    """Convert a unix timestamp to 'YYYY-MM-DD HH:MM:SS' (local time), or None."""
    # NOTE(review): fromtimestamp uses the machine's local timezone — confirm
    # that matches what the DB consumers expect.
    return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') if ts else None


def parse_data(resp_data, sql_pool):
    """Extract auction rows from a decoded page and bulk-insert them.

    :param resp_data: decoded response dict (expects raw_data.data list)
    :param sql_pool: DB connection pool providing insert_many()
    """
    data_list = resp_data.get("raw_data", {}).get("data", [])
    info_list = []
    for data in data_list:
        data_dict = {
            "seller_username": data.get("seller_username"),
            "product_id": data.get("auction_product_id"),
            "app_id": data.get("app_id"),
            "auction_product_name": data.get("auction_product_name"),
            # API field is singular "auction_product_image"; DB column is plural.
            "auction_product_images": data.get("auction_product_image"),
            "game_key": data.get("game_key"),
            "language_text": data.get("language_text"),
            "authenticator_name": data.get("authenticator_name"),
            "grading": data.get("grading"),
            "starting_price": data.get("starting_price"),
            "max_bid_price": data.get("max_bid_price"),
            "status": data.get("status"),
            "auction_product_start_time": _timestamp_to_str(
                data.get('auction_product_start_timestamp')),
            "auction_product_end_time": _timestamp_to_str(
                data.get('auction_product_end_timestamp')),
            "bid_count": data.get("bid_count"),
            "card_number": data.get("number"),
            "rarity": data.get("rarity"),
        }
        info_list.append(data_dict)

    if info_list:
        # ignore=True -> INSERT IGNORE, so re-crawled rows are deduplicated.
        sql_pool.insert_many(table="jhs_product_record", data_list=info_list, ignore=True)


def get_market_list(log, token: str, sql_pool):
    """Crawl market pages sequentially and persist each page's rows.

    :param log: logger
    :param token: API auth token
    :param sql_pool: DB connection pool
    """
    page = 1
    max_page = 800  # NOTE: `page < max_page` stops at page 799 (800 excluded)
    # Codec client and HTTP session are created once and reused for all pages.
    with JhsRawCodecClient(device_id=DEVICE_ID, cli_target_sec=CLI_TARGET_SEC) as codec_client:
        with requests.Session() as http_sess:
            while page < max_page:
                try:
                    result = fetch_market_page(
                        log=log,
                        page=page,
                        token=token,
                        client=codec_client,
                        session=http_sess,
                        headers=HEADERS,
                    )
                    # Parse failures must not abort the crawl of later pages.
                    try:
                        parse_data(result["decoded"], sql_pool)
                    except Exception as e:
                        log.error(f"Error parsing page {page}: {e}")
                except Exception as e:
                    log.error(f"Error fetching page {page}: {e}")
                page += 1


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jhs_rpc_main(log):
    """Entry point for one crawl run: load token from DB, crawl all pages.

    :param log: logger
    :raises Exception: when the MySQL pool cannot be created (retried hourly)
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        jhs_token = sql_pool.select_one('SELECT token FROM jhs_token WHERE id = 1')
        # Fixed: explicit guard instead of an opaque TypeError on a missing row.
        if not jhs_token:
            raise ValueError("jhs_token row (id=1) not found")
        get_market_list(log, jhs_token[0], sql_pool)
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


def schedule_task():
    """Run the crawl every day at 05:00 via `schedule`'s polling loop."""
    # jhs_rpc_main(log=logger)
    schedule.every().day.at("05:00").do(jhs_rpc_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_task()
    # jhs_rpc_main(log=logger)