dpop_generator.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. # -*- coding: utf-8 -*-
  2. # DPoP (RFC 9449) Token 生成器
  3. # 用于 Mercari API 的 DPoP Proof JWT 动态生成
  4. import json
  5. import time
  6. import uuid
  7. import base64
  8. from cryptography.hazmat.primitives.asymmetric import ec
  9. from cryptography.hazmat.primitives import hashes
  10. from cryptography.hazmat.backends import default_backend
  11. def _base64url_encode(data: bytes) -> str:
  12. return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
  13. def _int_to_base64url(n: int, length: int) -> str:
  14. return _base64url_encode(n.to_bytes(length, byteorder="big"))
  15. class DPoPGenerator:
  16. """
  17. 生成 Mercari DPoP Proof JWT (ES256 / P-256)。
  18. 用法:
  19. gen = DPoPGenerator(device_uuid="a00429c5-ad26-4be4-83ae-60b7239e14d5")
  20. list_dpop = gen.generate(
  21. htu="https://api.mercari.jp/v2/entities:search",
  22. htm="POST",
  23. )
  24. detail_dpop = gen.generate(
  25. htu="https://api.mercari.jp/items/get",
  26. htm="GET",
  27. )
  28. """
  29. def __init__(self, device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5"):
  30. self.device_uuid = device_uuid
  31. self._private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
  32. self._public_key = self._private_key.public_key()
  33. pub_numbers = self._public_key.public_numbers()
  34. self._jwk = {
  35. "crv": "P-256",
  36. "kty": "EC",
  37. "x": _int_to_base64url(pub_numbers.x, 32),
  38. "y": _int_to_base64url(pub_numbers.y, 32),
  39. }
  40. self._header_b64 = _base64url_encode(
  41. json.dumps(
  42. {"typ": "dpop+jwt", "alg": "ES256", "jwk": self._jwk},
  43. separators=(",", ":"),
  44. ).encode()
  45. )
  46. def generate(self, htu: str, htm: str) -> str:
  47. payload = {
  48. "iat": int(time.time()),
  49. "jti": str(uuid.uuid4()),
  50. "htu": htu,
  51. "htm": htm,
  52. "uuid": self.device_uuid,
  53. }
  54. payload_b64 = _base64url_encode(
  55. json.dumps(payload, separators=(",", ":")).encode()
  56. )
  57. signing_input = f"{self._header_b64}.{payload_b64}".encode("ascii")
  58. der_sig = self._private_key.sign(signing_input, ec.ECDSA(hashes.SHA256()))
  59. # DER -> raw r||s (各 32 字节)
  60. r, s = _decode_der_signature(der_sig)
  61. raw_sig = r.to_bytes(32, "big") + s.to_bytes(32, "big")
  62. return f"{self._header_b64}.{payload_b64}.{_base64url_encode(raw_sig)}"
  63. def _decode_der_signature(der_bytes: bytes) -> tuple[int, int]:
  64. from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
  65. return decode_dss_signature(der_bytes)
  66. # ── 便捷函数 ──────────────────────────────────────────────
  67. _default_gen = None
  68. def _get_generator(device_uuid: str) -> DPoPGenerator:
  69. global _default_gen
  70. if _default_gen is None or _default_gen.device_uuid != device_uuid:
  71. _default_gen = DPoPGenerator(device_uuid=device_uuid)
  72. return _default_gen
  73. def generate_list_dpop(
  74. device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5",
  75. ) -> str:
  76. return _get_generator(device_uuid).generate(
  77. htu="https://api.mercari.jp/v2/entities:search",
  78. htm="POST",
  79. )
  80. def generate_detail_dpop(
  81. device_uuid: str = "a00429c5-ad26-4be4-83ae-60b7239e14d5",
  82. ) -> str:
  83. return _get_generator(device_uuid).generate(
  84. htu="https://api.mercari.jp/items/get",
  85. htm="GET",
  86. )
  87. if __name__ == "__main__":
  88. gen = DPoPGenerator()
  89. list_token = gen.generate(
  90. htu="https://api.mercari.jp/v2/entities:search", htm="POST"
  91. )
  92. detail_token = gen.generate(
  93. htu="https://api.mercari.jp/items/get", htm="GET"
  94. )
  95. print("LIST_DPOP =", list_token)
  96. print()
  97. print("DETAIL_DPOP =", detail_token)