# -*- coding: utf-8 -*- # DPoP (RFC 9449) Token 生成器 # 用于 Mercari API 的 DPoP Proof JWT 动态生成 import json import time import uuid import base64 from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend def _base64url_encode(data: bytes) -> str: return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii") def _int_to_base64url(n: int, length: int) -> str: return _base64url_encode(n.to_bytes(length, byteorder="big")) class DPoPGenerator: """ 生成 Mercari DPoP Proof JWT (ES256 / P-256)。 用法: gen = DPoPGenerator(device_uuid="a00429c5-ad26-4be4-83ae-60b7239e14d5") list_dpop = gen.generate( htu="https://api.mercari.jp/v2/entities:search", htm="POST", ) detail_dpop = gen.generate( htu="https://api.mercari.jp/items/get", htm="GET", ) """ def __init__(self, device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5"): self.device_uuid = device_uuid self._private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) self._public_key = self._private_key.public_key() pub_numbers = self._public_key.public_numbers() self._jwk = { "crv": "P-256", "kty": "EC", "x": _int_to_base64url(pub_numbers.x, 32), "y": _int_to_base64url(pub_numbers.y, 32), } self._header_b64 = _base64url_encode( json.dumps( {"typ": "dpop+jwt", "alg": "ES256", "jwk": self._jwk}, separators=(",", ":"), ).encode() ) def generate(self, htu: str, htm: str) -> str: payload = { "iat": int(time.time()), "jti": str(uuid.uuid4()), "htu": htu, "htm": htm, "uuid": self.device_uuid, } payload_b64 = _base64url_encode( json.dumps(payload, separators=(",", ":")).encode() ) signing_input = f"{self._header_b64}.{payload_b64}".encode("ascii") der_sig = self._private_key.sign(signing_input, ec.ECDSA(hashes.SHA256())) # DER -> raw r||s (各 32 字节) r, s = _decode_der_signature(der_sig) raw_sig = r.to_bytes(32, "big") + s.to_bytes(32, "big") return f"{self._header_b64}.{payload_b64}.{_base64url_encode(raw_sig)}" def _decode_der_signature(der_bytes: bytes) -> tuple[int, int]: from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature return decode_dss_signature(der_bytes) # ── 便捷函数 ────────────────────────────────────────────── _default_gen = None def _get_generator(device_uuid: str) -> DPoPGenerator: global _default_gen if _default_gen is None or _default_gen.device_uuid != device_uuid: _default_gen = DPoPGenerator(device_uuid=device_uuid) return _default_gen def generate_list_dpop( device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5", ) -> str: return _get_generator(device_uuid).generate( htu="https://api.mercari.jp/v2/entities:search", htm="POST", ) def generate_detail_dpop( device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5", ) -> str: return _get_generator(device_uuid).generate( htu="https://api.mercari.jp/items/get", htm="GET", ) if __name__ == "__main__": gen = DPoPGenerator() list_token = gen.generate( htu="https://api.mercari.jp/v2/entities:search", htm="POST" ) detail_token = gen.generate( htu="https://api.mercari.jp/items/get", htm="GET" ) print("LIST_DPOP =", list_token) print() print("DETAIL_DPOP =", detail_token)