#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Parameter-based client for jhs_raw_codec_rpc.js

Usage (import-based):
    from raw_codec_rpc.jhs_raw_codec_client import call_codec

    enc = call_codec({
        "op": "enc",
        "url": "https://api.jihuanshe.com/api/market/auction-products?sorting=completed&page=2&token=..."
    })

    dec = call_codec({
        "op": "dec",
        "request_url": "https://api.jihuanshe.com/api/market/banners?raw_data=...&token=...",
        "response_raw_data": "BASE64_CIPHER"
    })
"""

import json
import os
import queue
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import frida

PKG = "com.jihuanshe"
SCRIPT_PATH = Path(__file__).with_name("jhs_raw_codec_rpc.js")
ENV_DEVICE_ID = "FRIDA_DEVICE_ID"
ENV_DEBUG = "JHS_CODEC_DEBUG"
ENV_CLI_TARGET_SEC = "FRIDA_CLI_TARGET_SEC"


def _pump_lines(stream, tag: str, sink: "queue.Queue") -> None:
    """Forward every line of *stream* to *sink* as ``(tag, line)`` tuples.

    A final ``(tag, None)`` marker is always queued so the consumer can tell
    that this stream reached EOF (or became unreadable).
    """
    try:
        for raw in iter(stream.readline, ""):
            sink.put((tag, raw.rstrip("\r\n")))
    except (OSError, ValueError):
        # The pipe was closed underneath us during shutdown; nothing to read.
        pass
    finally:
        sink.put((tag, None))


class JhsRawCodecClient:
    """Client that attaches to the target app and calls the codec RPC script."""

    def __init__(
        self,
        package: str = PKG,
        device_id: Optional[str] = None,
        cli_target_sec: Optional[int] = None,
    ):
        """
        Attach to the target app process and load the RPC script into it.

        Args:
            package: Target application package name; defaults to ``PKG``.
            device_id: Frida device ID. When omitted, the environment variable
                FRIDA_DEVICE_ID is used; if that is also empty, the USB device
                is selected.
            cli_target_sec: Value passed to the frida CLI ``-t`` option in the
                CLI fallback path. When omitted, the environment variable
                FRIDA_CLI_TARGET_SEC is used (default 3). Clamped to >= 1.

        Raises:
            RuntimeError: The target process could not be attached after all
                retries.
            ValueError: The configured ``-t`` value is not an integer.
        """
        self.package = package
        self.device_id = device_id or os.getenv(ENV_DEVICE_ID)
        self.debug = os.getenv(ENV_DEBUG, "").strip().lower() in {"1", "true", "yes", "on"}

        target_sec = cli_target_sec
        if target_sec is None:
            # `or` also covers a variable that is set but empty, which would
            # otherwise make int("") raise.
            target_sec = os.getenv(ENV_CLI_TARGET_SEC) or "3"
        self.cli_target_sec = max(1, int(target_sec))

        # Defined before any step that can fail so close() is always safe.
        self.session = None
        self.script = None
        self._prefer_cli = False

        self.device = self._resolve_device(self.device_id)

        attempts = 6
        last_err = None
        for attempt in range(attempts):
            try:
                pid = self._find_pid_by_identifier() or self._find_pid_by_process()
                if pid:
                    self.session = self.device.attach(pid)
                    break
                last_err = frida.ProcessNotFoundError(
                    f"unable to find running app/process for '{self.package}'"
                )
            except frida.ProcessNotFoundError as e:
                last_err = e
            # Pause only between attempts, not after the final one.
            if attempt < attempts - 1:
                time.sleep(1.0)

        if self.session is None:
            raise RuntimeError(
                f"unable to attach '{self.package}', please open app and keep it running"
            ) from last_err

        try:
            code = SCRIPT_PATH.read_text(encoding="utf-8")
            self.script = self.session.create_script(code)
            self.script.on("message", self._on_script_message)
            self.script.load()
        except Exception:
            # Do not leave the session attached when the script cannot be
            # read, created or loaded.
            self.close()
            raise

    def _on_script_message(self, message, data) -> None:
        """Print a message emitted by the injected script.

        "log" messages are printed with their level, "error" messages with
        their description and (if present) stack; any other message is
        printed as-is. ``data`` is not used.
        """
        msg_type = message.get("type")
        if msg_type == "log":
            level = message.get("level", "log")
            payload = message.get("payload", "")
            print(f"[frida:{level}] {payload}")
            return
        if msg_type == "error":
            desc = message.get("description", "script error")
            stack = message.get("stack")
            print(f"[frida:error] {desc}")
            if stack:
                print(stack)
            return
        print(f"[frida:{msg_type}] {message}")

    def _resolve_device(self, device_id: Optional[str]):
        """
        Resolve and return the Frida device object.

        An explicit ``device_id`` takes precedence; when it is empty the USB
        device is used instead.

        Args:
            device_id: Target device ID, e.g. emulator-5554.

        Returns:
            The device object returned by frida.
        """
        if device_id:
            return frida.get_device_manager().get_device(device_id, timeout=5)
        return frida.get_usb_device(timeout=5)

    def _log(self, msg: str) -> None:
        """Print *msg* when debug mode (JHS_CODEC_DEBUG) is enabled."""
        # getattr keeps this usable on a partially initialised instance.
        if getattr(self, "debug", False):
            print(f"[JhsRawCodecClient] {msg}")

    def _find_pid_by_identifier(self) -> int:
        """
        Look up the target PID by application identifier (package name).

        Returns:
            int: The PID if found, otherwise 0.
        """
        # Prefer app identifier lookup; attach("name") in frida-python matches process name.
        try:
            for app in self.device.enumerate_applications():
                if app.identifier == self.package and app.pid:
                    return int(app.pid)
        except Exception as exc:
            # Best effort: the caller falls back to the process-name lookup.
            self._log(f"enumerate_applications failed: {exc!r}")
        return 0

    def _find_pid_by_process(self) -> int:
        """
        Look up the target PID by process name (fallback for the identifier
        lookup).

        Returns:
            int: The PID if found, otherwise 0.
        """
        try:
            for p in self.device.enumerate_processes():
                if p.name == self.package:
                    return int(p.pid)
        except Exception as exc:
            # Best effort: a failed enumeration is reported as "not found".
            self._log(f"enumerate_processes failed: {exc!r}")
        return 0

    def __enter__(self):
        """Context-manager entry; returns this client instance."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Release Frida resources on context-manager exit."""
        self.close()
        return False

    def close(self) -> None:
        """Unload the script and detach the session; errors are not raised."""
        script = getattr(self, "script", None)
        if script is not None:
            try:
                script.unload()
            except Exception as exc:
                self._log(f"script unload failed: {exc!r}")

        session = getattr(self, "session", None)
        if session is not None:
            try:
                session.detach()
            except Exception as exc:
                self._log(f"session detach failed: {exc!r}")

    def encrypt(self, url: str) -> Dict[str, Any]:
        """
        Call the JS RPC ``encrypt`` export for a request URL.

        Args:
            url: Original request URL.

        Returns:
            Dict[str, Any]: Result returned by the JS side.
        """
        return self.script.exports_sync.encrypt(url)

    def decrypt(self, request_url_with_raw_data: str, response_raw_data: str) -> Dict[str, Any]:
        """
        Call the JS RPC ``decrypt`` export for a response ``raw_data`` value.

        Args:
            request_url_with_raw_data: Request URL containing the raw_data
                parameter.
            response_raw_data: Encrypted raw_data string from the response.

        Returns:
            Dict[str, Any]: Result returned by the JS side.
        """
        return self.script.exports_sync.decrypt(request_url_with_raw_data, response_raw_data)

    def call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Unified entry point; forwards to the JS RPC ``call`` export.

        When the Python session reports that ``Java`` is not defined (seen in
        some Gadget setups), the call is retried through the frida CLI and
        all later calls on this instance go straight to the CLI path.

        Args:
            params: RPC parameter dictionary.

        Returns:
            Dict[str, Any]: RPC result.
        """
        if self._prefer_cli:
            self._log("call path: cli(preferred)")
            return self._call_via_cli(params)
        try:
            self._log("call path: python rpc")
            return self.script.exports_sync.call(params)
        except Exception as e:
            msg = str(e)
            # In some embedded Gadget setups, Python session scripts miss Java bridge.
            if "Java is not defined" in msg or "ReferenceError: 'Java' is not defined" in msg:
                # Once detected, skip the slow exception path on later calls.
                self._prefer_cli = True
                self._log("python rpc missing Java bridge, switch to cli fallback")
                return self._call_via_cli(params)
            raise

    @staticmethod
    def _stop_cli(proc: "subprocess.Popen", readers: List[threading.Thread]) -> None:
        """Terminate and reap the CLI process, then close its pipes.

        Never raises and is safe to call more than once.
        """
        try:
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait(timeout=3)
        except (OSError, subprocess.TimeoutExpired):
            pass

        for reader in readers:
            reader.join(timeout=1)
        if any(reader.is_alive() for reader in readers):
            # A reader is still blocked inside readline(); closing the stream
            # from this thread could block on the stream's internal lock.
            return
        for stream in (proc.stdout, proc.stderr):
            if stream is not None:
                try:
                    stream.close()
                except OSError:
                    pass

    def _call_via_cli(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Perform one RPC call through the frida CLI (fallback path).

        Steps:
            1) Build a temporary JS file: the parameters, the RPC script, and
               a trailer that invokes rpc.exports.call and prints the outcome
               with a fixed prefix.
            2) Run the frida CLI against the target with that file.
            3) Scan stdout for the prefixed result or error line.

        stdout and stderr are read by background threads so that the overall
        deadline holds even when the CLI prints nothing, and so that stderr
        output cannot fill its pipe and stall the process.

        Args:
            params: RPC parameter dictionary (must be JSON-serialisable).

        Returns:
            Dict[str, Any]: RPC result parsed from the result line.

        Raises:
            RuntimeError: The script reported an error line, or no result
                line was seen before the process ended or the deadline passed.
            ValueError: The result line did not contain valid JSON
                (json.JSONDecodeError).
            OSError: The frida executable could not be started.
        """
        js_src = SCRIPT_PATH.read_text(encoding="utf-8")
        params_json = json.dumps(params, ensure_ascii=False)
        wrapper = (
            "const __PARAMS = " + params_json + ";\n"
            + js_src
            + "\nsetImmediate(function(){\n"
            " rpc.exports.call(__PARAMS)\n"
            " .then(function(r){ console.log('[CODEC-RESULT]' + JSON.stringify(r)); })\n"
            " .catch(function(e){ console.log('[CODEC-ERROR]' + e); });\n"
            "});\n"
        )

        fd, tmp_path = tempfile.mkstemp(prefix="jhs_codec_", suffix=".js")
        os.close(fd)
        proc = None
        readers: List[threading.Thread] = []
        try:
            Path(tmp_path).write_text(wrapper, encoding="utf-8")

            cli_cmd = ["frida"]
            if self.device_id:
                cli_cmd.extend(["-D", self.device_id])
            else:
                cli_cmd.append("-U")
            cli_cmd.extend(["-N", self.package, "-l", tmp_path, "-q", "-t", str(self.cli_target_sec)])
            self._log("spawn cli: " + " ".join(cli_cmd))

            proc = subprocess.Popen(
                cli_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
            )

            events: "queue.Queue" = queue.Queue()
            for tag, stream in (("out", proc.stdout), ("err", proc.stderr)):
                reader = threading.Thread(
                    target=_pump_lines, args=(stream, tag, events), daemon=True
                )
                reader.start()
                readers.append(reader)

            # Monotonic clock: immune to wall-clock adjustments.
            deadline = time.monotonic() + max(10, self.cli_target_sec + 8)
            out_lines: List[str] = []
            err_lines: List[str] = []
            open_streams = len(readers)

            while open_streams:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    tag, line = events.get(timeout=remaining)
                except queue.Empty:
                    break
                if line is None:
                    open_streams -= 1
                    continue
                if tag == "err":
                    err_lines.append(line)
                    continue
                out_lines.append(line)
                if line.startswith("[CODEC-RESULT]"):
                    return json.loads(line[len("[CODEC-RESULT]"):])
                if line.startswith("[CODEC-ERROR]"):
                    raise RuntimeError(line)

            # No result: stop the process so the readers reach EOF, then pick
            # up whatever output is still queued for the diagnostic message.
            self._stop_cli(proc, readers)
            while True:
                try:
                    tag, line = events.get_nowait()
                except queue.Empty:
                    break
                if line is None:
                    continue
                (err_lines if tag == "err" else out_lines).append(line)

            out = "\n".join(out_lines)
            err = "\n".join(err_lines)
            raise RuntimeError(
                "cli codec call failed: no result line\n"
                + "stdout:\n" + out + "\n"
                + "stderr:\n" + err
            )
        finally:
            if proc is not None:
                self._stop_cli(proc, readers)
            try:
                os.remove(tmp_path)
            except OSError:
                pass


def encrypt_url(url: str, package: str = PKG, device_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Convenience wrapper: create a temporary client, encrypt the URL, and
    release the client.

    Args:
        url: Original request URL.
        package: Target application package name.
        device_id: Frida device ID (e.g. emulator-5554); optional.

    Returns:
        Dict[str, Any]: Encryption result.
    """
    with JhsRawCodecClient(package=package, device_id=device_id) as client:
        return client.encrypt(url)


def decrypt_raw_data(
    request_url: str,
    response_raw_data: str,
    package: str = PKG,
    device_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Convenience wrapper: create a temporary client, decrypt raw_data, and
    release the client.

    Args:
        request_url: Request URL containing the raw_data parameter.
        response_raw_data: Encrypted raw_data from the response.
        package: Target application package name.
        device_id: Frida device ID (e.g. emulator-5554); optional.

    Returns:
        Dict[str, Any]: Decryption result.
    """
    with JhsRawCodecClient(package=package, device_id=device_id) as client:
        return client.decrypt(request_url, response_raw_data)


def call_codec(
    params: Dict[str, Any],
    package: str = PKG,
    device_id: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Public unified entry point: forwards ``params`` to the RPC ``call``
    export through a temporary client.

    Args:
        params: Call parameters. The module usage example shows two forms:
            enc: {"op": "enc", "url": "..."}
            dec: {"op": "dec", "request_url": "...", "response_raw_data": "..."}
        package: Target application package name.
        device_id: Frida device ID (e.g. emulator-5554); optional. May also
            be supplied through the FRIDA_DEVICE_ID environment variable.

    Returns:
        Dict[str, Any]: Codec result.

    Raises:
        TypeError: ``params`` is not a dict.
    """
    if not isinstance(params, dict):
        raise TypeError("params must be a dict")
    with JhsRawCodecClient(package=package, device_id=device_id) as client:
        return client.call(params)