| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/7/31 11:33
- import re
- import time
- import requests
- import inspect
- import schedule
- from loguru import logger
- from parsel import Selector
- from tenacity import retry, stop_after_attempt, wait_fixed
- from mysql_pool import MySQLConnectionPool
- from DrissionPage import ChromiumPage, ChromiumOptions
# Route loguru output to daily-rotated log files only (drop the default stderr sink).
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")  # NOTE(review): loguru docs spell durations "7 days" — confirm "7 day" parses
def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of every attempt.

    :param retry_state: tenacity RetryCallState for the call that just finished
    """
    # The decorated functions in this module take a logger as their first
    # positional argument; fall back to the module-level logger otherwise.
    # (A non-empty tuple is truthy — the extra len() check was redundant.)
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_response(log, page_url) -> str:
    """
    Fetch a page's HTML with a local-profile Chromium browser.

    Retried up to 5 times (1s apart) by tenacity on any failure.

    :param log: logger object
    :param page_url: URL to load
    :return: page HTML source
    :raises RuntimeError: when the page yields no HTML or loading fails
    """
    options = ChromiumOptions().set_paths(local_port=9131,
                                          user_data_path=r'D:\Drissionpage_temp\local_port_9131')
    options.set_argument("-accept-lang=en-US")
    page = ChromiumPage(options)
    try:
        page.get(page_url)
        html = page.html
        if html:
            return html
        log.error('页面加载失败')
        # BUG FIX: `raise '<str>'` is a TypeError in Python 3 (only
        # BaseException subclasses may be raised) — raise a real exception
        # so tenacity retries on the intended error, not on the TypeError.
        raise RuntimeError('页面加载失败, 重新加载........')
    except Exception as e:
        log.error(f'get_response error: {e}')
        raise RuntimeError('get_response error') from e
    finally:
        # Always release the browser, even on failure, to avoid leaking
        # Chromium processes across retries.
        page.quit()
def get_lucky_bag_list(log, category_name, sql_pool):
    """
    Crawl every listing page of a category until a short page signals the end.

    :param log: logger object
    :param category_name: store category slug, e.g. "pokemon"
    :param sql_pool: MySQL connection pool
    """
    fn_name = inspect.currentframe().f_code.co_name
    page = 1
    # Hard cap of 500 pages guards against an endless loop should the
    # "last page" heuristic below ever fail.
    while page <= 500:
        try:
            log.debug(
                f'--------------- {fn_name}, page {page}, category_name {category_name} start ---------------')
            len_items = get_lucky_bag_single_page(log, category_name, page, sql_pool)
        except Exception as e:
            # BUG FIX: the message previously named the wrong function
            # (get_shop_product_sold_single_page — a copy-paste leftover).
            log.error(f"{fn_name} Request get_lucky_bag_single_page for page:{page}, {e}")
            len_items = 0
        # A full listing page holds 72 items; fewer means this was the last page.
        if len_items < 72:
            log.debug(f'--------------- page {page} has {len_items} items, break ---------------')
            break
        page += 1
def get_lucky_bag_single_page(log, category_name, page, sql_pool):
    """
    Fetch one category listing page and persist the parsed products.

    :param log: logger object
    :param category_name: store category slug
    :param page: 1-based page number
    :param sql_pool: MySQL connection pool
    :return: number of product tiles found on the page
    """
    log.debug(
        f"{inspect.currentframe().f_code.co_name} Request category_name:{category_name}, page:{page}................")
    url = f"https://store.clove.jp/jp/categories/{category_name}?page={page}"
    # Rendered in a real browser (the listing is JS-driven); see get_response.
    response_text = get_response(log, url)
    selector = Selector(text=response_text)
    return parse_products_from_html(log, selector, category_name, sql_pool)
def parse_products_from_html(log, selector, category, sql_pool):
    """
    Extract product tiles from a listing page with XPath and bulk-insert them.

    :param log: logger object
    :param selector: parsel Selector over the listing page HTML
    :param category: category slug stored alongside each row
    :param sql_pool: MySQL connection pool
    :return: number of product tiles found (caller treats < 72 as "last page")
    """
    # Each product tile is a ProductPreview sentry component.
    tag_div_list = selector.xpath('//div[@data-sentry-component="ProductPreview"]')
    log.debug(f"找到 {len(tag_div_list)} 个商品")
    info_list = []
    for tag_div in tag_div_list:
        title = tag_div.xpath('.//p[@data-testid="product-title"]/text()').get()
        # Subtitle often carries the card number, e.g. "117/139".
        subtitle = tag_div.xpath('.//p[contains(@class, "text-ui-fg-subtle")]/text()').get()
        detail_href = tag_div.xpath('./a/@href').get()
        detail_href = 'https://store.clove.jp' + detail_href if detail_href else ""
        price_elements = tag_div.xpath(
            './/div[@data-sentry-component="PreviewPrice"]//p//text() | .//a//div[@data-sentry-component="PreviewPrice"]//text()'
        ).getall()
        price = ''.join(price_elements).strip() if price_elements else None
        if price:
            # Strip the yen sign and thousands separators so only digits remain.
            price = re.sub(r'[¥,]', '', price).strip()
        # Inventory label looks like "在庫数 N"; prefer the second text node,
        # otherwise fall back to the first run of digits.
        inventory_text = tag_div.xpath('.//p[contains(text(), "在庫数")]//text()').getall()
        inventory = None
        if inventory_text and len(inventory_text) > 1:
            inventory = inventory_text[1].strip()
        elif inventory_text:
            inventory_match = re.search(r'\d+', ''.join(inventory_text))
            if inventory_match:
                inventory = inventory_match.group()
        image_url = tag_div.xpath('.//div[@data-sentry-component="ImageOrPlaceholder"]//img/@src').get()
        image_url = 'https://store.clove.jp' + image_url if image_url else ""
        # Ask the CDN for a higher-quality rendition of the thumbnail.
        image_url = image_url.replace("w=3840", "w=1200").replace("q=50", "q=75") if image_url else ""
        info_list.append({
            "title": title,
            "subtitle": subtitle,  # card-number-like info, e.g. "117/139"
            "detail_href": detail_href,
            "image_url": image_url,
            "price": price,
            "inventory": inventory,  # stock count
            "category": category,
        })
    if info_list:
        try:
            # INSERT IGNORE semantics: duplicates are skipped silently.
            sql_pool.insert_many(table="clove_lucky_bag_list_record", data_list=info_list, ignore=True)
        except Exception as e:
            # BUG FIX: exceptions are not sliceable (`e[:500]` raised
            # TypeError inside the handler) — stringify first.
            log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")
    return len(tag_div_list)
def get_detail(log, sql_id, detail_url, sql_pool):
    """
    Scrape the (up to) three price/inventory variants from a product detail
    page and write them back onto the listing row.

    On success the row's state is set to 1; on DB failure it is set to 3.

    :param log: logger object
    :param sql_id: primary key of the row in clove_lucky_bag_list_record
    :param detail_url: absolute product page URL
    :param sql_pool: MySQL connection pool
    """
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "referer": "https://store.clove.jp/jp/categories/lorcana",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
    }
    # ROBUSTNESS: timeout added so one stalled request cannot hang the crawl.
    response = requests.get(detail_url, headers=headers, timeout=30)
    selector = Selector(response.text)
    data_dict = {}
    # The page lists up to three purchase variants in order; the previous
    # copy-pasted first/second/third blocks are collapsed into one loop
    # (identical keys, identical key order).
    for index, prefix in enumerate(("first", "second", "third"), start=1):
        inventory = selector.xpath(f'//div[@class="grid gap-y-3"]/div[{index}]/div[1]/p/text()').getall()
        inventory = ''.join(inventory).strip() if inventory else None
        # "在庫: N点" -> "N"
        inventory = inventory.replace('在庫: ', '').replace('点', '') if inventory else None
        price = selector.xpath(
            f'//div[@class="grid gap-y-3"]/div[{index}]//span[@data-testid="product-price"]/text()').get()
        data_dict[f"{prefix}_inventory"] = inventory
        data_dict[f"{prefix}_price"] = price
    try:
        sql_pool.update_one_or_dict(table="clove_lucky_bag_list_record", data=data_dict, condition={"id": sql_id})
        # Mark the row as done (state=1).
        sql_pool.update_one(f"update clove_lucky_bag_list_record set state=1 where id={sql_id}")
    except Exception as e:
        # BUG FIX: exceptions are not sliceable — stringify before truncating.
        log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")
        # Mark the row as failed (state=3) so it can be retried/inspected.
        sql_pool.update_one(f"update clove_lucky_bag_list_record set state=3 where id={sql_id}")
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def lucky_main(log):
    """
    Run one full crawl: every category's listing pages, then the detail
    pages of all rows still pending (state=0).

    The tenacity decorator (100 attempts, 1h apart) doubles as a crude
    re-run loop when the whole pass fails.

    :param log: logger object
    """
    task = inspect.currentframe().f_code.co_name
    log.info(f'开始运行 {task} 爬虫任务.................................................')
    # Set up the MySQL connection pool and verify it before crawling.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        for category in ["pokemon", "onepiece", "duel-masters", "lorcana", "fab"]:
            try:
                get_lucky_bag_list(log, category, sql_pool)
            except Exception as e2:
                log.error(f"Request get_lucky_bag_list error: {e2}")
        # Second phase: fetch detail data for every not-yet-processed row.
        sql_result = sql_pool.select_all("select id, detail_href from clove_lucky_bag_list_record where state=0")
        if sql_result and not isinstance(sql_result, Exception):
            for row in sql_result:
                try:
                    pid, detail_href = row
                    log.debug(f"{task} 获取 pid: {pid} 详情..............")
                    get_detail(log, pid, detail_href, sql_pool)
                except Exception as e:
                    log.error(f"Request get_detail error: {e}")
    except Exception as e:
        log.error(f'{task} error: {e}')
    finally:
        log.info(f'爬虫程序 {task} 运行结束,等待下一轮的采集任务............')
def schedule_task():
    """
    Launcher for the crawler module: run one crawl immediately, then
    re-run it every day at 00:01 via `schedule`.
    """
    # Kick off the first crawl right away.
    lucky_main(log=logger)
    # Register the recurring daily job.
    schedule.every().day.at("00:01").do(lucky_main, log=logger)
    # Poll the scheduler forever, once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == '__main__':
    # Manual debug entry points (kept commented out):
    # get_lucky_bag_single_page(logger, 'pokemon')
    # lucky_main(log=logger)
    # get_detail(log=logger)
    schedule_task()
|