| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Parameter-based client for jhs_raw_codec_rpc.js
- Usage (import-based):
- from raw_codec_rpc.jhs_raw_codec_client import call_codec
- enc = call_codec({
- "op": "enc",
- "url": "https://api.jihuanshe.com/api/market/auction-products?sorting=completed&page=2&token=..."
- })
- dec = call_codec({
- "op": "dec",
- "request_url": "https://api.jihuanshe.com/api/market/banners?raw_data=...&token=...",
- "response_raw_data": "BASE64_CIPHER"
- })
- """
- from pathlib import Path
- from typing import Any, Dict, Optional
- import time
- import json
- import os
- import subprocess
- import tempfile
- import frida
# Default Android package name of the target application.
PKG = "com.jihuanshe"

# The Frida RPC script is expected to sit next to this module.
SCRIPT_PATH = Path(__file__).parent / "jhs_raw_codec_rpc.js"

# Names of the environment variables read by JhsRawCodecClient.__init__.
ENV_DEVICE_ID = "FRIDA_DEVICE_ID"            # Frida device id to connect to
ENV_DEBUG = "JHS_CODEC_DEBUG"                # truthy value enables debug prints
ENV_CLI_TARGET_SEC = "FRIDA_CLI_TARGET_SEC"  # seconds passed to frida CLI `-t`
class JhsRawCodecClient:
    """Client that attaches to the target app and calls the codec RPC script.

    On construction the client resolves a Frida device, attaches to the
    running process of ``package`` and loads the script at ``SCRIPT_PATH``.
    ``call`` can fall back to driving the ``frida`` command-line tool when
    the in-process script reports that ``Java`` is not defined.

    The class is a context manager; leaving the ``with`` block calls
    ``close``.
    """

    # Line prefixes the CLI wrapper script prints so results can be found
    # in the frida CLI's stdout.
    _RESULT_PREFIX = "[CODEC-RESULT]"
    _ERROR_PREFIX = "[CODEC-ERROR]"

    def _on_script_message(self, message, data) -> None:
        """Print a message received from the loaded script.

        Args:
            message: Message dict; its ``type`` key selects the output format.
            data: Accompanying payload; unused.
        """
        msg_type = message.get("type")
        if msg_type == "log":
            level = message.get("level", "log")
            payload = message.get("payload", "")
            print(f"[frida:{level}] {payload}")
            return
        if msg_type == "error":
            desc = message.get("description", "script error")
            stack = message.get("stack")
            print(f"[frida:error] {desc}")
            if stack:
                print(stack)
            return
        print(f"[frida:{msg_type}] {message}")

    def _resolve_device(self, device_id: Optional[str]):
        """Return the Frida device to work with.

        An explicit ``device_id`` takes precedence; when it is empty the
        USB device is used.

        Args:
            device_id: Target device id, for example ``emulator-5554``.

        Returns:
            The Frida device object.
        """
        if device_id:
            return frida.get_device_manager().get_device(device_id, timeout=5)
        return frida.get_usb_device(timeout=5)

    def _log(self, msg: str) -> None:
        """Print ``msg`` when debug output is enabled."""
        if self.debug:
            print(f"[JhsRawCodecClient] {msg}")

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer from environment variable ``name``.

        Returns ``default`` when the variable is unset, blank or not a
        valid integer, so a stray ``NAME=`` in the environment cannot
        break client construction.
        """
        raw = os.getenv(name, "").strip()
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            return default

    def _find_pid_by_identifier(self) -> int:
        """Look up the target PID by application identifier (package name).

        Returns:
            The PID if a running application matches, otherwise 0.
        """
        # Prefer app identifier lookup; attach("name") in frida-python matches process name.
        try:
            for app in self.device.enumerate_applications():
                if app.identifier == self.package and app.pid:
                    return int(app.pid)
        except Exception:
            # Best-effort lookup: the caller falls back to process-name search.
            pass
        return 0

    def _find_pid_by_process(self) -> int:
        """Look up the target PID by process name (fallback lookup).

        Returns:
            The PID if a process named like the package exists, otherwise 0.
        """
        try:
            for p in self.device.enumerate_processes():
                if p.name == self.package:
                    return int(p.pid)
        except Exception:
            # Best-effort lookup: "not found" is reported as 0.
            pass
        return 0

    def _attach_with_retry(self, attempts: int = 6, delay: float = 1.0):
        """Attach to the target process, retrying while it cannot be found.

        Args:
            attempts: Maximum number of lookup/attach attempts.
            delay: Seconds to sleep between attempts.

        Returns:
            The attached Frida session.

        Raises:
            RuntimeError: The process could not be attached in any attempt.
        """
        last_err = None
        for attempt in range(attempts):
            try:
                pid = self._find_pid_by_identifier() or self._find_pid_by_process()
                if pid:
                    return self.device.attach(pid)
                last_err = frida.ProcessNotFoundError(
                    f"unable to find running app/process for '{self.package}'"
                )
            except frida.ProcessNotFoundError as e:
                last_err = e
            # No point sleeping after the final attempt.
            if attempt < attempts - 1:
                time.sleep(delay)
        raise RuntimeError(
            f"unable to attach '{self.package}', please open app and keep it running"
        ) from last_err

    def __init__(
        self,
        package: str = PKG,
        device_id: Optional[str] = None,
        cli_target_sec: Optional[int] = None,
    ):
        """Attach to the target app process and load the RPC script.

        Args:
            package: Target application package name; defaults to ``PKG``.
            device_id: Frida device id. When omitted, the environment
                variable ``FRIDA_DEVICE_ID`` is consulted; if that is empty
                too, the USB device is used.
            cli_target_sec: Value for the frida CLI ``-t`` option used by
                the CLI fallback. When omitted, the environment variable
                ``FRIDA_CLI_TARGET_SEC`` is read (default 3; blank or
                invalid values also yield 3). Values below 1 are raised
                to 1.

        Raises:
            RuntimeError: The target process could not be attached after
                several retries.
        """
        self.package = package
        self.device_id = device_id or os.getenv(ENV_DEVICE_ID)
        self.debug = os.getenv(ENV_DEBUG, "").strip().lower() in {"1", "true", "yes", "on"}
        # Define these before anything can raise so close() is always safe.
        self.session = None
        self.script = None
        self._prefer_cli = False

        target_sec = cli_target_sec
        if target_sec is None:
            target_sec = self._env_int(ENV_CLI_TARGET_SEC, 3)
        self.cli_target_sec = max(1, int(target_sec))

        self.device = self._resolve_device(self.device_id)
        self.session = self._attach_with_retry()
        try:
            code = SCRIPT_PATH.read_text(encoding="utf-8")
            self.script = self.session.create_script(code)
            self.script.on("message", self._on_script_message)
            self.script.load()
        except BaseException:
            # Do not leave the session attached when script setup fails.
            self.close()
            raise

    def __enter__(self):
        """Context-manager entry; returns this client."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Context-manager exit; releases Frida resources."""
        self.close()
        return False

    def close(self) -> None:
        """Unload the script and detach the session (best effort)."""
        script = getattr(self, "script", None)
        if script is not None:
            try:
                script.unload()
            except Exception:
                # Teardown must not mask the caller's own error.
                pass
        session = getattr(self, "session", None)
        if session is not None:
            try:
                session.detach()
            except Exception:
                pass

    def encrypt(self, url: str) -> Dict[str, Any]:
        """Call the script's ``encrypt`` RPC export for a request URL.

        Args:
            url: Original request URL.

        Returns:
            The result returned by the script.
        """
        return self.script.exports_sync.encrypt(url)

    def decrypt(self, request_url_with_raw_data: str, response_raw_data: str) -> Dict[str, Any]:
        """Call the script's ``decrypt`` RPC export for a response payload.

        Args:
            request_url_with_raw_data: Request URL containing the
                ``raw_data`` parameter.
            response_raw_data: Encrypted ``raw_data`` string from the response.

        Returns:
            The result returned by the script.
        """
        return self.script.exports_sync.decrypt(request_url_with_raw_data, response_raw_data)

    def call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Forward ``params`` to the script's ``call`` RPC export.

        When the in-process call fails because ``Java`` is not defined in
        the script environment, the call is retried through the frida CLI
        and all later calls go straight to the CLI path.

        Args:
            params: RPC parameter dict.

        Returns:
            The RPC result.
        """
        if self._prefer_cli:
            self._log("call path: cli(preferred)")
            return self._call_via_cli(params)
        try:
            self._log("call path: python rpc")
            return self.script.exports_sync.call(params)
        except Exception as e:
            msg = str(e)
            # In some embedded Gadget setups, Python session scripts miss Java bridge.
            if "Java is not defined" in msg or "ReferenceError: 'Java' is not defined" in msg:
                # Once detected, skip the slow exception path on later calls.
                self._prefer_cli = True
                self._log("python rpc missing Java bridge, switch to cli fallback")
                return self._call_via_cli(params)
            raise

    @staticmethod
    def _finish_process(proc) -> tuple:
        """Stop ``proc`` if it is still running, reap it and drain its pipes.

        Returns:
            ``(stdout_rest, stderr_rest)``: text that was still unread.
        """
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass
        try:
            out, err = proc.communicate(timeout=3)
        except subprocess.TimeoutExpired:
            # The process ignored terminate(); force it down.
            proc.kill()
            out, err = proc.communicate()
        return out or "", err or ""

    def _call_via_cli(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run one RPC call through the ``frida`` command-line tool.

        Steps:
            1. Build a temporary JS file: the parameters, the RPC script,
               and a trailer that invokes ``rpc.exports.call`` and prints
               the outcome with a known prefix.
            2. Run ``frida`` against the target package with that file.
            3. Scan stdout for the result or error prefix.

        The subprocess is always terminated and reaped, and the temporary
        file removed, before this method returns or raises.

        Args:
            params: RPC parameter dict (must be JSON-serializable).

        Returns:
            The RPC result parsed from the result line.

        Raises:
            RuntimeError: The script reported an error, the result line
                was not valid JSON, or no result line was seen before the
                process ended or the deadline passed.
        """
        js_src = SCRIPT_PATH.read_text(encoding="utf-8")
        params_json = json.dumps(params, ensure_ascii=False)
        wrapper = (
            "const __PARAMS = " + params_json + ";\n"
            + js_src
            + "\nsetImmediate(function(){\n"
            "  rpc.exports.call(__PARAMS)\n"
            "    .then(function(r){ console.log('[CODEC-RESULT]' + JSON.stringify(r)); })\n"
            "    .catch(function(e){ console.log('[CODEC-ERROR]' + e); });\n"
            "});\n"
        )
        fd, tmp_path = tempfile.mkstemp(prefix="jhs_codec_", suffix=".js")
        os.close(fd)
        proc = None
        out_lines = []
        try:
            Path(tmp_path).write_text(wrapper, encoding="utf-8")
            cli_cmd = ["frida"]
            if self.device_id:
                cli_cmd.extend(["-D", self.device_id])
            else:
                cli_cmd.append("-U")
            cli_cmd.extend(["-N", self.package, "-l", tmp_path, "-q", "-t", str(self.cli_target_sec)])
            self._log("spawn cli: " + " ".join(cli_cmd))
            proc = subprocess.Popen(
                cli_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding="utf-8",
                errors="replace",
            )
            # Monotonic clock: the deadline must not move with wall-clock changes.
            deadline = time.monotonic() + max(10, self.cli_target_sec + 8)
            stdout = proc.stdout
            while stdout is not None and time.monotonic() < deadline:
                line = stdout.readline()
                if not line:
                    # EOF on stdout: stop once the process itself has exited.
                    if proc.poll() is not None:
                        break
                    time.sleep(0.05)  # avoid spinning while it shuts down
                    continue
                line = line.rstrip("\r\n")
                out_lines.append(line)
                if line.startswith(self._RESULT_PREFIX):
                    try:
                        return json.loads(line[len(self._RESULT_PREFIX):])
                    except ValueError as e:
                        raise RuntimeError(
                            "cli codec call failed: invalid result line\n" + line
                        ) from e
                if line.startswith(self._ERROR_PREFIX):
                    raise RuntimeError(line)

            # No result: stop the process first so draining the pipes cannot block.
            rest_out, rest_err = self._finish_process(proc)
            proc = None
            out_lines.extend(rest_out.splitlines())
            out = "\n".join(out_lines)
            err = "\n".join(rest_err.splitlines())
            raise RuntimeError(
                "cli codec call failed: no result line\n"
                + "stdout:\n" + out + "\n"
                + "stderr:\n" + err
            )
        finally:
            if proc is not None:
                try:
                    self._finish_process(proc)
                except Exception:
                    # Cleanup must not replace the result or the real error.
                    pass
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def encrypt_url(url: str, package: str = PKG, device_id: Optional[str] = None) -> Dict[str, Any]:
    """Encrypt a request URL using a short-lived client.

    A client is created for this single call and released afterwards.

    Args:
        url: Original request URL.
        package: Target application package name.
        device_id: Optional Frida device id (for example ``emulator-5554``).

    Returns:
        The encryption result dict.
    """
    with JhsRawCodecClient(package=package, device_id=device_id) as codec:
        encrypted = codec.encrypt(url)
    return encrypted
def decrypt_raw_data(
    request_url: str,
    response_raw_data: str,
    package: str = PKG,
    device_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Decrypt a response ``raw_data`` payload using a short-lived client.

    A client is created for this single call and released afterwards.

    Args:
        request_url: Request URL containing the ``raw_data`` parameter.
        response_raw_data: Encrypted ``raw_data`` from the response.
        package: Target application package name.
        device_id: Optional Frida device id (for example ``emulator-5554``).

    Returns:
        The decryption result dict.
    """
    with JhsRawCodecClient(package=package, device_id=device_id) as codec:
        decrypted = codec.decrypt(request_url, response_raw_data)
    return decrypted
def call_codec(
    params: Dict[str, Any],
    package: str = PKG,
    device_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Public entry point: run the operation described by ``params``.

    Args:
        params: Call parameters, in one of two shapes:
            enc: ``{"op": "enc", "url": "..."}``
            dec: ``{"op": "dec", "request_url": "...", "response_raw_data": "..."}``
        package: Target application package name.
        device_id: Optional Frida device id (for example ``emulator-5554``).
            It can also be supplied through the ``FRIDA_DEVICE_ID``
            environment variable.

    Returns:
        The codec result dict.

    Raises:
        TypeError: ``params`` is not a dict.
    """
    if isinstance(params, dict):
        with JhsRawCodecClient(package=package, device_id=device_id) as codec:
            outcome = codec.call(params)
        return outcome
    raise TypeError("params must be a dict")
|