| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/7/29 15:06
- import json
- import time
- import requests
- # from curl_cffi import requests
- import inspect
- import schedule
- from loguru import logger
- from parsel import Selector
- from tenacity import retry, stop_after_attempt, wait_fixed
- from mysql_pool import MySQLConnectionPool
- # logger.remove()
- # logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="7 day")
- def after_log(retry_state):
- """
- retry 回调
- :param retry_state: RetryCallState 对象
- """
- # 检查 args 是否存在且不为空
- if retry_state.args and len(retry_state.args) > 0:
- log = retry_state.args[0] # 获取传入的 logger
- else:
- log = logger # 使用全局 logger
- if retry_state.outcome.failed:
- log.warning(
- f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
- else:
- log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
- @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
- def get_proxys(log):
- """
- 获取代理
- :return: 代理
- """
- tunnel = "x371.kdltps.com:15818"
- kdl_username = "t13753103189895"
- kdl_password = "o0yefv6z"
- try:
- proxies = {
- "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
- "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
- }
- return proxies
- except Exception as e:
- log.error(f"Error getting proxy: {e}")
- raise e
- @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
- def get_blind_box_list(log):
- log.debug(f"{inspect.currentframe().f_code.co_name}---------------------- 开始获取盲盒列表 ----------------------")
- headers = {
- "accept": "application/graphql-response+json, application/graphql+json, application/json, text/event-stream, multipart/mixed",
- "content-type": "application/json",
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
- }
- url = "https://api.prd.oripa.clove.jp/graphql"
- data = {
- "operationName": "orderedOripas",
- "query": "query orderedOripas($orderedOripasInput: OrderedOripasInput!) {\n orderedOripas(orderedOripasInput: $orderedOripasInput) {\n category\n isR18\n hasLastOne\n id\n isDaily\n isAppraised\n isUserLimited\n requiredRankName\n name\n nameLogo\n nameHidden\n price\n publishStatus\n quantity\n remaining\n roundNumber\n subImages\n thumbnail\n extraPrizeThreshold\n video {\n id\n title\n videoData\n deleted\n __typename\n }\n oripaSearchTargetPrizes {\n searchTargetPrize {\n titleJa\n __typename\n }\n __typename\n }\n openAt\n __typename\n }\n}",
- "variables": {
- "orderedOripasInput": {
- "isR18": False,
- "sort": "LOW_REMAINING_RATE" # 剩余数量
- }
- }
- }
- response = requests.post(url, headers=headers, json=data, timeout=60, proxies=get_proxys(log))
- # response = requests.post(url, headers=headers, json=data, timeout=60)
- # print(response.text)
- response.raise_for_status()
- return response.json()
- def parse_list(log, resp_data, sql_pool):
- log.debug(f"{inspect.currentframe().f_code.co_name} start..........")
- items = resp_data["data"]["orderedOripas"]
- log.info("获取到%s个数据" % len(items))
- if not items:
- log.debug("没有数据")
- return
- info_list = []
- for item in items:
- # print(item)
- pid = item.get("id")
- category = item.get("category")
- title = item.get("name")
- price = item.get("price")
- publishStatus = item.get("publishStatus")
- quantity = item.get("quantity") # 总数量
- remaining = item.get("remaining") # 剩余数量
- # thumbnail有三种 {}, [], ''
- tag_image = item.get("thumbnail")
- if isinstance(tag_image, dict):
- # 处理字典类型的thumbnail,从中提取ja字段的值
- ja_images = tag_image.get("ja")
- if isinstance(ja_images, list) and len(ja_images) > 0:
- image = ja_images[0] # 取第一个图片链接
- elif isinstance(ja_images, str):
- image = ja_images
- else:
- image = ''
- elif isinstance(tag_image, list):
- # 处理列表类型的thumbnail
- if len(tag_image) > 0:
- image = tag_image[0]
- else:
- image = ''
- elif isinstance(tag_image, str):
- # 处理字符串类型的thumbnail
- image = tag_image
- else:
- image = ''
- # subImages
- tag_subImages = item.get("subImages")
- if isinstance(tag_subImages, dict):
- # 处理字典类型的thumbnail,从中提取ja字段的值
- ja_subimages = tag_subImages.get("ja")
- if isinstance(ja_subimages, list) and len(ja_subimages) > 0:
- sub_image = ja_subimages[0] # 取第一个图片链接
- elif isinstance(ja_subimages, str):
- sub_image = ja_subimages
- else:
- sub_image = ''
- elif isinstance(tag_subImages, list):
- # 处理列表类型的thumbnail
- if len(tag_subImages) > 0:
- sub_image = tag_subImages[0]
- else:
- sub_image = ''
- elif isinstance(tag_subImages, str):
- # 处理字符串类型的thumbnail
- sub_image = tag_subImages
- else:
- sub_image = ''
- open_at = item.get("openAt")
- data_dict = {
- "pid": pid,
- "category": category,
- "title": title,
- "price": price,
- "publish_status": publishStatus,
- "quantity": quantity,
- "remaining": remaining,
- "image": image,
- "sub_image": sub_image,
- "open_at": open_at
- }
- # print(data_dict)
- info_list.append(data_dict)
- # 插入数据库
- if info_list:
- try:
- sql_pool.insert_many(table="clove_blind_box_list_record", data_list=info_list, ignore=True)
- # sql = "INSERT IGNORE INTO clove_blind_box_list_record (pid, category, title, price, publish_status, quantity, remaining, image, sub_image, open_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
- # sql_pool.insert_all(sql, info_list)
- except Exception as e:
- log.warning(f"{inspect.currentframe().f_code.co_name}, {e[:500]}")
- @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
- def get_blind_box_detail(log, pid, sql_pool):
- headers = {
- "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
- "referer": "https://oripa.clove.jp/oripa/All",
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
- }
- # url = "https://oripa.clove.jp/oripa/All/cmca1kd0i0001s601a4y07n2b"
- url = f"https://oripa.clove.jp/oripa/All/{pid}"
- # response = requests.get(url, headers=headers, cookies=cookies)
- response = requests.get(url, headers=headers, timeout=60, proxies=get_proxys(log))
- # print(response.text)
- selector = Selector(text=response.text)
- tag_shang = selector.xpath('//script[@id="__NEXT_DATA__"]/text()').get()
- if not tag_shang:
- log.debug(f"{inspect.currentframe().f_code.co_name} 没有 tag_shang 数据..............")
- # 更改 task 表状态为 2
- sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 2}, condition={"pid": pid})
- return
- json_data = json.loads(tag_shang)
- if json_data:
- displayedPrizes = json_data.get('props', {}).get('pageProps', {}).get('oripa', {}).get("displayedPrizes", [])
- log.debug(f"{inspect.currentframe().f_code.co_name} 获取到 {len(displayedPrizes)} 条数据..............")
- if not displayedPrizes:
- log.debug(f"{inspect.currentframe().f_code.co_name} 没有 displayedPrizes 数据..............")
- # 更改 task 表状态为 2
- sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 2}, condition={"pid": pid})
- return
- info_list = []
- for item in displayedPrizes:
- prize_id = item.get("id")
- prizeType = item.get("prizeType")
- quantity = item.get("quantity")
- mainDescription = item.get("mainDescription")
- mainDescriptionEn = item.get("mainDescriptionEn")
- subDescription = item.get("subDescription")
- kataban = item.get("kataban")
- imageUrl = item.get("imageUrl")
- prize_condition = item.get("condition")
- isReferencePriceTarget = item.get("isReferencePriceTarget")
- referencePrice = item.get("referencePrice")
- referencePriceUpdatedAt = item.get("referencePriceUpdatedAt")
- data_dict = {
- "pid": pid,
- "prize_id": prize_id, # 奖品id
- "prize_type": prizeType, # 奖品类型
- "quantity": quantity, # 数量
- "main_description": mainDescription, # 奖品描述
- "main_description_en": mainDescriptionEn,
- "sub_description": subDescription, # 卡片等级
- "kataban": kataban, # 卡标签
- "image_url": imageUrl, # 图片地址
- "prize_condition": prize_condition, # 奖品评分, PSA10
- "is_reference_price_target": isReferencePriceTarget, # (疑似)是否开封状态 (1: 是, 0: 否)
- "reference_price": referencePrice, # 参考价格
- "reference_price_updated_at": referencePriceUpdatedAt # 参考价格更新时间
- }
- # print(data_dict)
- info_list.append(data_dict)
- if info_list:
- try:
- sql_pool.insert_many(table="clove_blind_box_detail_record", data_list=info_list, ignore=True)
- # 更新 task 表状态为 1
- sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 1}, condition={"pid": pid})
- except Exception as e:
- log.warning(f"{inspect.currentframe().f_code.co_name}, {e[:500]}")
- else:
- log.debug(f"{inspect.currentframe().f_code.co_name} 请求出错, 没有数据..............")
- @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
- def blind_main(log):
- """
- 主函数
- :param log: logger对象
- """
- log.info(
- f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务.................................................')
- # 配置 MySQL 连接池
- sql_pool = MySQLConnectionPool(log=log)
- if not sql_pool.check_pool_health():
- log.error("数据库连接池异常")
- raise RuntimeError("数据库连接池异常")
- try:
- try:
- resp_data = get_blind_box_list(log)
- parse_list(log, resp_data, sql_pool)
- except Exception as e2:
- log.error(f"Request get_blind_box_list error: {e2}")
- # 先把新增的任务添加到任务表, 查询每日新增的 pid, 去重并插入 task 表
- sql_select_pids = sql_pool.select_all(
- # "SELECT DISTINCT pid FROM clove_blind_box_list_record WHERE DATE(gmt_create_time) = CURDATE() - INTERVAL 1 DAY")
- "SELECT DISTINCT pid FROM clove_blind_box_list_record WHERE DATE(gmt_create_time) = CURDATE()")
- sql_select_pids = [i[0] for i in sql_select_pids]
- sql_pool.insert_many(
- query="INSERT INTO clove_blind_box_task (pid) VALUES (%s) ON DUPLICATE KEY UPDATE pid=VALUES(pid)",
- args_list=sql_select_pids,
- ignore=True
- )
- time.sleep(5)
- # 获取详情
- sql_pids = sql_pool.select_all("SELECT pid FROM clove_blind_box_task WHERE state=0")
- sql_pids = [i[0] for i in sql_pids]
- for pid in sql_pids:
- try:
- log.debug(f"{inspect.currentframe().f_code.co_name} 获取 pid: {pid} 详情..............")
- get_blind_box_detail(log, pid, sql_pool)
- except Exception as e:
- log.error(f"get_blind_box_detail error: {e}")
- # 更改 task 表状态为 3
- sql_pool.update_one_or_dict(table="clove_blind_box_task", data={"state": 3}, condition={"pid": pid})
- except Exception as e:
- log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
- finally:
- log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
- def schedule_task():
- """
- 爬虫模块 定时任务 的启动文件
- """
- # 立即运行一次任务
- # blind_main(log=logger)
- # 设置定时任务
- schedule.every().day.at("00:01").do(blind_main, log=logger)
- while True:
- schedule.run_pending()
- time.sleep(1)
- if __name__ == '__main__':
- # blind_main(logger)
- schedule_task()
- # get_blind_box_detail(logger, 'cmca1kd0i0001s601a4y07n2b')
- # test()
|