jhs_rpc_spider.py

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2026/4/23 13:46
import json
import time
import requests
import inspect
import schedule
from loguru import logger
from typing import Any, Dict
from datetime import datetime
from mysql_pool import MySQLConnectionPool
from jhs_raw_codec_client import JhsRawCodecClient
from tenacity import retry, stop_after_attempt, wait_fixed

# TOKEN = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJlbnYiOiJwcm9kdWN0aW9uIiwic3ViIjoyODI3NDU4LCJpc3MiOiJodHRwOi8vYXBpLmppaHVhbnNoZS5jb20vYXBpL21hcmtldC9hdXRoL2xvZ2luLW9yLXNpZ251cCIsImlhdCI6MTc3NTYzNzQzNSwiZXhwIjoxNzgwODIxNDM1LCJuYmYiOjE3NzU2Mzc0MzUsImp0aSI6InhiT3NsdUJRTzVWeHRabHQifQ.uHz7M-U0ewPgi5Qzr5P4eJbSdIUO_i_hmVE-0jsaG2Y"
DEVICE_ID = "25051FDD4S018P"
CLI_TARGET_SEC = 2
TIMEOUT_SEC = 15
BASE_URL = "https://api.jihuanshe.com/api/market/auction-products"
HEADERS = {
    "User-Agent": "Model/google,Pixel5 OS/30 Version/3.36.2",
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip",
    "x-device-id": "6efe93931488e176",
}

# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 day")


def after_log(retry_state):
    """
    Retry callback.
    :param retry_state: RetryCallState object
    """
    # Use the logger passed as the wrapped function's first argument, if any
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]  # logger supplied by the caller
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the proxy configuration.
    :return: proxies dict for requests
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e
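
# Usage sketch (the proxies are not wired into the scraper below; this only illustrates the
# assumption that the dict returned by get_proxys() is meant for the `proxies` argument of
# requests), e.g.:
#     resp = requests.get(BASE_URL, headers=HEADERS, proxies=get_proxys(logger), timeout=TIMEOUT_SEC)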


def fetch_market_page(
    log,
    page: int,
    token: str,
    client: JhsRawCodecClient,
    session: requests.Session,
    headers: Dict[str, str],
    timeout_sec: int = TIMEOUT_SEC,
) -> Dict[str, Any]:
    """
    Fetch and decrypt a single page of data.

    Reuse:
    - `client` and `session` are created once by the caller and reused across calls
    - each call only needs a different `page`
    """
    log.info(f"Fetching page {page}......................")
    url_for_enc = f"{BASE_URL}?sorting=completed&page={page}&token={token}"
    enc = client.call({"op": "enc", "url": url_for_enc})
    raw_data = enc["raw_data"]
    resp = session.get(
        BASE_URL,
        headers=headers,
        params={"raw_data": raw_data, "token": token},
        timeout=timeout_sec,
    )
    resp.raise_for_status()
    body = resp.json()
    response_raw_data = body["raw_data"]
    request_url_for_dec = f"{BASE_URL}?raw_data={raw_data}&token={token}"
    dec = client.call(
        {
            "op": "dec",
            "request_url": request_url_for_dec,
            "response_raw_data": response_raw_data,
        }
    )
    response_body = dec.get("response_body", "")
    parsed: Any = response_body
    if isinstance(response_body, str):
        try:
            parsed = json.loads(response_body)
        except Exception:
            log.error(f"Error parsing response body: {response_body}")
    return {
        "page": page,
        "enc": enc,
        "http_json": body,
        "dec": dec,
        "decoded": parsed,
    }
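
# Single-page usage sketch (assumes `token` already holds a valid JWT string), mirroring the
# reuse pattern in get_market_list below:
#     with JhsRawCodecClient(device_id=DEVICE_ID, cli_target_sec=CLI_TARGET_SEC) as codec_client:
#         with requests.Session() as http_sess:
#             first = fetch_market_page(logger, 1, token, codec_client, http_sess, HEADERS)
#             print(first["decoded"])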


def parse_data(resp_data, sql_pool):
    """
    Parse a decoded page of data.
    :param resp_data: decoded response data
    :param sql_pool: database connection pool
    """
    data_list = resp_data.get("raw_data", {}).get("data", [])
    info_list = []
    for data in data_list:
        seller_username = data.get("seller_username")
        product_id = data.get("auction_product_id")
        app_id = data.get("app_id")
        auction_product_name = data.get("auction_product_name")
        auction_product_images = data.get("auction_product_image")
        game_key = data.get("game_key")
        language_text = data.get("language_text")
        authenticator_name = data.get("authenticator_name")
        grading = data.get("grading")
        starting_price = data.get("starting_price")
        max_bid_price = data.get("max_bid_price")
        status = data.get("status")
        auction_product_start_timestamp = data.get('auction_product_start_timestamp')
        auction_product_start_time = datetime.fromtimestamp(auction_product_start_timestamp).strftime(
            '%Y-%m-%d %H:%M:%S') if auction_product_start_timestamp else None
        auction_product_end_timestamp = data.get('auction_product_end_timestamp')
        auction_product_end_time = datetime.fromtimestamp(auction_product_end_timestamp).strftime(
            '%Y-%m-%d %H:%M:%S') if auction_product_end_timestamp else None
        bid_count = data.get("bid_count")
        card_number = data.get("number")
        rarity = data.get("rarity")
        data_dict = {
            "seller_username": seller_username,
            "product_id": product_id,
            "app_id": app_id,
            "auction_product_name": auction_product_name,
            "auction_product_images": auction_product_images,
            "game_key": game_key,
            "language_text": language_text,
            "authenticator_name": authenticator_name,
            "grading": grading,
            "starting_price": starting_price,
            "max_bid_price": max_bid_price,
            "status": status,
            "auction_product_start_time": auction_product_start_time,
            "auction_product_end_time": auction_product_end_time,
            "bid_count": bid_count,
            "card_number": card_number,
            "rarity": rarity,
        }
        print(data_dict)
        info_list.append(data_dict)
    # if info_list:
    #     sql_pool.insert_many(table="jhs_product_record", data_list=info_list, ignore=True)
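
# Note: persistence is currently disabled; parsed rows are only printed. Re-enabling the
# commented insert_many call above would write each dict in info_list as one row of
# jhs_product_record (assumption: MySQLConnectionPool.insert_many maps dict keys to column
# names, and ignore=True skips duplicate rows instead of failing).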


def get_market_list(log, token: str, sql_pool):
    page = 1
    max_page = 1000
    with JhsRawCodecClient(device_id=DEVICE_ID, cli_target_sec=CLI_TARGET_SEC) as codec_client:
        with requests.Session() as http_sess:
            # Walks pages 1..max_page-1; there is no early break when a page comes back empty.
            while page < max_page:
                result = fetch_market_page(
                    log=log,
                    page=page,
                    token=token,
                    client=codec_client,
                    session=http_sess,
                    headers=HEADERS,
                )
                # print(page, result["decoded"])
                try:
                    parse_data(result["decoded"], sql_pool)
                except Exception as e:
                    log.error(f"Error parsing page {page}: {e}")
                page += 1


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def jhs_rpc_main(log):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(
        f'Starting spider task {inspect.currentframe().f_code.co_name}....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("Failed to connect to the MySQL database")
        raise Exception("Failed to connect to the MySQL database")
    try:
        jhs_token = sql_pool.select_one('SELECT token FROM jhs_token WHERE id = 1')
        get_market_list(log, jhs_token[0], sql_pool)
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next collection round............')


def schedule_task():
    """
    Set up the scheduled task.
    """
    # Run once at startup, then every day at 01:31
    jhs_rpc_main(log=logger)
    schedule.every().day.at("01:31").do(jhs_rpc_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)
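
# Scheduling note: jhs_rpc_main is retried by tenacity (up to 100 attempts, one hour apart)
# only when it raises, e.g. if the MySQL connection pool cannot be created. Exceptions inside
# get_market_list are caught and logged within jhs_rpc_main, so a failed scrape does not
# trigger a retry; the next run happens at the scheduled time.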


if __name__ == "__main__":
    schedule_task()