# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/7/29 15:06
import json
import time
import requests
# from curl_cffi import requests
import inspect
import schedule
from loguru import logger
from parsel import Selector
from tenacity import retry, stop_after_attempt, wait_fixed
from mysql_pool import MySQLConnectionPool

# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 day")


def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of each attempt.

    :param retry_state: RetryCallState object supplied by tenacity
    """
    # The decorated functions in this file take the logger as their first
    # positional argument; fall back to the global logger when no positional
    # arguments were passed (e.g. the function was called with log=...).
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]
    else:
        log = logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the ``proxies`` mapping passed to ``requests``.

    :param log: logger object
    :return: dict with "http" and "https" proxy URLs (both use the same tunnel)
    """
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration / environment variables.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
            "user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
        return {"http": proxy_url, "https": proxy_url}
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_blind_box_list(log):
    """
    Request the blind-box list from the GraphQL endpoint.

    :param log: logger object
    :return: decoded JSON response body
    :raises requests.HTTPError: on a non-2xx response (triggers a retry)
    """
    log.debug(f"{inspect.currentframe().f_code.co_name}---------------------- 开始获取盲盒列表 ----------------------")
    headers = {
        "accept": "application/graphql-response+json, application/graphql+json, application/json, text/event-stream, multipart/mixed",
        "content-type": "application/json",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
    }
    url = "https://api.prd.oripa.clove.jp/graphql"
    data = {
        "operationName": "orderedOripas",
        "query": "query orderedOripas($orderedOripasInput: OrderedOripasInput!) {\n orderedOripas(orderedOripasInput: $orderedOripasInput) {\n category\n isR18\n hasLastOne\n id\n isDaily\n isAppraised\n isUserLimited\n requiredRankName\n name\n nameLogo\n nameHidden\n price\n publishStatus\n quantity\n remaining\n roundNumber\n subImages\n thumbnail\n extraPrizeThreshold\n video {\n id\n title\n videoData\n deleted\n __typename\n }\n oripaSearchTargetPrizes {\n searchTargetPrize {\n titleJa\n __typename\n }\n __typename\n }\n openAt\n __typename\n }\n}",
        "variables": {
            "orderedOripasInput": {
                "isR18": False,
                "sort": "LOW_REMAINING_RATE"  # sort key sent to the API (original note: remaining quantity)
            }
        }
    }
    response = requests.post(url, headers=headers, json=data, timeout=60, proxies=get_proxys(log))
    # response = requests.post(url, headers=headers, json=data, timeout=60)
    response.raise_for_status()
    return response.json()


def _first_image(value):
    """
    Normalise an image field to a single value.

    The field has been seen as a dict (keyed by "ja"), a list, or a string.
    A dict is reduced to its "ja" entry first; a list yields its first element,
    a string is returned unchanged, anything else (or an empty list) yields ''.
    """
    if isinstance(value, dict):
        value = value.get("ja")
    if isinstance(value, list):
        return value[0] if value else ''
    if isinstance(value, str):
        return value
    return ''


def parse_list(log, resp_data, sql_pool):
    """
    Parse the list response and insert the rows into
    ``clove_blind_box_list_record``.

    :param log: logger object
    :param resp_data: decoded JSON returned by get_blind_box_list
    :param sql_pool: MySQL connection pool object
    """
    log.debug(f"{inspect.currentframe().f_code.co_name} start..........")
    items = resp_data["data"]["orderedOripas"]
    log.info("获取到%s个数据" % len(items))
    if not items:
        log.debug("没有数据")
        return

    info_list = []
    for item in items:
        data_dict = {
            "pid": item.get("id"),
            "category": item.get("category"),
            "title": item.get("name"),
            "price": item.get("price"),
            "publish_status": item.get("publishStatus"),
            "quantity": item.get("quantity"),  # total quantity
            "remaining": item.get("remaining"),  # remaining quantity
            # thumbnail / subImages come in three shapes: {}, [], ''
            "image": _first_image(item.get("thumbnail")),
            "sub_image": _first_image(item.get("subImages")),
            "open_at": item.get("openAt")
        }
        info_list.append(data_dict)

    # Insert into the database
    if info_list:
        try:
            sql_pool.insert_many(table="clove_blind_box_list_record", data_list=info_list, ignore=True)
        except Exception as e:
            # Exceptions are not subscriptable: convert to str before truncating.
            log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_blind_box_detail(log, pid, sql_pool):
    """
    Fetch the detail page of one blind box, store its displayed prizes in
    ``clove_blind_box_detail_record`` and update ``clove_blind_box_task``.

    Task state written here: 1 after the insert succeeds, 2 when the page has
    no ``__NEXT_DATA__`` script or no displayed prizes.

    :param log: logger object
    :param pid: blind box id (used in the URL and as the task key)
    :param sql_pool: MySQL connection pool object
    """
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "referer": "https://oripa.clove.jp/oripa/All",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
    }
    # url = "https://oripa.clove.jp/oripa/All/cmca1kd0i0001s601a4y07n2b"
    url = f"https://oripa.clove.jp/oripa/All/{pid}"
    # NOTE(review): the HTTP status is not checked here, so an error page
    # without __NEXT_DATA__ is recorded as state 2 — confirm this is intended.
    response = requests.get(url, headers=headers, timeout=60, proxies=get_proxys(log))

    selector = Selector(text=response.text)
    tag_shang = selector.xpath('//script[@id="__NEXT_DATA__"]/text()').get()
    if not tag_shang:
        log.debug(f"{inspect.currentframe().f_code.co_name} 没有 tag_shang 数据..............")
        # Set the task state to 2
        sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 2}, condition={"pid": pid})
        return

    json_data = json.loads(tag_shang)
    if not json_data:
        log.debug(f"{inspect.currentframe().f_code.co_name} 请求出错, 没有数据..............")
        return

    # ``or`` fallbacks: dict.get's default only covers a missing key, not a
    # JSON null, which would otherwise raise on the next .get()/len().
    page_props = (json_data.get('props') or {}).get('pageProps') or {}
    displayedPrizes = (page_props.get('oripa') or {}).get("displayedPrizes") or []
    log.debug(f"{inspect.currentframe().f_code.co_name} 获取到 {len(displayedPrizes)} 条数据..............")
    if not displayedPrizes:
        log.debug(f"{inspect.currentframe().f_code.co_name} 没有 displayedPrizes 数据..............")
        # Set the task state to 2
        sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 2}, condition={"pid": pid})
        return

    info_list = []
    for item in displayedPrizes:
        data_dict = {
            "pid": pid,
            "prize_id": item.get("id"),  # prize id
            "prize_type": item.get("prizeType"),  # prize type
            "quantity": item.get("quantity"),  # quantity
            "main_description": item.get("mainDescription"),  # prize description
            "main_description_en": item.get("mainDescriptionEn"),
            "sub_description": item.get("subDescription"),  # card grade
            "kataban": item.get("kataban"),  # card label
            "image_url": item.get("imageUrl"),  # image URL
            "prize_condition": item.get("condition"),  # prize rating, e.g. PSA10
            # original author's guess: opened/unsealed flag (1: yes, 0: no) — unverified
            "is_reference_price_target": item.get("isReferencePriceTarget"),
            "reference_price": item.get("referencePrice"),  # reference price
            "reference_price_updated_at": item.get("referencePriceUpdatedAt")  # reference price update time
        }
        info_list.append(data_dict)

    if info_list:
        try:
            sql_pool.insert_many(table="clove_blind_box_detail_record", data_list=info_list, ignore=True)
            # Set the task state to 1
            sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 1}, condition={"pid": pid})
        except Exception as e:
            # Exceptions are not subscriptable: convert to str before truncating.
            # (Indexing the exception raised TypeError here, which made tenacity
            # retry the whole request and hid the real database error.)
            log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def blind_main(log):
    """
    Main entry point: fetch the list, register today's pids as tasks, then
    fetch details for every task whose state is 0.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
        (this is the only error that reaches the retry decorator; everything
        after it is caught and logged)
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务.................................................')

    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        try:
            resp_data = get_blind_box_list(log)
            parse_list(log, resp_data, sql_pool)
        except Exception as e2:
            log.error(f"Request get_blind_box_list error: {e2}")

        # Add new tasks first: select the pids recorded today and insert them
        # into the task table (duplicates are handled by the SQL statement).
        sql_select_pids = sql_pool.select_all(
            # "SELECT DISTINCT pid FROM clove_blind_box_list_record WHERE DATE(gmt_create_time) = CURDATE() - INTERVAL 1 DAY")
            "SELECT DISTINCT pid FROM clove_blind_box_list_record WHERE DATE(gmt_create_time) = CURDATE()") or []
        sql_select_pids = [i[0] for i in sql_select_pids]
        if sql_select_pids:  # nothing to insert when no pid was recorded today
            sql_pool.insert_many(
                query="INSERT INTO clove_blind_box_task (pid) VALUES (%s) ON DUPLICATE KEY UPDATE pid=VALUES(pid)",
                args_list=sql_select_pids,
                ignore=True
            )

        time.sleep(5)

        # Fetch details
        sql_pids = sql_pool.select_all("SELECT pid FROM clove_blind_box_task WHERE state=0") or []
        sql_pids = [i[0] for i in sql_pids]
        for pid in sql_pids:
            try:
                log.debug(f"{inspect.currentframe().f_code.co_name} 获取 pid: {pid} 详情..............")
                get_blind_box_detail(log, pid, sql_pool)
            except Exception as e:
                log.error(f"get_blind_box_detail error: {e}")
                # Set the task state to 3
                sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 3}, condition={"pid": pid})
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


def schedule_task():
    """
    Start-up routine for the crawler's scheduled job.
    """
    # Run the task once immediately
    # blind_main(log=logger)

    # Register the daily job
    schedule.every().day.at("00:01").do(blind_main, log=logger)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    # blind_main(logger)
    schedule_task()
    # get_blind_box_detail(logger, 'cmca1kd0i0001s601a4y07n2b')
    # test()