jhs_raw_codec_client.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. Parameter-based client for jhs_raw_codec_rpc.js
  5. Usage (import-based):
  6. from raw_codec_rpc.jhs_raw_codec_client import call_codec
  7. enc = call_codec({
  8. "op": "enc",
  9. "url": "https://api.jihuanshe.com/api/market/auction-products?sorting=completed&page=2&token=..."
  10. })
  11. dec = call_codec({
  12. "op": "dec",
  13. "request_url": "https://api.jihuanshe.com/api/market/banners?raw_data=...&token=...",
  14. "response_raw_data": "BASE64_CIPHER"
  15. })
  16. """
  17. from pathlib import Path
  18. from typing import Any, Dict, Optional
  19. import time
  20. import json
  21. import os
  22. import subprocess
  23. import tempfile
  24. import frida
  25. PKG = "com.jihuanshe"
  26. SCRIPT_PATH = Path(__file__).with_name("jhs_raw_codec_rpc.js")
  27. ENV_DEVICE_ID = "FRIDA_DEVICE_ID"
  28. ENV_DEBUG = "JHS_CODEC_DEBUG"
  29. ENV_CLI_TARGET_SEC = "FRIDA_CLI_TARGET_SEC"
  30. class JhsRawCodecClient:
  31. def _on_script_message(self, message, data) -> None:
  32. msg_type = message.get("type")
  33. if msg_type == "log":
  34. level = message.get("level", "log")
  35. payload = message.get("payload", "")
  36. print(f"[frida:{level}] {payload}")
  37. return
  38. if msg_type == "error":
  39. desc = message.get("description", "script error")
  40. stack = message.get("stack")
  41. print(f"[frida:error] {desc}")
  42. if stack:
  43. print(stack)
  44. return
  45. print(f"[frida:{msg_type}] {message}")
  46. def _resolve_device(self, device_id: Optional[str]):
  47. """
  48. 解析并返回 Frida 设备对象。
  49. 优先使用显式传入的 device_id;若为空则回退到 USB 设备。
  50. Args:
  51. device_id: 目标设备 ID,例如 emulator-5554。
  52. Returns:
  53. frida.core.Device: 已连接设备对象。
  54. """
  55. if device_id:
  56. return frida.get_device_manager().get_device(device_id, timeout=5)
  57. return frida.get_usb_device(timeout=5)
  58. def _log(self, msg: str) -> None:
  59. if self.debug:
  60. print(f"[JhsRawCodecClient] {msg}")
  61. def _find_pid_by_identifier(self) -> int:
  62. """
  63. 通过应用标识符(package name)查找目标应用的进程 PID。
  64. Returns:
  65. int: 找到则返回 PID,未找到返回 0。
  66. """
  67. # Prefer app identifier lookup; attach("name") in frida-python matches process name.
  68. try:
  69. for app in self.device.enumerate_applications():
  70. if app.identifier == self.package and app.pid:
  71. return int(app.pid)
  72. except Exception:
  73. pass
  74. return 0
  75. def _find_pid_by_process(self) -> int:
  76. """
  77. 通过进程名查找目标进程 PID(作为 identifier 查找的兜底方案)。
  78. Returns:
  79. int: 找到则返回 PID,未找到返回 0。
  80. """
  81. try:
  82. for p in self.device.enumerate_processes():
  83. if p.name == self.package:
  84. return int(p.pid)
  85. except Exception:
  86. pass
  87. return 0
  88. def __init__(
  89. self,
  90. package: str = PKG,
  91. device_id: Optional[str] = None,
  92. cli_target_sec: Optional[int] = None,
  93. ):
  94. """
  95. 初始化客户端并附加到目标 App 进程,随后加载 RPC 脚本。
  96. Args:
  97. package: 目标应用包名,默认使用常量 PKG。
  98. device_id: Frida 设备 ID。未传时读取环境变量 FRIDA_DEVICE_ID,
  99. 若仍为空则使用默认 USB 设备选择逻辑。
  100. cli_target_sec: CLI 兜底模式的 frida `-t` 秒数。未传时读取
  101. 环境变量 FRIDA_CLI_TARGET_SEC,默认 3 秒。
  102. Raises:
  103. RuntimeError: 多次重试后仍无法附加到目标进程。
  104. """
  105. self.package = package
  106. self.device_id = device_id or os.getenv(ENV_DEVICE_ID)
  107. self.debug = os.getenv(ENV_DEBUG, "").strip().lower() in {"1", "true", "yes", "on"}
  108. target_sec = cli_target_sec
  109. if target_sec is None:
  110. target_sec = int(os.getenv(ENV_CLI_TARGET_SEC, "3"))
  111. self.cli_target_sec = max(1, int(target_sec))
  112. self.device = self._resolve_device(self.device_id)
  113. self.session = None
  114. self._prefer_cli = False
  115. last_err = None
  116. for _ in range(6):
  117. try:
  118. pid = self._find_pid_by_identifier() or self._find_pid_by_process()
  119. if pid:
  120. self.session = self.device.attach(pid)
  121. break
  122. last_err = frida.ProcessNotFoundError(
  123. f"unable to find running app/process for '{self.package}'"
  124. )
  125. time.sleep(1.0)
  126. except frida.ProcessNotFoundError as e:
  127. last_err = e
  128. time.sleep(1.0)
  129. if self.session is None:
  130. raise RuntimeError(
  131. f"unable to attach '{self.package}', please open app and keep it running"
  132. ) from last_err
  133. code = SCRIPT_PATH.read_text(encoding="utf-8")
  134. self.script = self.session.create_script(code)
  135. self.script.on("message", self._on_script_message)
  136. self.script.load()
  137. def __enter__(self):
  138. """上下文管理器入口,返回当前客户端实例。"""
  139. return self
  140. def __exit__(self, exc_type, exc, tb):
  141. """上下文管理器退出时释放 Frida 资源。"""
  142. self.close()
  143. return False
  144. def close(self) -> None:
  145. """关闭并清理资源:卸载脚本、断开会话连接。"""
  146. try:
  147. self.script.unload()
  148. except Exception:
  149. pass
  150. try:
  151. self.session.detach()
  152. except Exception:
  153. pass
  154. def encrypt(self, url: str) -> Dict[str, Any]:
  155. """
  156. 调用 JS RPC 的 encrypt 方法,对请求 URL 进行加密处理。
  157. Args:
  158. url: 原始请求 URL。
  159. Returns:
  160. Dict[str, Any]: JS 侧返回的加密结果字典。
  161. """
  162. return self.script.exports_sync.encrypt(url)
  163. def decrypt(self, request_url_with_raw_data: str, response_raw_data: str) -> Dict[str, Any]:
  164. """
  165. 调用 JS RPC 的 decrypt 方法,对响应中的 raw_data 进行解密。
  166. Args:
  167. request_url_with_raw_data: 包含 raw_data 参数的请求 URL。
  168. response_raw_data: 响应中的加密 raw_data 字符串。
  169. Returns:
  170. Dict[str, Any]: JS 侧返回的解密结果字典。
  171. """
  172. return self.script.exports_sync.decrypt(request_url_with_raw_data, response_raw_data)
  173. def call(self, params: Dict[str, Any]) -> Dict[str, Any]:
  174. """
  175. 统一调用入口,转发到 JS RPC 的 call 方法。
  176. 当 Python 会话环境缺失 Java bridge(如部分 Gadget 场景)时,
  177. 自动降级为 CLI 注入方式调用。
  178. Args:
  179. params: RPC 调用参数字典。
  180. Returns:
  181. Dict[str, Any]: RPC 返回结果。
  182. """
  183. if self._prefer_cli:
  184. self._log("call path: cli(preferred)")
  185. return self._call_via_cli(params)
  186. try:
  187. self._log("call path: python rpc")
  188. return self.script.exports_sync.call(params)
  189. except Exception as e:
  190. msg = str(e)
  191. # In some embedded Gadget setups, Python session scripts miss Java bridge.
  192. if "Java is not defined" in msg or "ReferenceError: 'Java' is not defined" in msg:
  193. # Once detected, skip the slow exception path on later calls.
  194. self._prefer_cli = True
  195. self._log("python rpc missing Java bridge, switch to cli fallback")
  196. return self._call_via_cli(params)
  197. raise
  198. def _call_via_cli(self, params: Dict[str, Any]) -> Dict[str, Any]:
  199. """
  200. 使用 frida CLI 作为兜底方案执行一次 RPC 调用。
  201. 实现流程:
  202. 1) 拼接临时 JS(注入参数并调用 rpc.exports.call)
  203. 2) 通过 frida 命令行注入目标进程执行
  204. 3) 从标准输出解析约定的结果前缀
  205. Args:
  206. params: RPC 调用参数字典。
  207. Returns:
  208. Dict[str, Any]: RPC 返回结果。
  209. Raises:
  210. RuntimeError: CLI 调用失败或未解析到结果行。
  211. """
  212. js_src = SCRIPT_PATH.read_text(encoding="utf-8")
  213. params_json = json.dumps(params, ensure_ascii=False)
  214. wrapper = (
  215. "const __PARAMS = " + params_json + ";\n"
  216. + js_src
  217. + "\nsetImmediate(function(){\n"
  218. " rpc.exports.call(__PARAMS)\n"
  219. " .then(function(r){ console.log('[CODEC-RESULT]' + JSON.stringify(r)); })\n"
  220. " .catch(function(e){ console.log('[CODEC-ERROR]' + e); });\n"
  221. "});\n"
  222. )
  223. fd, tmp_path = tempfile.mkstemp(prefix="jhs_codec_", suffix=".js")
  224. os.close(fd)
  225. proc = None
  226. try:
  227. Path(tmp_path).write_text(wrapper, encoding="utf-8")
  228. cli_cmd = ["frida"]
  229. if self.device_id:
  230. cli_cmd.extend(["-D", self.device_id])
  231. else:
  232. cli_cmd.append("-U")
  233. cli_cmd.extend(["-N", self.package, "-l", tmp_path, "-q", "-t", str(self.cli_target_sec)])
  234. self._log("spawn cli: " + " ".join(cli_cmd))
  235. proc = subprocess.Popen(
  236. cli_cmd,
  237. stdout=subprocess.PIPE,
  238. stderr=subprocess.PIPE,
  239. text=True,
  240. encoding="utf-8",
  241. errors="replace",
  242. )
  243. deadline = time.time() + max(10, self.cli_target_sec + 8)
  244. out_lines = []
  245. err_lines = []
  246. while time.time() < deadline:
  247. if proc.stdout is None:
  248. break
  249. line = proc.stdout.readline()
  250. if line:
  251. line = line.rstrip("\r\n")
  252. out_lines.append(line)
  253. if line.startswith("[CODEC-RESULT]"):
  254. result = json.loads(line[len("[CODEC-RESULT]"):])
  255. if proc.poll() is None:
  256. proc.terminate()
  257. return result
  258. if line.startswith("[CODEC-ERROR]"):
  259. if proc.poll() is None:
  260. proc.terminate()
  261. raise RuntimeError(line)
  262. continue
  263. if proc.poll() is not None:
  264. break
  265. if proc.stderr is not None:
  266. err_lines.extend(proc.stderr.read().splitlines())
  267. out = "\n".join(out_lines)
  268. err = "\n".join(err_lines)
  269. raise RuntimeError(
  270. "cli codec call failed: no result line\n"
  271. + "stdout:\n" + out + "\n"
  272. + "stderr:\n" + err
  273. )
  274. finally:
  275. if proc is not None and proc.poll() is None:
  276. try:
  277. proc.terminate()
  278. except Exception:
  279. pass
  280. try:
  281. os.remove(tmp_path)
  282. except Exception:
  283. pass
  284. def encrypt_url(url: str, package: str = PKG, device_id: Optional[str] = None) -> Dict[str, Any]:
  285. """
  286. 便捷函数:创建临时客户端,执行 URL 加密并自动释放资源。
  287. Args:
  288. url: 原始请求 URL。
  289. package: 目标应用包名。
  290. device_id: Frida 设备 ID(如 emulator-5554),可不传。
  291. Returns:
  292. Dict[str, Any]: 加密结果字典。
  293. """
  294. with JhsRawCodecClient(package=package, device_id=device_id) as client:
  295. return client.encrypt(url)
  296. def decrypt_raw_data(
  297. request_url: str,
  298. response_raw_data: str,
  299. package: str = PKG,
  300. device_id: Optional[str] = None,
  301. ) -> Dict[str, Any]:
  302. """
  303. 便捷函数:创建临时客户端,执行 raw_data 解密并自动释放资源。
  304. Args:
  305. request_url: 包含 raw_data 参数的请求 URL。
  306. response_raw_data: 响应中的加密 raw_data。
  307. package: 目标应用包名。
  308. device_id: Frida 设备 ID(如 emulator-5554),可不传。
  309. Returns:
  310. Dict[str, Any]: 解密结果字典。
  311. """
  312. with JhsRawCodecClient(package=package, device_id=device_id) as client:
  313. return client.decrypt(request_url, response_raw_data)
  314. def call_codec(
  315. params: Dict[str, Any],
  316. package: str = PKG,
  317. device_id: Optional[str] = None,
  318. ) -> Dict[str, Any]:
  319. """
  320. 对外统一调用入口:根据 params["op"] 执行 enc/dec。
  321. Args:
  322. params: 调用参数字典,支持两种格式:
  323. enc: {"op": "enc", "url": "..."}
  324. dec: {"op": "dec", "request_url": "...", "response_raw_data": "..."}
  325. package: 目标应用包名。
  326. device_id: Frida 设备 ID(如 emulator-5554),可不传。
  327. 也可通过环境变量 FRIDA_DEVICE_ID 指定。
  328. Returns:
  329. Dict[str, Any]: 编解码结果字典。
  330. Raises:
  331. TypeError: 当 params 不是 dict 时抛出。
  332. """
  333. if not isinstance(params, dict):
  334. raise TypeError("params must be a dict")
  335. with JhsRawCodecClient(package=package, device_id=device_id) as client:
  336. return client.call(params)