
feat(jhs_rpc_spider): add a Frida-based codec RPC client integration

- Add jhs_raw_codec_client.py, a Frida RPC client wrapper
- Add jhs_raw_codec_rpc.js with the Java-layer hook logic for encryption/decryption
- Create jhs_rpc_spider.py, combining a MySQL connection pool with data fetching and parsing
- Design the call flow to reuse the Frida client and requests Session for efficiency
- Support a CLI fallback path for better compatibility in Gadget-style environments
- Add retry-on-failure and detailed logging for stability and debuggability
- Add mysql_pool.py, managing a MySQL connection pool via dbutils
- Add default MySQL connection parameters to application.yml
charley 1 week ago
parent
commit
44650433fd

+ 143 - 0
jhs_rpc_spider/README.md

@@ -0,0 +1,143 @@
+# raw_codec_rpc
+
+A Frida RPC wrapper for encoding/decoding `raw_data`, built on the Java-layer logic inside the running app (`gc.b.b` / `gc.a.intercept`).
+
+## Files
+
+- `jhs_raw_codec_rpc.js`: Frida-side RPC script (implements enc/dec)
+- `jhs_raw_codec_client.py`: Python client wrapper (device connection, attach, RPC calls, CLI fallback)
+- `demo.py`: single-page request example (easy to turn into a multi-page loop)
+- `requirements.txt`: Python dependency list
+
+## 1. Requirements
+
+### 1.1 PC (Windows)
+
+- Python 3.10+ (ideally matching your current version, 3.10.8)
+- ADB available (`adb version` works)
+- Frida CLI available (`frida --version` works)
+- Python packages: `frida`, `frida-tools`, `requests`
+
+### 1.2 Emulator (Android)
+
+- The target app `com.jihuanshe` is installed and can be launched
+- The emulator is reachable over ADB
+- `frida-server` has been pushed to the device and is executable
+- The `frida-server` major version matches the PC-side `frida` version
+
+## 2. Installation
+
+### 2.1 PC side
+
+Run the following in the `scripts/raw_codec_rpc` directory:
+
+```bash
+python -m venv .venv
+.venv\Scripts\activate
+pip install -r requirements.txt
+```
+
+Verify the commands are available:
+
+```bash
+python -c "import frida, requests; print('ok')"
+frida --version
+adb version
+```
+
+### 2.2 Installing frida-server on the emulator
+
+1. Download the `frida-server-*-android-*.xz` build that matches your PC-side frida major version
+2. Extract it to get `frida-server`
+3. Push it to the device and make it executable:
+
+```bash
+adb -s <device_id> push frida-server /data/local/tmp/fs
+adb -s <device_id> shell "chmod 755 /data/local/tmp/fs"
+```
+
+4. Start frida-server:
+
+```bash
+adb -s <device_id> shell "/data/local/tmp/fs &"
+```
+
+5. Verify the connection:
+
+```bash
+frida-ls-devices
+frida-ps -D <device_id>
+```
+
+## 3. Emulator state while the program runs
+
+- Keep the emulator powered on; do not let it sleep
+- The target app is running and in the foreground (at least fully initialized)
+- `frida-server` keeps running (it must not be reclaimed by the system)
+- Avoid frequently backgrounding the app during batched multi-page runs
+
+## 4. Key parameters
+
+`JhsRawCodecClient(...)` accepts:
+
+- `device_id`: target device ID (required when running multiple emulators)
+- `cli_target_sec`: seconds passed to frida `-t` in CLI fallback mode (`2` or `3` recommended)
+
+Optional environment variables (not required):
+
+- `FRIDA_DEVICE_ID`
+- `FRIDA_CLI_TARGET_SEC`
+- `JHS_CODEC_DEBUG` (`1/true/on` enables logging)
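For reference, the resolution order of these environment variables can be sketched in Python; `frida_settings` is an illustrative helper for this README, not part of the client API:

```python
import os

def frida_settings() -> dict:
    """Resolve the optional environment variables the client reads."""
    return {
        # unset -> None, and the client falls back to the default USB device
        "device_id": os.getenv("FRIDA_DEVICE_ID"),
        # seconds for the frida `-t` CLI fallback; defaults to 3
        "cli_target_sec": int(os.getenv("FRIDA_CLI_TARGET_SEC", "3")),
        # debug logging is on only for 1/true/yes/on
        "debug": os.getenv("JHS_CODEC_DEBUG", "").strip().lower()
                 in {"1", "true", "yes", "on"},
    }
```

Explicit constructor arguments always win over these variables.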
+
+## 5. Recommended usage (multi-page reuse)
+
+Core principle: **create the client and the Session once, outside the loop.**
+
+```python
+from jhs_raw_codec_client import JhsRawCodecClient
+import requests
+
+# TOKEN, HEADERS and fetch_market_page come from your own code
+# (see jhs_rpc_spider.py for a full implementation).
+page = 1
+with JhsRawCodecClient(device_id="25051FDD4S018P", cli_target_sec=2) as client:
+    with requests.Session() as sess:
+        while page < 100:
+            result = fetch_market_page(
+                page=page,
+                token=TOKEN,
+                client=client,
+                session=sess,
+                headers=HEADERS,
+            )
+            print(page, result["decoded"])
+            page += 1
+```
+
+## 6. Performance tips
+
+- Reuse a single `JhsRawCodecClient` across pages
+- Reuse a single `requests.Session` across pages
+- Start with `cli_target_sec = 2`; fall back to `3` if calls are unstable
+- Enable `JHS_CODEC_DEBUG` only while troubleshooting
+
+## 7. Troubleshooting
+
+### 7.1 `unable to attach ...`
+
+- Make sure the app is running
+- Make sure the correct `device_id` was passed
+- Make sure `frida-server` is running on the device
+
+### 7.2 `Java is not defined`
+
+- The client automatically falls back to the CLI path
+- If calls are still slow, lower `cli_target_sec` first
+
+### 7.3 Wrong device selected with multiple emulators
+
+- Pass `device_id` explicitly
+- Cross-check IDs with `adb devices` / `frida-ls-devices`
+
+## 8. Security notes
+
+- Do not hardcode `TOKEN` in the repo; load it from an environment variable or external config
+- Avoid logging full token/raw_data values (in production)
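A minimal sketch of both points; `JHS_TOKEN` and the helper names are hypothetical, not names used by this repo:

```python
import os

def load_token() -> str:
    # Read the token from the environment instead of hardcoding it in the repo.
    # "JHS_TOKEN" is a hypothetical variable name for this sketch.
    token = os.getenv("JHS_TOKEN", "")
    if not token:
        raise RuntimeError("JHS_TOKEN is not set")
    return token

def mask(secret: str) -> str:
    # Log-safe form: keep only the first and last 4 characters.
    if len(secret) <= 8:
        return "***"
    return secret[:4] + "..." + secret[-4:]
```

Log `mask(token)` rather than the token itself.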

+ 78 - 0
jhs_rpc_spider/YamlLoader.py

@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/12/22 10:44
+import os, re
+import yaml
+
+regex = re.compile(r'^\$\{(?P<ENV>[A-Z_\-]+:)?(?P<VAL>[\w.]+)}$')
+
+
+class YamlConfig:
+    def __init__(self, config):
+        self.config = config
+    
+    def get(self, key: str):
+        return YamlConfig(self.config.get(key))
+
+    def _resolve(self, key: str):
+        """Resolve a ${ENV_VAR:default} placeholder; returns the effective value."""
+        raw = self.config[key]
+        match = regex.match(raw) if isinstance(raw, str) else None
+        if match is None:
+            return raw  # plain literal, no placeholder
+        group = match.groupdict()
+        if group['ENV'] is not None:
+            env = group['ENV'][:-1]  # strip the trailing ':'
+            return os.getenv(env, group['VAL'])
+        return group['VAL']
+
+    def getValueAsString(self, key: str):
+        return str(self._resolve(key))
+
+    def getValueAsInt(self, key: str):
+        return int(self._resolve(key))
+
+    def getValueAsBool(self, key: str):
+        value = self._resolve(key)
+        if isinstance(value, str):
+            # bool("false") would be True; compare against known truthy strings instead
+            return value.strip().lower() in {'1', 'true', 'yes', 'on'}
+        return bool(value)
+
+
+def readYaml(path: str = 'application.yml', profile: str = None) -> YamlConfig:
+    conf = {}
+    if os.path.exists(path):
+        with open(path, encoding='utf-8') as fd:
+            conf = yaml.load(fd, Loader=yaml.FullLoader)
+
+    if profile is not None:
+        base, ext = os.path.splitext(path)
+        profiledYaml = f'{base}-{profile}{ext}'
+        if os.path.exists(profiledYaml):
+            with open(profiledYaml, encoding='utf-8') as fd:
+                conf.update(yaml.load(fd, Loader=yaml.FullLoader))
+
+    return YamlConfig(conf)
+
+# Usage:
+#   conf = readYaml()
+#   mysqlConf = conf.get('mysql')
+#   host = mysqlConf.getValueAsString('host')
+#   port = mysqlConf.getValueAsInt('port')

+ 6 - 0
jhs_rpc_spider/application.yml

@@ -0,0 +1,6 @@
+mysql:
+  host: ${MYSQL_HOST:100.64.0.25}
+  port: ${MYSQL_PORT:3306}
+  username: ${MYSQL_USERNAME:crawler}
+  password: ${MYSQL_PASSWORD:Pass2022}
+  db: ${MYSQL_DATABASE:crawler}
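The `${ENV_VAR:default}` placeholders above are resolved by `YamlLoader.py`; a minimal sketch of that behavior (the regex is adapted from the loader, slightly simplified):

```python
import os
import re

# Mirrors the ${ENV_VAR:default} placeholder convention used in application.yml.
PLACEHOLDER = re.compile(r'^\$\{(?:(?P<env>[A-Z_]+):)?(?P<default>[\w.]+)\}$')

def resolve(value: str) -> str:
    """Return the env var's value when it is set, otherwise the inline default."""
    m = PLACEHOLDER.match(value)
    if m is None:
        return value  # plain literal, no placeholder
    if m.group("env"):
        return os.getenv(m.group("env"), m.group("default"))
    return m.group("default")
```

So `MYSQL_HOST=10.0.0.9` overrides the inline `100.64.0.25` default at runtime.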

+ 393 - 0
jhs_rpc_spider/jhs_raw_codec_client.py

@@ -0,0 +1,393 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""
+Parameter-based client for jhs_raw_codec_rpc.js
+
+Usage (import-based):
+    from raw_codec_rpc.jhs_raw_codec_client import call_codec
+
+    enc = call_codec({
+        "op": "enc",
+        "url": "https://api.jihuanshe.com/api/market/auction-products?sorting=completed&page=2&token=..."
+    })
+
+    dec = call_codec({
+        "op": "dec",
+        "request_url": "https://api.jihuanshe.com/api/market/banners?raw_data=...&token=...",
+        "response_raw_data": "BASE64_CIPHER"
+    })
+"""
+
+from pathlib import Path
+from typing import Any, Dict, Optional
+import time
+import json
+import os
+import subprocess
+import tempfile
+
+import frida
+
+
+PKG = "com.jihuanshe"
+SCRIPT_PATH = Path(__file__).with_name("jhs_raw_codec_rpc.js")
+ENV_DEVICE_ID = "FRIDA_DEVICE_ID"
+ENV_DEBUG = "JHS_CODEC_DEBUG"
+ENV_CLI_TARGET_SEC = "FRIDA_CLI_TARGET_SEC"
+
+
+class JhsRawCodecClient:
+    def _on_script_message(self, message, data) -> None:
+        msg_type = message.get("type")
+        if msg_type == "log":
+            level = message.get("level", "log")
+            payload = message.get("payload", "")
+            print(f"[frida:{level}] {payload}")
+            return
+        if msg_type == "error":
+            desc = message.get("description", "script error")
+            stack = message.get("stack")
+            print(f"[frida:error] {desc}")
+            if stack:
+                print(stack)
+            return
+        print(f"[frida:{msg_type}] {message}")
+
+    def _resolve_device(self, device_id: Optional[str]):
+        """
+        Resolve and return the Frida device object.
+
+        Prefers an explicitly passed device_id; falls back to the USB device.
+
+        Args:
+            device_id: target device ID, e.g. emulator-5554.
+
+        Returns:
+            frida.core.Device: the connected device object.
+        """
+        if device_id:
+            return frida.get_device_manager().get_device(device_id, timeout=5)
+        return frida.get_usb_device(timeout=5)
+
+    def _log(self, msg: str) -> None:
+        if self.debug:
+            print(f"[JhsRawCodecClient] {msg}")
+
+    def _find_pid_by_identifier(self) -> int:
+        """
+        Look up the target app's PID by application identifier (package name).
+
+        Returns:
+            int: the PID if found, otherwise 0.
+        """
+        # Prefer app identifier lookup; attach("name") in frida-python matches process name.
+        try:
+            for app in self.device.enumerate_applications():
+                if app.identifier == self.package and app.pid:
+                    return int(app.pid)
+        except Exception:
+            pass
+        return 0
+
+    def _find_pid_by_process(self) -> int:
+        """
+        Look up the target PID by process name (fallback to the identifier lookup).
+
+        Returns:
+            int: the PID if found, otherwise 0.
+        """
+        try:
+            for p in self.device.enumerate_processes():
+                if p.name == self.package:
+                    return int(p.pid)
+        except Exception:
+            pass
+        return 0
+
+    def __init__(
+        self,
+        package: str = PKG,
+        device_id: Optional[str] = None,
+        cli_target_sec: Optional[int] = None,
+    ):
+        """
+        Initialize the client, attach to the target app process, and load the RPC script.
+
+        Args:
+            package: target package name; defaults to the PKG constant.
+            device_id: Frida device ID. When omitted, reads the FRIDA_DEVICE_ID
+                environment variable; if still empty, falls back to the default
+                USB device selection.
+            cli_target_sec: seconds passed to frida `-t` in CLI fallback mode.
+                When omitted, reads FRIDA_CLI_TARGET_SEC; defaults to 3 seconds.
+
+        Raises:
+            RuntimeError: attaching to the target process failed after several retries.
+        """
+        self.package = package
+        self.device_id = device_id or os.getenv(ENV_DEVICE_ID)
+        self.debug = os.getenv(ENV_DEBUG, "").strip().lower() in {"1", "true", "yes", "on"}
+        target_sec = cli_target_sec
+        if target_sec is None:
+            target_sec = int(os.getenv(ENV_CLI_TARGET_SEC, "3"))
+        self.cli_target_sec = max(1, int(target_sec))
+        self.device = self._resolve_device(self.device_id)
+        self.session = None
+        self._prefer_cli = False
+        last_err = None
+        for _ in range(6):
+            try:
+                pid = self._find_pid_by_identifier() or self._find_pid_by_process()
+                if pid:
+                    self.session = self.device.attach(pid)
+                    break
+                last_err = frida.ProcessNotFoundError(
+                    f"unable to find running app/process for '{self.package}'"
+                )
+                time.sleep(1.0)
+            except frida.ProcessNotFoundError as e:
+                last_err = e
+                time.sleep(1.0)
+        if self.session is None:
+            raise RuntimeError(
+                f"unable to attach '{self.package}', please open app and keep it running"
+            ) from last_err
+        code = SCRIPT_PATH.read_text(encoding="utf-8")
+        self.script = self.session.create_script(code)
+        self.script.on("message", self._on_script_message)
+        self.script.load()
+
+    def __enter__(self):
+        """Context-manager entry; returns this client instance."""
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        """Release Frida resources on context-manager exit."""
+        self.close()
+        return False
+
+    def close(self) -> None:
+        """Close and clean up: unload the script and detach the session."""
+        try:
+            self.script.unload()
+        except Exception:
+            pass
+        try:
+            self.session.detach()
+        except Exception:
+            pass
+
+    def encrypt(self, url: str) -> Dict[str, Any]:
+        """
+        Call the JS RPC `encrypt` export to encrypt a request URL.
+
+        Args:
+            url: the original request URL.
+
+        Returns:
+            Dict[str, Any]: the encryption result returned by the JS side.
+        """
+        return self.script.exports_sync.encrypt(url)
+
+    def decrypt(self, request_url_with_raw_data: str, response_raw_data: str) -> Dict[str, Any]:
+        """
+        Call the JS RPC `decrypt` export to decrypt the raw_data in a response.
+
+        Args:
+            request_url_with_raw_data: request URL containing the raw_data parameter.
+            response_raw_data: the encrypted raw_data string from the response.
+
+        Returns:
+            Dict[str, Any]: the decryption result returned by the JS side.
+        """
+        return self.script.exports_sync.decrypt(request_url_with_raw_data, response_raw_data)
+
+    def call(self, params: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Unified entry point; forwards to the JS RPC `call` export.
+
+        When the Python session is missing the Java bridge (as in some Gadget
+        setups), automatically falls back to CLI injection.
+
+        Args:
+            params: RPC call parameters.
+
+        Returns:
+            Dict[str, Any]: the RPC result.
+        """
+        if self._prefer_cli:
+            self._log("call path: cli(preferred)")
+            return self._call_via_cli(params)
+        try:
+            self._log("call path: python rpc")
+            return self.script.exports_sync.call(params)
+        except Exception as e:
+            msg = str(e)
+            # In some embedded Gadget setups, Python session scripts miss Java bridge.
+            if "Java is not defined" in msg or "ReferenceError: 'Java' is not defined" in msg:
+                # Once detected, skip the slow exception path on later calls.
+                self._prefer_cli = True
+                self._log("python rpc missing Java bridge, switch to cli fallback")
+                return self._call_via_cli(params)
+            raise
+
+    def _call_via_cli(self, params: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Execute one RPC call through the frida CLI as a fallback.
+
+        Steps:
+        1) Build a temporary JS file (inject the params and call rpc.exports.call)
+        2) Inject it into the target process via the frida command line
+        3) Parse the agreed result prefix from stdout
+
+        Args:
+            params: RPC call parameters.
+
+        Returns:
+            Dict[str, Any]: the RPC result.
+
+        Raises:
+            RuntimeError: the CLI call failed or no result line was found.
+        """
+        js_src = SCRIPT_PATH.read_text(encoding="utf-8")
+        params_json = json.dumps(params, ensure_ascii=False)
+        wrapper = (
+            "const __PARAMS = " + params_json + ";\n"
+            + js_src
+            + "\nsetImmediate(function(){\n"
+              "  rpc.exports.call(__PARAMS)\n"
+              "    .then(function(r){ console.log('[CODEC-RESULT]' + JSON.stringify(r)); })\n"
+              "    .catch(function(e){ console.log('[CODEC-ERROR]' + e); });\n"
+              "});\n"
+        )
+        fd, tmp_path = tempfile.mkstemp(prefix="jhs_codec_", suffix=".js")
+        os.close(fd)
+        proc = None
+        try:
+            Path(tmp_path).write_text(wrapper, encoding="utf-8")
+            cli_cmd = ["frida"]
+            if self.device_id:
+                cli_cmd.extend(["-D", self.device_id])
+            else:
+                cli_cmd.append("-U")
+            cli_cmd.extend(["-N", self.package, "-l", tmp_path, "-q", "-t", str(self.cli_target_sec)])
+            self._log("spawn cli: " + " ".join(cli_cmd))
+            proc = subprocess.Popen(
+                cli_cmd,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True,
+                encoding="utf-8",
+                errors="replace",
+            )
+            deadline = time.time() + max(10, self.cli_target_sec + 8)
+            out_lines = []
+            err_lines = []
+
+            while time.time() < deadline:
+                if proc.stdout is None:
+                    break
+                line = proc.stdout.readline()
+                if line:
+                    line = line.rstrip("\r\n")
+                    out_lines.append(line)
+                    if line.startswith("[CODEC-RESULT]"):
+                        result = json.loads(line[len("[CODEC-RESULT]"):])
+                        if proc.poll() is None:
+                            proc.terminate()
+                        return result
+                    if line.startswith("[CODEC-ERROR]"):
+                        if proc.poll() is None:
+                            proc.terminate()
+                        raise RuntimeError(line)
+                    continue
+
+                if proc.poll() is not None:
+                    break
+
+            if proc.stderr is not None:
+                err_lines.extend(proc.stderr.read().splitlines())
+
+            out = "\n".join(out_lines)
+            err = "\n".join(err_lines)
+            raise RuntimeError(
+                "cli codec call failed: no result line\n"
+                + "stdout:\n" + out + "\n"
+                + "stderr:\n" + err
+            )
+        finally:
+            if proc is not None and proc.poll() is None:
+                try:
+                    proc.terminate()
+                except Exception:
+                    pass
+            try:
+                os.remove(tmp_path)
+            except Exception:
+                pass
+
+
+def encrypt_url(url: str, package: str = PKG, device_id: Optional[str] = None) -> Dict[str, Any]:
+    """
+    Convenience helper: create a short-lived client, encrypt the URL, and
+    release resources automatically.
+
+    Args:
+        url: the original request URL.
+        package: target package name.
+        device_id: Frida device ID (e.g. emulator-5554); optional.
+
+    Returns:
+        Dict[str, Any]: the encryption result.
+    """
+    with JhsRawCodecClient(package=package, device_id=device_id) as client:
+        return client.encrypt(url)
+
+
+def decrypt_raw_data(
+    request_url: str,
+    response_raw_data: str,
+    package: str = PKG,
+    device_id: Optional[str] = None,
+) -> Dict[str, Any]:
+    """
+    Convenience helper: create a short-lived client, decrypt the raw_data, and
+    release resources automatically.
+
+    Args:
+        request_url: request URL containing the raw_data parameter.
+        response_raw_data: the encrypted raw_data from the response.
+        package: target package name.
+        device_id: Frida device ID (e.g. emulator-5554); optional.
+
+    Returns:
+        Dict[str, Any]: the decryption result.
+    """
+    with JhsRawCodecClient(package=package, device_id=device_id) as client:
+        return client.decrypt(request_url, response_raw_data)
+
+
+def call_codec(
+    params: Dict[str, Any],
+    package: str = PKG,
+    device_id: Optional[str] = None,
+) -> Dict[str, Any]:
+    """
+    Public unified entry point: dispatch enc/dec based on params["op"].
+
+    Args:
+        params: call parameters; two shapes are supported:
+            enc: {"op": "enc", "url": "..."}
+            dec: {"op": "dec", "request_url": "...", "response_raw_data": "..."}
+        package: target package name.
+        device_id: Frida device ID (e.g. emulator-5554); optional. Can also be
+            set via the FRIDA_DEVICE_ID environment variable.
+
+    Returns:
+        Dict[str, Any]: the codec result.
+
+    Raises:
+        TypeError: params is not a dict.
+    """
+    if not isinstance(params, dict):
+        raise TypeError("params must be a dict")
+    with JhsRawCodecClient(package=package, device_id=device_id) as client:
+        return client.call(params)

+ 312 - 0
jhs_rpc_spider/jhs_raw_codec_rpc.js

@@ -0,0 +1,312 @@
+'use strict';
+
+var _state = {
+  ready: false,
+  B: null,
+  A: null,
+  bInst: null,
+  aInst: null,
+  RequestBuilder: null,
+  ResponseBuilder: null,
+  ResponseBody: null,
+  MediaType: null,
+  Protocol: null,
+  ChainEnc: null,
+  ChainDec: null,
+  capturedUrl: '',
+  jniProbeReady: false,
+  jniAttached: {}
+};
+
+var _JNI_NAME_RE = /(enc|encrypt|cipher|aes|codec|raw|sign|decode|decrypt)/i;
+
+function _tryReadUtf8(p) {
+  if (!p || p.isNull()) return '';
+  try { return Memory.readUtf8String(p); } catch (_) {}
+  return '';
+}
+
+function _attachNativeProbe(fnPtr, tag) {
+  if (!fnPtr || fnPtr.isNull()) return false;
+  var key = fnPtr.toString();
+  if (_state.jniAttached[key]) return false;
+  _state.jniAttached[key] = tag || 'native';
+  Interceptor.attach(fnPtr, {
+    onEnter: function (args) {
+      var input = _tryReadUtf8(args[0]);
+      if (!input) input = _tryReadUtf8(args[2]);
+      console.log("input:", input);
+      console.log("key_ptr:", args[1]);
+      try {
+        console.log("key:", Memory.readUtf8String(args[1]));
+      } catch(e){}
+    }
+  });
+  console.log('[jni-probe] attached:', _state.jniAttached[key], '@', key);
+  return true;
+}
+
+function _hookRegisterNatives(regPtr, label) {
+  if (!regPtr || regPtr.isNull()) return;
+  Interceptor.attach(regPtr, {
+    onEnter: function (args) {
+      try {
+        var methods = args[2];
+        var nMethods = args[3].toInt32();
+        if (!methods || methods.isNull()) return;
+        if (nMethods <= 0 || nMethods > 1024) return;
+        var step = Process.pointerSize * 3;
+        for (var i = 0; i < nMethods; i++) {
+          var item = methods.add(i * step);
+          var namePtr = item.readPointer();
+          var sigPtr = item.add(Process.pointerSize).readPointer();
+          var fnPtr = item.add(Process.pointerSize * 2).readPointer();
+          var name = _tryReadUtf8(namePtr);
+          var sig = _tryReadUtf8(sigPtr);
+          if (!_JNI_NAME_RE.test(name) && !_JNI_NAME_RE.test(sig)) continue;
+          _attachNativeProbe(fnPtr, name + sig);
+        }
+      } catch (_) {}
+    }
+  });
+  console.log('[jni-probe] hooked RegisterNatives:', label);
+}
+
+function ensureJniProbe() {
+  if (_state.jniProbeReady) return;
+  _state.jniProbeReady = true;
+
+  try {
+    var resolver = new ApiResolver('module');
+    var regs = resolver.enumerateMatchesSync('exports:*!RegisterNatives*');
+    for (var i = 0; i < regs.length; i++) {
+      _hookRegisterNatives(regs[i].address, regs[i].name);
+    }
+  } catch (e) {
+    console.log('[jni-probe] register hook failed:', e);
+  }
+
+  try {
+    var mods = Process.enumerateModulesSync();
+    for (var m = 0; m < mods.length; m++) {
+      var mod = mods[m];
+      if (mod.name.indexOf('.so') === -1) continue;
+      var exps = [];
+      try { exps = Module.enumerateExportsSync(mod.name); } catch (_) { continue; }
+      for (var j = 0; j < exps.length; j++) {
+        var exp = exps[j];
+        if (exp.type !== 'function') continue;
+        if (exp.name.indexOf('Java_') !== 0) continue;
+        if (!_JNI_NAME_RE.test(exp.name)) continue;
+        _attachNativeProbe(exp.address, exp.name + '@' + mod.name);
+      }
+    }
+  } catch (e2) {
+    console.log('[jni-probe] export scan failed:', e2);
+  }
+}
+
+function initJava() {
+  if (_state.ready) return;
+  _state.B = Java.use('gc.b');
+  _state.A = Java.use('gc.a');
+  _state.RequestBuilder = Java.use('okhttp3.Request$Builder');
+  _state.ResponseBuilder = Java.use('okhttp3.Response$Builder');
+  _state.ResponseBody = Java.use('okhttp3.ResponseBody');
+  _state.MediaType = Java.use('okhttp3.MediaType');
+  _state.Protocol = Java.use('okhttp3.Protocol');
+
+  _state.ChainEnc = Java.registerClass({
+    name: 'com.jhs.RawCodecEncChain',
+    implements: [Java.use('okhttp3.Interceptor$Chain')],
+    methods: {
+      request: function () { return this.req.value; },
+      proceed: function (req) {
+        _state.capturedUrl = req.url().toString();
+        var body = _state.ResponseBody.create.overload('okhttp3.MediaType', 'java.lang.String')
+          .call(_state.ResponseBody, _state.MediaType.parse('application/json; charset=utf-8'), '{}');
+        return _state.ResponseBuilder.$new()
+          .request(req)
+          .protocol(_state.Protocol.valueOf('HTTP_1_1'))
+          .code(200)
+          .message('OK')
+          .body(body)
+          .build();
+      },
+      connection: function () { return null; },
+      call: function () { return null; },
+      connectTimeoutMillis: function () { return 15000; },
+      readTimeoutMillis: function () { return 15000; },
+      writeTimeoutMillis: function () { return 15000; },
+      withConnectTimeout: function (_t, _u) { return this; },
+      withReadTimeout: function (_t, _u) { return this; },
+      withWriteTimeout: function (_t, _u) { return this; }
+    },
+    fields: { req: 'okhttp3.Request' }
+  });
+
+  _state.ChainDec = Java.registerClass({
+    name: 'com.jhs.RawCodecDecChain',
+    implements: [Java.use('okhttp3.Interceptor$Chain')],
+    methods: {
+      request: function () { return this.req.value; },
+      proceed: function (req) {
+        var json = '{"raw_data":"' + this.cipher.value + '"}';
+        var body = _state.ResponseBody.create.overload('okhttp3.MediaType', 'java.lang.String')
+          .call(_state.ResponseBody, _state.MediaType.parse('application/json; charset=utf-8'), json);
+        return _state.ResponseBuilder.$new()
+          .request(req)
+          .protocol(_state.Protocol.valueOf('HTTP_1_1'))
+          .code(200)
+          .message('OK')
+          .body(body)
+          .build();
+      },
+      connection: function () { return null; },
+      call: function () { return null; },
+      connectTimeoutMillis: function () { return 15000; },
+      readTimeoutMillis: function () { return 15000; },
+      writeTimeoutMillis: function () { return 15000; },
+      withConnectTimeout: function (_t, _u) { return this; },
+      withReadTimeout: function (_t, _u) { return this; },
+      withWriteTimeout: function (_t, _u) { return this; }
+    },
+    fields: {
+      req: 'okhttp3.Request',
+      cipher: 'java.lang.String'
+    }
+  });
+
+  _state.ready = true;
+}
+
+function getCachedInstance(cacheKey, C, className) {
+  if (_state[cacheKey] !== null) return _state[cacheKey];
+
+  // Fast path: directly construct once and reuse.
+  try {
+    _state[cacheKey] = C.$new();
+    return _state[cacheKey];
+  } catch (_) {}
+
+  // Slow fallback: heap-scan only once when constructor is unavailable.
+  var out = null;
+  Java.choose(className, {
+    onMatch: function (o) {
+      if (out === null) out = Java.retain(o);
+    },
+    onComplete: function () {}
+  });
+  if (out !== null) {
+    _state[cacheKey] = out;
+    return out;
+  }
+  throw new Error('unable to resolve instance for ' + className);
+}
+
+function extractRawData(url) {
+  var m = /[?&]raw_data=([^&#]+)/.exec(url);
+  if (!m) return '';
+  var v = m[1];
+  try { v = decodeURIComponent(v); } catch (_) {}
+  return v;
+}
+
+function encryptInner(url) {
+  ensureJniProbe();
+  initJava();
+  _state.capturedUrl = '';
+  var req = _state.RequestBuilder.$new().url(url).get().build();
+  var ch = _state.ChainEnc.$new();
+  ch.req.value = req;
+  var b = getCachedInstance('bInst', _state.B, 'gc.b');
+  b.b(req, ch);
+  var outUrl = _state.capturedUrl;
+  return {
+    ok: true,
+    input_url: url,
+    output_url: outUrl,
+    raw_data: extractRawData(outUrl)
+  };
+}
+
+function decryptInner(requestUrlWithRawData, responseRawData) {
+  initJava();
+  var req = _state.RequestBuilder.$new().url(requestUrlWithRawData).get().build();
+  var ch = _state.ChainDec.$new();
+  ch.req.value = req;
+  ch.cipher.value = responseRawData;
+  var a = getCachedInstance('aInst', _state.A, 'gc.a');
+  var resp = a.intercept(ch);
+  var body = resp.body();
+  var text = body ? body.string() : '';
+  var parsed = null;
+  var rawDataPlain = null;
+  try {
+    parsed = JSON.parse(text);
+    if (parsed && parsed.raw_data !== undefined) {
+      rawDataPlain = (typeof parsed.raw_data === 'string') ? parsed.raw_data : JSON.stringify(parsed.raw_data);
+    }
+  } catch (_) {}
+  return {
+    ok: true,
+    request_url: requestUrlWithRawData,
+    response_raw_data: responseRawData,
+    response_body: text,
+    raw_data_plain: rawDataPlain
+  };
+}
+
+rpc.exports = {
+  ping: function () {
+    return 'ok';
+  },
+  call: function (params) {
+    return new Promise(function (resolve, reject) {
+      Java.perform(function () {
+        try {
+          if (!params || typeof params !== 'object') {
+            throw new Error('params must be an object');
+          }
+          var op = ('' + (params.op || '')).toLowerCase().trim();
+          if (op === 'enc') {
+            if (!params.url) throw new Error("enc requires params.url");
+            resolve(encryptInner('' + params.url));
+            return;
+          }
+          if (op === 'dec') {
+            if (!params.request_url) throw new Error("dec requires params.request_url");
+            if (!params.response_raw_data) throw new Error("dec requires params.response_raw_data");
+            resolve(decryptInner('' + params.request_url, '' + params.response_raw_data));
+            return;
+          }
+          throw new Error("params.op must be 'enc' or 'dec'");
+        } catch (e) {
+          reject('call failed: ' + e);
+        }
+      });
+    });
+  },
+  encrypt: function (url) {
+    return new Promise(function (resolve, reject) {
+      Java.perform(function () {
+        try {
+          resolve(encryptInner(url));
+        } catch (e) {
+          reject('encrypt failed: ' + e);
+        }
+      });
+    });
+  },
+  decrypt: function (requestUrlWithRawData, responseRawData) {
+    return new Promise(function (resolve, reject) {
+      Java.perform(function () {
+        try {
+          resolve(decryptInner(requestUrlWithRawData, responseRawData));
+        } catch (e) {
+          reject('decrypt failed: ' + e);
+        }
+      });
+    });
+  }
+};

+ 255 - 0
jhs_rpc_spider/jhs_rpc_spider.py

@@ -0,0 +1,255 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2026/4/23 13:46
+import json
+import time
+import requests
+import inspect
+import schedule
+from loguru import logger
+from typing import Any, Dict
+from datetime import datetime
+from mysql_pool import MySQLConnectionPool
+from jhs_raw_codec_client import JhsRawCodecClient
+from tenacity import retry, stop_after_attempt, wait_fixed
+
+# TOKEN = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJlbnYiOiJwcm9kdWN0aW9uIiwic3ViIjoyODI3NDU4LCJpc3MiOiJodHRwOi8vYXBpLmppaHVhbnNoZS5jb20vYXBpL21hcmtldC9hdXRoL2xvZ2luLW9yLXNpZ251cCIsImlhdCI6MTc3NTYzNzQzNSwiZXhwIjoxNzgwODIxNDM1LCJuYmYiOjE3NzU2Mzc0MzUsImp0aSI6InhiT3NsdUJRTzVWeHRabHQifQ.uHz7M-U0ewPgi5Qzr5P4eJbSdIUO_i_hmVE-0jsaG2Y"
+DEVICE_ID = "25051FDD4S018P"
+CLI_TARGET_SEC = 2
+TIMEOUT_SEC = 15
+
+BASE_URL = "https://api.jihuanshe.com/api/market/auction-products"
+HEADERS = {
+    "User-Agent": "Model/google,Pixel5 OS/30 Version/3.36.2",
+    "Connection": "Keep-Alive",
+    "Accept-Encoding": "gzip",
+    "x-device-id": "6efe93931488e176",
+}
+
+# logger.remove()
+# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
+#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
+#            level="DEBUG", retention="7 day")
+
+
+def after_log(retry_state):
+    """
+    tenacity retry callback.
+    :param retry_state: the RetryCallState object
+    """
+    # Use the logger passed as the first positional argument, if any
+    if retry_state.args and len(retry_state.args) > 0:
+        log = retry_state.args[0]
+    else:
+        log = logger  # fall back to the module-level logger
+
+    if retry_state.outcome.failed:
+        log.warning(
+            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
+    else:
+        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")
+
+
+@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
+def get_proxys(log):
+    """
+    Build the proxy configuration.
+    :return: proxies dict for requests
+    """
+    tunnel = "x371.kdltps.com:15818"
+    kdl_username = "t13753103189895"
+    kdl_password = "o0yefv6z"
+    try:
+        proxies = {
+            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
+            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
+        }
+        return proxies
+    except Exception as e:
+        log.error(f"Error getting proxy: {e}")
+        raise e
+
+
+def fetch_market_page(
+        log,
+        page: int,
+        token: str,
+        client: JhsRawCodecClient,
+        session: requests.Session,
+        headers: Dict[str, str],
+        timeout_sec: int = TIMEOUT_SEC,
+) -> Dict[str, Any]:
+    """
+    Fetch and decrypt a single page.
+
+    Reuse pattern:
+    - `client` and `session` are created once by the caller and reused long-term
+    - pass a different `page` on each call
+    """
+    log.info(f"Fetching page {page} ...")
+    url_for_enc = f"{BASE_URL}?sorting=completed&page={page}&token={token}"
+    enc = client.call({"op": "enc", "url": url_for_enc})
+    raw_data = enc["raw_data"]
+
+    resp = session.get(
+        BASE_URL,
+        headers=headers,
+        params={"raw_data": raw_data, "token": token},
+        timeout=timeout_sec,
+    )
+    resp.raise_for_status()
+    body = resp.json()
+    response_raw_data = body["raw_data"]
+
+    request_url_for_dec = f"{BASE_URL}?raw_data={raw_data}&token={token}"
+    dec = client.call(
+        {
+            "op": "dec",
+            "request_url": request_url_for_dec,
+            "response_raw_data": response_raw_data,
+        }
+    )
+    response_body = dec.get("response_body", "")
+
+    parsed: Any = response_body
+    if isinstance(response_body, str):
+        try:
+            parsed = json.loads(response_body)
+        except json.JSONDecodeError:
+            # keep the raw string if the body is not valid JSON
+            log.error(f"Error parsing response body: {response_body}")
+
+    return {
+        "page": page,
+        "enc": enc,
+        "http_json": body,
+        "dec": dec,
+        "decoded": parsed,
+    }
+
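The enc → HTTP → dec round trip in `fetch_market_page` can be sketched with a stub client. `FakeCodecClient` below is hypothetical and only mimics the `call()` contract of `JhsRawCodecClient`:

```python
import json

class FakeCodecClient:
    """Stand-in for JhsRawCodecClient; mimics the call() contract only."""
    def call(self, payload):
        if payload["op"] == "enc":
            # the real client returns the encrypted query string as raw_data
            return {"raw_data": f"ENC({payload['url']})"}
        # "dec" returns the decrypted response body as a JSON string
        return {"response_body": json.dumps({"raw_data": {"data": []}})}

client = FakeCodecClient()  # created once, reused for every page
enc = client.call({"op": "enc", "url": "https://example.com/api?page=1"})
dec = client.call({"op": "dec", "request_url": "stub", "response_raw_data": "stub"})
decoded = json.loads(dec["response_body"])
```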
+
+def parse_data(resp_data, sql_pool):
+    """
+    Parse the decoded response and collect rows for insertion.
+    :param resp_data: decoded response payload
+    :param sql_pool: database connection pool
+    """
+    data_list = resp_data.get("raw_data", {}).get("data", [])
+
+    info_list = []
+    for data in data_list:
+        seller_username = data.get("seller_username")
+
+        product_id = data.get("auction_product_id")
+        app_id = data.get("app_id")
+        auction_product_name = data.get("auction_product_name")
+        auction_product_images = data.get("auction_product_image")
+        game_key = data.get("game_key")
+        language_text = data.get("language_text")
+        authenticator_name = data.get("authenticator_name")
+        grading = data.get("grading")
+        starting_price = data.get("starting_price")
+        max_bid_price = data.get("max_bid_price")
+        status = data.get("status")
+
+        auction_product_start_timestamp = data.get('auction_product_start_timestamp')
+        auction_product_start_time = datetime.fromtimestamp(auction_product_start_timestamp).strftime(
+            '%Y-%m-%d %H:%M:%S') if auction_product_start_timestamp else None
+        auction_product_end_timestamp = data.get('auction_product_end_timestamp')
+        auction_product_end_time = datetime.fromtimestamp(auction_product_end_timestamp).strftime(
+            '%Y-%m-%d %H:%M:%S') if auction_product_end_timestamp else None
+
+        bid_count = data.get("bid_count")
+        card_number = data.get("number")
+        rarity = data.get("rarity")
+        data_dict = {
+            "seller_username": seller_username,
+            "product_id": product_id,
+            "app_id": app_id,
+            "auction_product_name": auction_product_name,
+            "auction_product_images": auction_product_images,
+            "game_key": game_key,
+            "language_text": language_text,
+            "authenticator_name": authenticator_name,
+            "grading": grading,
+            "starting_price": starting_price,
+            "max_bid_price": max_bid_price,
+            "status": status,
+            "auction_product_start_time": auction_product_start_time,
+            "auction_product_end_time": auction_product_end_time,
+            "bid_count": bid_count,
+            "card_number": card_number,
+            "rarity": rarity,
+        }
+        print(data_dict)
+        info_list.append(data_dict)
+
+    # if info_list:
+    #     sql_pool.insert_many(table="jhs_product_record", data_list=info_list, ignore=True)
+
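The timestamp handling in `parse_data` converts optional epoch seconds to a formatted string while guarding against missing values. A standalone check of that expression:

```python
from datetime import datetime

ts = 1757929283  # sample epoch seconds, same shape as auction_product_start_timestamp
start_time = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') if ts else None
missing = None
end_time = datetime.fromtimestamp(missing).strftime('%Y-%m-%d %H:%M:%S') if missing else None
```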
+
+def get_market_list(log, token: str, sql_pool):
+    page = 1
+    max_page = 1000
+
+    with JhsRawCodecClient(device_id=DEVICE_ID, cli_target_sec=CLI_TARGET_SEC) as codec_client:
+        with requests.Session() as http_sess:
+            while page < max_page:
+                result = fetch_market_page(
+                    log=log,
+                    page=page,
+                    token=token,
+                    client=codec_client,
+                    session=http_sess,
+                    headers=HEADERS,
+                )
+                # print(page, result["decoded"])
+
+                try:
+                    parse_data(result["decoded"], sql_pool)
+                except Exception as e:
+                    log.error(f"Error parsing page {page}: {e}")
+
+                page += 1
+
+
+@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
+def jhs_rpc_main(log):
+    """
+    主函数
+    :param log: logger对象
+    """
+    log.info(
+        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
+
+    # 配置 MySQL 连接池
+    sql_pool = MySQLConnectionPool(log=log)
+    if not sql_pool:
+        log.error("MySQL数据库连接失败")
+        raise Exception("MySQL数据库连接失败")
+
+    try:
+        jhs_token = sql_pool.select_one('SELECT token FROM jhs_token WHERE id = 1')
+        get_market_list(log, jhs_token[0], sql_pool)
+    except Exception as e:
+        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
+    finally:
+        sql_pool.close()
+        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next run............')
+
+
+
+def schedule_task():
+    """
+    Set up and run the scheduled task loop.
+    """
+    jhs_rpc_main(log=logger)
+
+    schedule.every().day.at("01:31").do(jhs_rpc_main, log=logger)
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+
+if __name__ == "__main__":
+    schedule_task()

+ 628 - 0
jhs_rpc_spider/mysql_pool.py

@@ -0,0 +1,628 @@
+# -*- coding: utf-8 -*-
+# Author : Charley
+# Python : 3.10.8
+# Date   : 2025/3/25 14:14
+import re
+import pymysql
+import YamlLoader
+from loguru import logger
+from dbutils.pooled_db import PooledDB
+
+# 获取yaml配置
+yaml = YamlLoader.readYaml()
+mysqlYaml = yaml.get("mysql")
+sql_host = mysqlYaml.getValueAsString("host")
+sql_port = mysqlYaml.getValueAsInt("port")
+sql_user = mysqlYaml.getValueAsString("username")
+sql_password = mysqlYaml.getValueAsString("password")
+sql_db = mysqlYaml.getValueAsString("db")
+
+
+class MySQLConnectionPool:
+    """
+    MySQL连接池
+    """
+
+    def __init__(self, mincached=1, maxcached=2, maxconnections=3, log=None):
+        """
+        初始化连接池
+        :param mincached: 初始化时,链接池中至少创建的链接,0表示不创建
+        :param maxcached: 池中空闲连接的最大数目(0 或 None 表示池大小不受限制)
+        :param maxconnections: 允许的最大连接数(0 或 None 表示任意数量的连接)
+        :param log: 自定义日志记录器
+        """
+        # 使用 loguru 的 logger,如果传入了其他 logger,则使用传入的 logger
+        self.log = log or logger
+        self.pool = PooledDB(
+            creator=pymysql,
+            mincached=mincached,
+            maxcached=maxcached,
+            maxconnections=maxconnections,
+            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
+            host=sql_host,
+            port=sql_port,
+            user=sql_user,
+            password=sql_password,
+            database=sql_db,
+            ping=1,  # 0:完全关闭(更快), 1:仅在取连接时检查, 2:每次执行前检查连接有效性,防止使用已断开的连接
+            connect_timeout=5,  # 连接超时时间(秒)
+            # read_timeout=30,  # 读取超时时间(秒)
+            write_timeout=30  # 写入超时时间(秒)
+        )
+
+    def _execute(self, query, args=None, commit=False):
+        """
+        Execute a SQL statement.
+        :param query: SQL statement
+        :param args: SQL parameters
+        :param commit: whether to commit the transaction
+        :return: the cursor (pymysql buffers results, so fetch* works after return)
+        """
+        conn = self.pool.connection()
+        try:
+            with conn.cursor() as cursor:
+                cursor.execute(query, args)
+                if commit:
+                    conn.commit()
+                self.log.debug(f"sql _execute, Query: {query}, Rows: {cursor.rowcount}")
+                return cursor
+        except Exception as e:
+            if commit:
+                conn.rollback()
+            self.log.exception(f"Error executing query: {e}, Query: {query}, Args: {args}")
+            raise
+        finally:
+            conn.close()
+
+    def select_one(self, query, args=None):
+        """
+        执行查询,返回单个结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchone()
+
+    def select_all(self, query, args=None):
+        """
+        执行查询,返回所有结果
+        :param query: 查询语句
+        :param args: 查询参数
+        :return: 查询结果
+        """
+        cursor = self._execute(query, args)
+        return cursor.fetchall()
+
+    def insert_one(self, query, args):
+        """
+        执行单条插入语句
+        :param query: 插入语句
+        :param args: 插入参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        cursor = self._execute(query, args, commit=True)
+        return cursor.lastrowid  # 返回插入的ID
+
+    def insert_all(self, query, args_list):
+        """
+        执行批量插入语句,如果失败则逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql insert_all, SQL: {query[:100]}..., Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_all 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                conn.rollback()
+                self.log.warning(f"批量插入遇到重复,开始逐条插入。错误: {e}")
+                rowcount = 0
+                for args in args_list:
+                    try:
+                        self.insert_one(query, args)
+                        rowcount += 1
+                    except pymysql.err.IntegrityError as e2:
+                        if "Duplicate entry" in str(e2):
+                            self.log.debug(f"跳过重复条目: {e2}")
+                        else:
+                            self.log.error(f"插入失败: {e2}")
+                    except Exception as e2:
+                        self.log.error(f"插入失败: {e2}")
+                self.log.info(f"逐条插入完成: {rowcount}/{len(args_list)}条")
+            else:
+                conn.rollback()
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise e
+        except Exception as e:
+            if conn:
+                conn.rollback()
+            self.log.exception(f"批量插入失败: {e}")
+            raise e
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def insert_one_or_dict(self, table=None, data=None, query=None, args=None, commit=True, ignore=False):
+        """
+        单条插入(支持字典或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data: 字典数据 {列名: 值}
+        :param query: 直接SQL语句(与data二选一)
+        :param args: SQL参数(query使用时必需)
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 最后插入ID
+        """
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data.keys()])
+            values = ', '.join(['%s'] * len(data))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args = tuple(data.values())
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        try:
+            cursor = self._execute(query, args, commit)
+            self.log.info(f"sql insert_one_or_dict, Table: {table}, Rows: {cursor.rowcount}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data insert_one_or_dict 入库中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+            return cursor.lastrowid
+        except pymysql.err.IntegrityError as e:
+            if "Duplicate entry" in str(e):
+                self.log.warning(f"插入失败:重复条目,已跳过。错误详情: {e}")
+                return -1  # 返回 -1 表示重复条目被跳过
+            else:
+                self.log.exception(f"数据库完整性错误: {e}")
+                raise
+        except Exception as e:
+            self.log.exception(f"未知错误: {e}")
+            raise
+
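The dictionary branch above assembles a parameterized `INSERT [IGNORE]` statement. A minimal standalone sketch of that construction, with the identifier whitelist copied from `_safe_identifier`:

```python
import re

def safe_identifier(name):
    # same whitelist as MySQLConnectionPool._safe_identifier
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
        raise ValueError(f"Invalid SQL identifier: {name}")
    return name

data = {"card_id": 5284, "card_name": "Luffy"}
keys = ', '.join(safe_identifier(k) for k in data)
placeholders = ', '.join(['%s'] * len(data))
query = f"INSERT IGNORE INTO {safe_identifier('one_piece_record')} ({keys}) VALUES ({placeholders})"
args = tuple(data.values())  # values travel separately, so no injection via data
```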
+    def insert_many(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                    ignore=False):
+        """
+        批量插入(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :param ignore: 是否使用ignore
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+
+            # 构建 INSERT IGNORE 语句
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                # 处理唯一索引冲突
+                if "Duplicate entry" in str(e):
+                    if ignore:
+                        # 如果使用了 INSERT IGNORE,理论上不会进这里,但以防万一
+                        self.log.warning(f"批量插入遇到重复条目(ignore模式): {e}")
+                    else:
+                        # 没有使用 IGNORE,降级为逐条插入
+                        self.log.warning(f"批量插入遇到重复条目,开始逐条插入。错误: {e}")
+                        # with 块退出时连接已自动回滚,无需手动 rollback
+                        rowcount = 0
+                        for j, args in enumerate(batch):
+                            try:
+                                if data_list:
+                                    # 字典模式
+                                    self.insert_one_or_dict(
+                                        table=table,
+                                        data=dict(zip(data_list[0].keys(), args)),
+                                        commit=commit,
+                                        ignore=False  # 单条插入时手动捕获重复
+                                    )
+                                else:
+                                    # 原始SQL模式
+                                    self.insert_one(query, args)
+                                rowcount += 1
+                            except pymysql.err.IntegrityError as e2:
+                                if "Duplicate entry" in str(e2):
+                                    self.log.debug(f"跳过重复条目[{i+j+1}]: {e2}")
+                                else:
+                                    self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                            except Exception as e2:
+                                self.log.error(f"插入失败[{i+j+1}]: {e2}")
+                        total += rowcount
+                        self.log.info(f"批次逐条插入完成: 成功{rowcount}/{len(batch)}条")
+                else:
+                    # 其他完整性错误(with 块退出时连接已回滚)
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    raise e
+            except Exception as e:
+                # 其他数据库错误(with 块退出时连接已回滚)
+                self.log.exception(f"批量插入失败: {e}")
+                raise e
+        if table:
+            self.log.info(f"sql insert_many, Table: {table}, Total Rows: {total}")
+        else:
+            self.log.info(f"sql insert_many, Query: {query}, Total Rows: {total}")
+        return total
+
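`insert_many` splits `args_list` into `batch_size` chunks with a stepped slice. The slicing idiom in isolation:

```python
args_list = list(range(10))
batch_size = 4
# step through the list in batch_size-sized windows; the last batch holds the remainder
batches = [args_list[i:i + batch_size] for i in range(0, len(args_list), batch_size)]
```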
+    def insert_many_two(self, table=None, data_list=None, query=None, args_list=None, batch_size=1000, commit=True,
+                        ignore=False):
+        """
+        批量插入(支持字典列表或原始SQL) - 备用方法
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :param ignore: 是否使用INSERT IGNORE
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            keys = ', '.join([self._safe_identifier(k) for k in data_list[0].keys()])
+            values = ', '.join(['%s'] * len(data_list[0]))
+            ignore_clause = "IGNORE" if ignore else ""
+            query = f"INSERT {ignore_clause} INTO {self._safe_identifier(table)} ({keys}) VALUES ({values})"
+            args_list = [tuple(d.values()) for d in data_list]
+        elif query is None:
+            raise ValueError("Either data_list or query must be provided")
+    
+        total = 0
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        if commit:
+                            conn.commit()
+                        total += cursor.rowcount
+            except pymysql.err.IntegrityError as e:
+                if "Duplicate entry" in str(e) and not ignore:
+                    self.log.warning(f"批量插入遇到重复,降级为逐条插入: {e}")
+                    # with 块退出时连接已自动回滚,直接逐条插入
+                    rowcount = 0
+                    for args in batch:
+                        try:
+                            self.insert_one(query, args)
+                            rowcount += 1
+                        except pymysql.err.IntegrityError as e2:
+                            if "Duplicate entry" in str(e2):
+                                self.log.debug(f"跳过重复条目: {e2}")
+                            else:
+                                self.log.error(f"插入失败: {e2}")
+                        except Exception as e2:
+                            self.log.error(f"插入失败: {e2}")
+                    total += rowcount
+                else:
+                    self.log.exception(f"数据库完整性错误: {e}")
+                    # with 块退出时连接已自动回滚
+                    raise e
+            except Exception as e:
+                self.log.exception(f"批量插入失败: {e}")
+                # with 块退出时连接已自动回滚
+                raise e
+        self.log.info(f"sql insert_many_two, Table: {table}, Total Rows: {total}")
+        return total
+
+    def insert_too_many(self, query, args_list, batch_size=1000):
+        """
+        执行批量插入语句,分片提交, 单次插入大于十万+时可用, 如果失败则降级为逐条插入
+        :param query: 插入语句
+        :param args_list: 插入参数列表
+        :param batch_size: 每次插入的条数
+        """
+        self.log.info(f"sql insert_too_many, Query: {query}, Total Rows: {len(args_list)}")
+        for i in range(0, len(args_list), batch_size):
+            batch = args_list[i:i + batch_size]
+            try:
+                with self.pool.connection() as conn:
+                    with conn.cursor() as cursor:
+                        cursor.executemany(query, batch)
+                        conn.commit()
+                        self.log.debug(f"insert_too_many -> Total Rows: {len(batch)}")
+            except Exception as e:
+                self.log.error(f"insert_too_many error. Trying single insert. Error: {e}")
+                # 当前批次降级为单条插入
+                for args in batch:
+                    self.insert_one(query, args)
+
+    def update_one(self, query, args):
+        """
+        执行单条更新语句
+        :param query: 更新语句
+        :param args: 更新参数
+        """
+        self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_one 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        return self._execute(query, args, commit=True)
+
+    def update_all(self, query, args_list):
+        """
+        执行批量更新语句,如果失败则逐条更新
+        :param query: 更新语句
+        :param args_list: 更新参数列表
+        """
+        conn = None
+        cursor = None
+        try:
+            conn = self.pool.connection()
+            cursor = conn.cursor()
+            cursor.executemany(query, args_list)
+            conn.commit()
+            self.log.debug(f"sql update_all, SQL: {query}, Rows: {len(args_list)}")
+            self.log.info('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>data update_all 更新中>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
+        except Exception as e:
+            if conn:
+                conn.rollback()
+            self.log.error(f"Error executing query: {e}")
+            # 批量更新失败,降级为逐条更新
+            rowcount = 0
+            for args in args_list:
+                try:
+                    self.update_one(query, args)
+                    rowcount += 1
+                except Exception as e2:
+                    self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.debug(f'Batch update failed. Updated {rowcount} rows individually.')
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    def update_one_or_dict(self, table=None, data=None, condition=None, query=None, args=None, commit=True):
+        """
+        单条更新(支持字典或原始SQL)
+        :param table: 表名(字典模式必需)
+        :param data: 字典数据 {列名: 值}(与 query 二选一)
+        :param condition: 更新条件,支持以下格式:
+            - 字典: {"id": 1} → "WHERE id = %s"
+            - 字符串: "id = 1" → "WHERE id = 1"(需自行确保安全)
+            - 元组: ("id = %s", [1]) → "WHERE id = %s"(参数化查询)
+        :param query: 直接SQL语句(与 data 二选一)
+        :param args: SQL参数(query 模式下必需)
+        :param commit: 是否自动提交
+        :return: 影响行数
+        :raises: ValueError 参数校验失败时抛出
+        """
+        # 参数校验
+        if data is not None:
+            if not isinstance(data, dict):
+                raise ValueError("Data must be a dictionary")
+            if table is None:
+                raise ValueError("Table name is required for dictionary update")
+            if condition is None:
+                raise ValueError("Condition is required for dictionary update")
+
+            # 构建 SET 子句
+            set_clause = ", ".join([f"{self._safe_identifier(k)} = %s" for k in data.keys()])
+            set_values = list(data.values())
+
+            # 解析条件
+            condition_clause, condition_args = self._parse_condition(condition)
+            query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            args = set_values + condition_args
+
+        elif query is None:
+            raise ValueError("Either data or query must be provided")
+
+        # 执行更新
+        cursor = self._execute(query, args, commit)
+        # self.log.debug(
+        #     f"Updated table={table}, rows={cursor.rowcount}, query={query[:100]}...",
+        #     extra={"table": table, "rows": cursor.rowcount}
+        # )
+        return cursor.rowcount
+
+    def _parse_condition(self, condition):
+        """
+        解析条件为 (clause, args) 格式
+        :param condition: 字典/字符串/元组
+        :return: (str, list) SQL 子句和参数列表
+        """
+        if isinstance(condition, dict):
+            clause = " AND ".join([f"{self._safe_identifier(k)} = %s" for k in condition.keys()])
+            args = list(condition.values())
+        elif isinstance(condition, str):
+            clause = condition  # 注意:需调用方确保安全
+            args = []
+        elif isinstance(condition, (tuple, list)) and len(condition) == 2:
+            clause, args = condition[0], condition[1]
+            if not isinstance(args, (list, tuple)):
+                args = [args]
+        else:
+            raise ValueError("Condition must be dict/str/(clause, args)")
+        return clause, args
+
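`_parse_condition` normalizes three condition shapes into `(clause, args)`. A self-contained mirror of that logic, for illustration:

```python
def parse_condition(condition):
    # mirrors the three shapes accepted by _parse_condition
    if isinstance(condition, dict):
        return " AND ".join(f"{k} = %s" for k in condition), list(condition.values())
    if isinstance(condition, str):
        return condition, []  # caller must ensure the string is safe
    if isinstance(condition, (tuple, list)) and len(condition) == 2:
        clause, args = condition
        return clause, list(args) if isinstance(args, (list, tuple)) else [args]
    raise ValueError("Condition must be dict/str/(clause, args)")
```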
+    def update_many(self, table=None, data_list=None, condition_list=None, query=None, args_list=None, batch_size=500,
+                    commit=True):
+        """
+        批量更新(支持字典列表或原始SQL)
+        :param table: 表名(字典插入时必需)
+        :param data_list: 字典列表 [{列名: 值}]
+        :param condition_list: 条件列表(必须为字典,与data_list等长)
+        :param query: 直接SQL语句(与data_list二选一)
+        :param args_list: SQL参数列表(query使用时必需)
+        :param batch_size: 分批大小
+        :param commit: 是否自动提交
+        :return: 影响行数
+        """
+        if data_list is not None:
+            if not data_list or not isinstance(data_list[0], dict):
+                raise ValueError("Data_list must be a non-empty list of dictionaries")
+            if condition_list is None or len(data_list) != len(condition_list):
+                raise ValueError("Condition_list must be provided and match the length of data_list")
+            if not all(isinstance(cond, dict) for cond in condition_list):
+                raise ValueError("All elements in condition_list must be dictionaries")
+
+            # 获取第一个数据项和条件项的键
+            first_data_keys = set(data_list[0].keys())
+            first_cond_keys = set(condition_list[0].keys())
+
+            # 构造基础SQL
+            set_clause = ', '.join([self._safe_identifier(k) + ' = %s' for k in data_list[0].keys()])
+            condition_clause = ' AND '.join([self._safe_identifier(k) + ' = %s' for k in condition_list[0].keys()])
+            base_query = f"UPDATE {self._safe_identifier(table)} SET {set_clause} WHERE {condition_clause}"
+            total = 0
+
+            # 分批次处理
+            for i in range(0, len(data_list), batch_size):
+                batch_data = data_list[i:i + batch_size]
+                batch_conds = condition_list[i:i + batch_size]
+                batch_args = []
+
+                # 检查当前批次的结构是否一致
+                can_batch = True
+                for data, cond in zip(batch_data, batch_conds):
+                    data_keys = set(data.keys())
+                    cond_keys = set(cond.keys())
+                    if data_keys != first_data_keys or cond_keys != first_cond_keys:
+                        can_batch = False
+                        break
+                    batch_args.append(tuple(data.values()) + tuple(cond.values()))
+
+                if not can_batch:
+                    # 结构不一致,转为单条更新
+                    for data, cond in zip(batch_data, batch_conds):
+                        self.update_one_or_dict(table=table, data=data, condition=cond, commit=commit)
+                        total += 1
+                    continue
+
+                # 执行批量更新
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(base_query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    # with 块退出时连接已自动回滚
+                    self.log.error(f"Batch update failed: {e}")
+                    # 降级为单条更新
+                    for args, data, cond in zip(batch_args, batch_data, batch_conds):
+                        try:
+                            self._execute(base_query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Data: {data}, Condition: {cond}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        elif query is not None:
+            # 处理原始SQL和参数列表
+            if args_list is None:
+                raise ValueError("args_list must be provided when using query")
+
+            total = 0
+            for i in range(0, len(args_list), batch_size):
+                batch_args = args_list[i:i + batch_size]
+                try:
+                    with self.pool.connection() as conn:
+                        with conn.cursor() as cursor:
+                            cursor.executemany(query, batch_args)
+                            if commit:
+                                conn.commit()
+                            total += cursor.rowcount
+                            self.log.debug(f"Batch update succeeded. Rows: {cursor.rowcount}")
+                except Exception as e:
+                    # with 块退出时连接已自动回滚
+                    self.log.error(f"Batch update failed: {e}")
+                    # 降级为单条更新
+                    for args in batch_args:
+                        try:
+                            self._execute(query, args, commit=commit)
+                            total += 1
+                        except Exception as e2:
+                            self.log.error(f"Single update failed: {e2}, Args: {args}")
+            self.log.info(f"Total updated rows: {total}")
+            return total
+        else:
+            raise ValueError("Either data_list or query must be provided")
+
+    def check_pool_health(self):
+        """
+        检查连接池中有效连接数
+
+        # 使用示例
+        # 配置 MySQL 连接池
+        sql_pool = MySQLConnectionPool(log=log)
+        if not sql_pool.check_pool_health():
+            log.error("数据库连接池异常")
+            raise RuntimeError("数据库连接池异常")
+        """
+        try:
+            with self.pool.connection() as conn:
+                conn.ping(reconnect=True)
+                return True
+        except Exception as e:
+            self.log.error(f"Connection pool health check failed: {e}")
+            return False
+
+    def close(self):
+        """
+        关闭连接池,释放所有连接
+        """
+        try:
+            if hasattr(self, 'pool') and self.pool:
+                self.pool.close()
+                self.log.info("数据库连接池已关闭")
+        except Exception as e:
+            self.log.error(f"关闭连接池失败: {e}")
+
+    @staticmethod
+    def _safe_identifier(name):
+        """SQL标识符安全校验"""
+        if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name):
+            raise ValueError(f"Invalid SQL identifier: {name}")
+        return name
+
+
+if __name__ == '__main__':
+    sql_pool = MySQLConnectionPool()
+    data_dic = {'card_type_id': 111, 'card_type_name': '补充包 继承的意志【OPC-13】', 'card_type_position': 964,
+                'card_id': 5284, 'card_name': '蒙奇·D·路飞', 'card_number': 'OP13-001', 'card_rarity': 'L',
+                'card_img': 'https://source.windoent.com/OnePiecePc/Picture/1757929283612OP13-001.png',
+                'card_life': '4', 'card_attribute': '打', 'card_power': '5000', 'card_attack': '-',
+                'card_color': '红/绿', 'subscript': 4, 'card_features': '超新星/草帽一伙',
+                'card_text_desc': '【咚!!×1】【对方的攻击时】我方处于活跃状态的咚!!不多于5张的场合,可以将我方任意张数的咚!!转为休息状态。每有1张转为休息状态的咚!!,本次战斗中,此领袖或我方最多1张拥有《草帽一伙》特征的角色力量+2000。',
+                'card_offer_type': '补充包 继承的意志【OPC-13】', 'crawler_language': '简中'}
+    sql_pool.insert_one_or_dict(table="one_piece_record", data=data_dic)