clove_blind_box_spider.py

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/7/29 15:06
import json
import time
import inspect

import requests
# from curl_cffi import requests
import schedule
from loguru import logger
from parsel import Selector
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool

# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 day")
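
# NOTE: mysql_pool is a local helper module that is not shown here. Judging from the
# call sites below, this script assumes MySQLConnectionPool provides roughly:
#   check_pool_health() -> bool
#   select_all(sql) -> list of row tuples
#   insert_many(table=..., data_list=..., ignore=True)      # dict-based batch insert
#   insert_many(query=..., args_list=..., ignore=True)      # raw-SQL batch insert
#   update_one_or_dict(table=..., data={...}, condition={...})
# The exact signatures are assumptions inferred from how the pool is used in this file.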


def after_log(retry_state):
    """
    Retry callback.
    :param retry_state: RetryCallState object
    """
    # Check whether positional args exist and are non-empty
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]  # use the logger passed to the wrapped function
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel-proxy configuration for requests.
    :return: proxies dict
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_blind_box_list(log):
    log.debug(f"{inspect.currentframe().f_code.co_name}---------------------- Start fetching the blind box list ----------------------")
    headers = {
        "accept": "application/graphql-response+json, application/graphql+json, application/json, text/event-stream, multipart/mixed",
        "content-type": "application/json",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
    }
    url = "https://api.prd.oripa.clove.jp/graphql"
    data = {
        "operationName": "orderedOripas",
        "query": "query orderedOripas($orderedOripasInput: OrderedOripasInput!) {\n orderedOripas(orderedOripasInput: $orderedOripasInput) {\n category\n isR18\n hasLastOne\n id\n isDaily\n isAppraised\n isUserLimited\n requiredRankName\n name\n nameLogo\n nameHidden\n price\n publishStatus\n quantity\n remaining\n roundNumber\n subImages\n thumbnail\n extraPrizeThreshold\n video {\n id\n title\n videoData\n deleted\n __typename\n }\n oripaSearchTargetPrizes {\n searchTargetPrize {\n titleJa\n __typename\n }\n __typename\n }\n openAt\n __typename\n }\n}",
        "variables": {
            "orderedOripasInput": {
                "isR18": False,
                "sort": "LOW_REMAINING_RATE"  # sort by remaining quantity
            }
        }
    }
    response = requests.post(url, headers=headers, json=data, timeout=60, proxies=get_proxys(log))
    # response = requests.post(url, headers=headers, json=data, timeout=60)
    # print(response.text)
    response.raise_for_status()
    return response.json()
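
# For reference, parse_list() below expects the GraphQL response to look roughly like
# this (field list inferred from the query above and the keys read in parse_list):
#   {
#       "data": {
#           "orderedOripas": [
#               {"id": "...", "category": "...", "name": "...", "price": ...,
#                "publishStatus": "...", "quantity": ..., "remaining": ...,
#                "thumbnail": {...} | [...] | "...", "subImages": {...} | [...] | "...",
#                "openAt": "..."},
#               ...
#           ]
#       }
#   }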


def parse_list(log, resp_data, sql_pool):
    log.debug(f"{inspect.currentframe().f_code.co_name} start..........")
    items = resp_data["data"]["orderedOripas"]
    log.info("Fetched %s items" % len(items))
    if not items:
        log.debug("No data")
        return
    info_list = []
    for item in items:
        # print(item)
        pid = item.get("id")
        category = item.get("category")
        title = item.get("name")
        price = item.get("price")
        publishStatus = item.get("publishStatus")
        quantity = item.get("quantity")  # total quantity
        remaining = item.get("remaining")  # remaining quantity
        # thumbnail comes in three shapes: {}, [], ''
        tag_image = item.get("thumbnail")
        if isinstance(tag_image, dict):
            # dict-shaped thumbnail: extract the value of the "ja" field
            ja_images = tag_image.get("ja")
            if isinstance(ja_images, list) and len(ja_images) > 0:
                image = ja_images[0]  # take the first image URL
            elif isinstance(ja_images, str):
                image = ja_images
            else:
                image = ''
        elif isinstance(tag_image, list):
            # list-shaped thumbnail
            if len(tag_image) > 0:
                image = tag_image[0]
            else:
                image = ''
        elif isinstance(tag_image, str):
            # string-shaped thumbnail
            image = tag_image
        else:
            image = ''
        # subImages
        tag_subImages = item.get("subImages")
        if isinstance(tag_subImages, dict):
            # dict-shaped subImages: extract the value of the "ja" field
            ja_subimages = tag_subImages.get("ja")
            if isinstance(ja_subimages, list) and len(ja_subimages) > 0:
                sub_image = ja_subimages[0]  # take the first image URL
            elif isinstance(ja_subimages, str):
                sub_image = ja_subimages
            else:
                sub_image = ''
        elif isinstance(tag_subImages, list):
            # list-shaped subImages
            if len(tag_subImages) > 0:
                sub_image = tag_subImages[0]
            else:
                sub_image = ''
        elif isinstance(tag_subImages, str):
            # string-shaped subImages
            sub_image = tag_subImages
        else:
            sub_image = ''
        open_at = item.get("openAt")
        data_dict = {
            "pid": pid,
            "category": category,
            "title": title,
            "price": price,
            "publish_status": publishStatus,
            "quantity": quantity,
            "remaining": remaining,
            "image": image,
            "sub_image": sub_image,
            "open_at": open_at
        }
        # print(data_dict)
        info_list.append(data_dict)
    # Insert into the database
    if info_list:
        try:
            sql_pool.insert_many(table="clove_blind_box_list_record", data_list=info_list, ignore=True)
            # sql = "INSERT IGNORE INTO clove_blind_box_list_record (pid, category, title, price, publish_status, quantity, remaining, image, sub_image, open_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            # sql_pool.insert_all(sql, info_list)
        except Exception as e:
            log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")
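
# The thumbnail/subImages branches above implement the same "first usable image"
# rule for the three observed shapes ({}, [], ''). An equivalent helper, left here
# only as an illustrative sketch (not wired into parse_list), could look like:
#
# def first_image(value):
#     if isinstance(value, dict):
#         value = value.get("ja")
#     if isinstance(value, list):
#         return value[0] if value else ''
#     return value if isinstance(value, str) else ''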


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_blind_box_detail(log, pid, sql_pool):
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "referer": "https://oripa.clove.jp/oripa/All",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
    }
    # url = "https://oripa.clove.jp/oripa/All/cmca1kd0i0001s601a4y07n2b"
    url = f"https://oripa.clove.jp/oripa/All/{pid}"
    # response = requests.get(url, headers=headers, cookies=cookies)
    response = requests.get(url, headers=headers, timeout=60, proxies=get_proxys(log))
    # print(response.text)
    selector = Selector(text=response.text)
    tag_shang = selector.xpath('//script[@id="__NEXT_DATA__"]/text()').get()
    if not tag_shang:
        log.debug(f"{inspect.currentframe().f_code.co_name} no __NEXT_DATA__ payload..............")
        # Set task state to 2
        sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 2}, condition={"pid": pid})
        return
    json_data = json.loads(tag_shang)
    if json_data:
        displayedPrizes = json_data.get('props', {}).get('pageProps', {}).get('oripa', {}).get("displayedPrizes", [])
        log.debug(f"{inspect.currentframe().f_code.co_name} got {len(displayedPrizes)} prizes..............")
        if not displayedPrizes:
            log.debug(f"{inspect.currentframe().f_code.co_name} no displayedPrizes data..............")
            # Set task state to 2
            sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 2}, condition={"pid": pid})
            return
        info_list = []
        for item in displayedPrizes:
            prize_id = item.get("id")
            prizeType = item.get("prizeType")
            quantity = item.get("quantity")
            mainDescription = item.get("mainDescription")
            mainDescriptionEn = item.get("mainDescriptionEn")
            subDescription = item.get("subDescription")
            kataban = item.get("kataban")
            imageUrl = item.get("imageUrl")
            prize_condition = item.get("condition")
            isReferencePriceTarget = item.get("isReferencePriceTarget")
            referencePrice = item.get("referencePrice")
            referencePriceUpdatedAt = item.get("referencePriceUpdatedAt")
            data_dict = {
                "pid": pid,
                "prize_id": prize_id,  # prize id
                "prize_type": prizeType,  # prize type
                "quantity": quantity,  # quantity
                "main_description": mainDescription,  # prize description
                "main_description_en": mainDescriptionEn,
                "sub_description": subDescription,  # card grade
                "kataban": kataban,  # card label
                "image_url": imageUrl,  # image URL
                "prize_condition": prize_condition,  # prize grading, e.g. PSA10
                "is_reference_price_target": isReferencePriceTarget,  # (presumably) whether opened (1: yes, 0: no)
                "reference_price": referencePrice,  # reference price
                "reference_price_updated_at": referencePriceUpdatedAt  # reference price update time
            }
            # print(data_dict)
            info_list.append(data_dict)
        if info_list:
            try:
                sql_pool.insert_many(table="clove_blind_box_detail_record", data_list=info_list, ignore=True)
                # Set task state to 1
                sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 1}, condition={"pid": pid})
            except Exception as e:
                log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")
    else:
        log.debug(f"{inspect.currentframe().f_code.co_name} request failed, no data..............")
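
# Task states in clove_blind_box_task, as written by this script:
#   0 = pending (picked up by blind_main), 1 = detail saved,
#   2 = page or prize data missing, 3 = detail fetch failed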


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def blind_main(log):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(
        f'Starting the {inspect.currentframe().f_code.co_name} spider task.................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("MySQL connection pool is unhealthy")
        raise RuntimeError("MySQL connection pool is unhealthy")
    try:
        try:
            resp_data = get_blind_box_list(log)
            parse_list(log, resp_data, sql_pool)
        except Exception as e2:
            log.error(f"Request get_blind_box_list error: {e2}")
        # First push new work into the task table: select today's new pids, dedupe, insert into the task table
        sql_select_pids = sql_pool.select_all(
            # "SELECT DISTINCT pid FROM clove_blind_box_list_record WHERE DATE(gmt_create_time) = CURDATE() - INTERVAL 1 DAY")
            "SELECT DISTINCT pid FROM clove_blind_box_list_record WHERE DATE(gmt_create_time) = CURDATE()")
        sql_select_pids = [i[0] for i in sql_select_pids]
        sql_pool.insert_many(
            query="INSERT INTO clove_blind_box_task (pid) VALUES (%s) ON DUPLICATE KEY UPDATE pid=VALUES(pid)",
            args_list=sql_select_pids,
            ignore=True
        )
        time.sleep(5)
        # Fetch details for pending tasks
        sql_pids = sql_pool.select_all("SELECT pid FROM clove_blind_box_task WHERE state=0")
        sql_pids = [i[0] for i in sql_pids]
        for pid in sql_pids:
            try:
                log.debug(f"{inspect.currentframe().f_code.co_name} fetching detail for pid: {pid}..............")
                get_blind_box_detail(log, pid, sql_pool)
            except Exception as e:
                log.error(f"get_blind_box_detail error: {e}")
                # Set task state to 3
                sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 3}, condition={"pid": pid})
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished, waiting for the next collection round............')


def schedule_task():
    """
    Entry point for running the spider module as a scheduled job.
    """
    # Run the task once immediately
    # blind_main(log=logger)
    # Set up the daily scheduled job
    schedule.every().day.at("00:01").do(blind_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    # blind_main(logger)
    schedule_task()
    # get_blind_box_detail(logger, 'cmca1kd0i0001s601a4y07n2b')
    # test()