jhs_rpc_spider.py

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2026/4/23 13:46
import json
import time
import requests
import inspect
import schedule
from loguru import logger
from typing import Any, Dict
from datetime import datetime
from mysql_pool import MySQLConnectionPool
from jhs_raw_codec_client import JhsRawCodecClient
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
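
# The JWT token below is kept only as a commented-out sample; at runtime the
# token is read from the jhs_token table (see jhs_rpc_main).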
# TOKEN = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJlbnYiOiJwcm9kdWN0aW9uIiwic3ViIjoyODI3NDU4LCJpc3MiOiJodHRwOi8vYXBpLmppaHVhbnNoZS5jb20vYXBpL21hcmtldC9hdXRoL2xvZ2luLW9yLXNpZ251cCIsImlhdCI6MTc3NTYzNzQzNSwiZXhwIjoxNzgwODIxNDM1LCJuYmYiOjE3NzU2Mzc0MzUsImp0aSI6InhiT3NsdUJRTzVWeHRabHQifQ.uHz7M-U0ewPgi5Qzr5P4eJbSdIUO_i_hmVE-0jsaG2Y"
# DEVICE_ID = "127.0.0.1:5557"  # adb connect 127.0.0.1:5557
DEVICE_ID = "25051FDD4S018P"  # ADB device serial passed to JhsRawCodecClient
CLI_TARGET_SEC = 2
TIMEOUT_SEC = 15
BASE_URL = "https://api.jihuanshe.com/api/market/auction-products"
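# Request headers imitating the mobile client (Pixel 5 user agent);
# x-device-id is presumably the app-generated device identifier.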
HEADERS = {
    "User-Agent": "Model/google,Pixel5 OS/30 Version/3.36.2",
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip",
    "x-device-id": "6efe93931488e176",
}
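
# One log file per day, rotated at midnight, keeping the last 7 days.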
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding="utf-8", rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")


def after_log(retry_state):
    """
    Retry callback.
    :param retry_state: the RetryCallState object
    """
    # Use the logger passed as the wrapped function's first positional argument,
    # if any; otherwise fall back to the module-level logger.
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]
    else:
        log = logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel proxy configuration.
    :return: proxies dict usable by requests
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e
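
# Note: get_proxys() is defined but not currently used by fetch_market_page;
# to route traffic through the KDL tunnel, pass proxies=get_proxys(log) to
# session.get() below.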


@retry(stop=stop_after_attempt(3), wait=wait_fixed(2), retry=retry_if_exception_type(json.JSONDecodeError), after=after_log)
def fetch_market_page(
        log,
        page: int,
        token: str,
        client: JhsRawCodecClient,
        session: requests.Session,
        headers: Dict[str, str],
        timeout_sec: int = TIMEOUT_SEC,
) -> Dict[str, Any]:
  75. """
  76. 请求并解密单页数据。
  77. 复用方式:
  78. - `client` 和 `session` 由外层创建一次并长期复用
  79. - 调用本函数时只传不同 page 即可
  80. """
  81. log.info(f"Fetching page {page}......................")
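    # Step 1: have the on-device codec encode the plaintext request URL into the raw_data payload.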
    url_for_enc = f"{BASE_URL}?sorting=completed&page={page}&token={token}"
    enc = client.call({"op": "enc", "url": url_for_enc})
    raw_data = enc["raw_data"]
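    # Step 2: issue the real HTTP request, sending the encoded raw_data as a query parameter.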
    resp = session.get(
        BASE_URL,
        headers=headers,
        params={"raw_data": raw_data, "token": token},
        timeout=timeout_sec,
    )
    resp.raise_for_status()
    body = resp.json()
    response_raw_data = body["raw_data"]
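    # Step 3: hand the encrypted response body back to the codec for decryption.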
    request_url_for_dec = f"{BASE_URL}?raw_data={raw_data}&token={token}"
    dec = client.call(
        {
            "op": "dec",
            "request_url": request_url_for_dec,
            "response_raw_data": response_raw_data,
        }
    )
    response_body = dec.get("response_body", "")
    parsed: Any = response_body
    if isinstance(response_body, str):
        try:
            parsed = json.loads(response_body)
        except Exception:
            log.error(f"Error parsing response body: {response_body}")
    return {
        "page": page,
        "enc": enc,
        "http_json": body,
        "dec": dec,
        "decoded": parsed,
    }


def parse_data(resp_data, sql_pool):
    """
    Parse one decoded page and persist the records.
    :param resp_data: decoded response data
    :param sql_pool: database connection pool
    """
    data_list = resp_data.get("raw_data", {}).get("data", [])
    info_list = []
    for data in data_list:
        seller_username = data.get("seller_username")
        product_id = data.get("auction_product_id")
        app_id = data.get("app_id")
        auction_product_name = data.get("auction_product_name")
        auction_product_images = data.get("auction_product_image")
        game_key = data.get("game_key")
        language_text = data.get("language_text")
        authenticator_name = data.get("authenticator_name")
        grading = data.get("grading")
        starting_price = data.get("starting_price")
        max_bid_price = data.get("max_bid_price")
        status = data.get("status")
        auction_product_start_timestamp = data.get('auction_product_start_timestamp')
        auction_product_start_time = datetime.fromtimestamp(auction_product_start_timestamp).strftime(
            '%Y-%m-%d %H:%M:%S') if auction_product_start_timestamp else None
        auction_product_end_timestamp = data.get('auction_product_end_timestamp')
        auction_product_end_time = datetime.fromtimestamp(auction_product_end_timestamp).strftime(
            '%Y-%m-%d %H:%M:%S') if auction_product_end_timestamp else None
        bid_count = data.get("bid_count")
        card_number = data.get("number")
        rarity = data.get("rarity")
        data_dict = {
            "seller_username": seller_username,
            "product_id": product_id,
            "app_id": app_id,
            "auction_product_name": auction_product_name,
            "auction_product_images": auction_product_images,
            "game_key": game_key,
            "language_text": language_text,
            "authenticator_name": authenticator_name,
            "grading": grading,
            "starting_price": starting_price,
            "max_bid_price": max_bid_price,
            "status": status,
            "auction_product_start_time": auction_product_start_time,
            "auction_product_end_time": auction_product_end_time,
            "bid_count": bid_count,
            "card_number": card_number,
            "rarity": rarity,
        }
        # print(data_dict)
        # print(type(data))
        info_list.append(data_dict)
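    # Bulk-insert the whole page in one statement; ignore=True is assumed to
    # translate to INSERT IGNORE so already-seen records are skipped.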
    if info_list:
        sql_pool.insert_many(table="jhs_product_record", data_list=info_list, ignore=True)


def get_market_list(log, token: str, sql_pool):
    page = 1
    max_page = 800
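    # Create the codec RPC client and HTTP session once and reuse them for every page.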
    with JhsRawCodecClient(device_id=DEVICE_ID, cli_target_sec=CLI_TARGET_SEC) as codec_client:
        with requests.Session() as http_sess:
            while page < max_page:
                try:
                    result = fetch_market_page(
                        log=log,
                        page=page,
                        token=token,
                        client=codec_client,
                        session=http_sess,
                        headers=HEADERS,
                    )
                    # print(page, result["decoded"])
                    try:
                        parse_data(result["decoded"], sql_pool)
                    except Exception as e:
                        log.error(f"Error parsing page {page}: {e}")
                except Exception as e:
                    log.error(f"Error fetching page {page}: {e}")
                page += 1
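

# Retry the whole crawl task up to 100 times with an hour between attempts;
# in practice this only fires if the function raises, e.g. when the MySQL
# pool cannot be created.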
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jhs_rpc_main(log):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(
        f'Starting spider task {inspect.currentframe().f_code.co_name}....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("Failed to connect to the MySQL database")
        raise Exception("Failed to connect to the MySQL database")
    try:
        jhs_token = sql_pool.select_one('SELECT token FROM jhs_token WHERE id = 1')
        get_market_list(log, jhs_token[0], sql_pool)
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next crawl cycle............')


def schedule_task():
    """
    Set up the daily scheduled task.
    """
    # jhs_rpc_main(log=logger)
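    # Kick off the crawl once a day at 05:00 (local time), then poll the scheduler every second.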
    schedule.every().day.at("05:00").do(jhs_rpc_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_task()
    # jhs_rpc_main(log=logger)