| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- # -*- coding: utf-8 -*-
- # DPoP (RFC 9449) Token 生成器
- # 用于 Mercari API 的 DPoP Proof JWT 动态生成
- import json
- import time
- import uuid
- import base64
- from cryptography.hazmat.primitives.asymmetric import ec
- from cryptography.hazmat.primitives import hashes
- from cryptography.hazmat.backends import default_backend
- def _base64url_encode(data: bytes) -> str:
- return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
- def _int_to_base64url(n: int, length: int) -> str:
- return _base64url_encode(n.to_bytes(length, byteorder="big"))
- class DPoPGenerator:
- """
- 生成 Mercari DPoP Proof JWT (ES256 / P-256)。
- 用法:
- gen = DPoPGenerator(device_uuid="a00429c5-ad26-4be4-83ae-60b7239e14d5")
- list_dpop = gen.generate(
- htu="https://api.mercari.jp/v2/entities:search",
- htm="POST",
- )
- detail_dpop = gen.generate(
- htu="https://api.mercari.jp/items/get",
- htm="GET",
- )
- """
- def __init__(self, device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5"):
- self.device_uuid = device_uuid
- self._private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
- self._public_key = self._private_key.public_key()
- pub_numbers = self._public_key.public_numbers()
- self._jwk = {
- "crv": "P-256",
- "kty": "EC",
- "x": _int_to_base64url(pub_numbers.x, 32),
- "y": _int_to_base64url(pub_numbers.y, 32),
- }
- self._header_b64 = _base64url_encode(
- json.dumps(
- {"typ": "dpop+jwt", "alg": "ES256", "jwk": self._jwk},
- separators=(",", ":"),
- ).encode()
- )
- def generate(self, htu: str, htm: str) -> str:
- payload = {
- "iat": int(time.time()),
- "jti": str(uuid.uuid4()),
- "htu": htu,
- "htm": htm,
- "uuid": self.device_uuid,
- }
- payload_b64 = _base64url_encode(
- json.dumps(payload, separators=(",", ":")).encode()
- )
- signing_input = f"{self._header_b64}.{payload_b64}".encode("ascii")
- der_sig = self._private_key.sign(signing_input, ec.ECDSA(hashes.SHA256()))
- # DER -> raw r||s (各 32 字节)
- r, s = _decode_der_signature(der_sig)
- raw_sig = r.to_bytes(32, "big") + s.to_bytes(32, "big")
- return f"{self._header_b64}.{payload_b64}.{_base64url_encode(raw_sig)}"
- def _decode_der_signature(der_bytes: bytes) -> tuple[int, int]:
- from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
- return decode_dss_signature(der_bytes)
- # ── 便捷函数 ──────────────────────────────────────────────
- _default_gen = None
- def _get_generator(device_uuid: str) -> DPoPGenerator:
- global _default_gen
- if _default_gen is None or _default_gen.device_uuid != device_uuid:
- _default_gen = DPoPGenerator(device_uuid=device_uuid)
- return _default_gen
- def generate_list_dpop(
- device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5",
- ) -> str:
- return _get_generator(device_uuid).generate(
- htu="https://api.mercari.jp/v2/entities:search",
- htm="POST",
- )
- def generate_detail_dpop(
- device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5",
- ) -> str:
- return _get_generator(device_uuid).generate(
- htu="https://api.mercari.jp/items/get",
- htm="GET",
- )
- if __name__ == "__main__":
- gen = DPoPGenerator()
- list_token = gen.generate(
- htu="https://api.mercari.jp/v2/entities:search", htm="POST"
- )
- detail_token = gen.generate(
- htu="https://api.mercari.jp/items/get", htm="GET"
- )
- print("LIST_DPOP =", list_token)
- print()
- print("DETAIL_DPOP =", detail_token)
|