| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/12/2 14:08
- import threading
- import time
- import inspect
- import requests
- import schedule
- import user_agent
- from loguru import logger
- from parsel import Selector
- from datetime import datetime
- from mysql_pool import MySQLConnectionPool
- from DrissionPage import ChromiumPage, ChromiumOptions
- from tenacity import retry, stop_after_attempt, wait_fixed
- """
- 扣驾的
- """
# Configure loguru: drop the default stderr sink and write to a daily log
# file, rotating at midnight and keeping one week of history.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")  # NOTE(review): loguru docs spell durations "7 days" — confirm "7 day" parses
# Default HTTP headers for the courtyard.io API; the user-agent is
# randomized once at import time, not per request.
headers = {
    "accept": "application/json",
    "referer": "https://courtyard.io/",
    "user-agent": user_agent.generate_user_agent()
}
# Global flag: set True once the first detail-crawl run has completed,
# gating the scheduled daily re-run (see scheduled_detail_main).
detail_first_run_completed = False
def after_log(retry_state):
    """
    Tenacity retry callback: log the outcome of each attempt.

    If the wrapped function received a logger as its first positional
    argument, log through that; otherwise fall back to the module logger.

    :param retry_state: tenacity RetryCallState object
    """
    # Prefer the logger the wrapped function was called with, if any.
    log = retry_state.args[0] if retry_state.args else logger
    func_name = retry_state.fn.__name__
    attempts = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{func_name}', Attempt {attempts} Times")
    else:
        log.info(f"Function '{func_name}', Attempt {attempts} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the Kuaidaili tunnel-proxy configuration.

    :param log: logger object (kept for interface compatibility; also
                picked up by the retry ``after_log`` callback)
    :return: dict with "http" and "https" proxy URLs
    """
    # SECURITY: credentials are hard-coded in source; move them to
    # environment variables or a config file outside version control.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # Building a dict of literals cannot raise, so the original
    # try/except-log-reraise around it was dead code and is removed.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_goods_list(log, sql_pool):
    """
    Fetch the vending-machine (goods) list and crawl each item's details.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :return: None
    """
    log.info(f"========================== 开始获取商品列表 ==========================")
    url = "https://api.courtyard.io/vending-machines"
    response = requests.get(url, headers=headers, timeout=22)
    response.raise_for_status()
    for machine in response.json().get("vendingMachines", []):
        data_dict = {
            "bag_id": machine.get("id"),
            "bag_title": machine.get("title"),
            "category": machine.get("category", {}).get("title"),
            "price": machine.get("saleDetails", {}).get("salePriceUsd")
        }
        try:
            # A failing item is logged and skipped so one bad entry does
            # not abort the whole list crawl.
            get_goods_detail(log, data_dict, sql_pool)
        except Exception as e:
            log.error(f"Error processing item: {e}")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_goods_detail(log, query_dict: dict, sql_pool=None):
    """
    Fetch recent pulls (item details) for one vending machine and save them.

    :param log: logger object
    :param query_dict: dict with bag_id / bag_title / category / price
    :param sql_pool: MySQL connection pool object
    :return: None
    """
    log.info(f"========================== 获取商品详情 ==========================")
    url = "https://api.courtyard.io/index/query/recent-pulls"
    params = {
        "limit": "250",
        "vendingMachineIds": query_dict["bag_id"]
    }
    response = requests.get(url, headers=headers, params=params, timeout=22)
    response.raise_for_status()

    info_list = []
    for asset in response.json().get("assets", []):
        detail_id = asset.get("proof_of_integrity")
        if not detail_id:
            # Skip entries without a stable identifier.
            log.error(f"信息异常, detail_id: {detail_id}")
            continue
        pictures = asset.get("asset_pictures", [])
        info_list.append({
            "bag_id": query_dict["bag_id"],
            "bag_title": query_dict["bag_title"],
            "category": query_dict["category"],
            "price": query_dict["price"],
            "detail_id": detail_id,
            "detail_title": asset.get("title"),
            "img_front": pictures[0] if pictures else None,
            "img_back": pictures[1] if len(pictures) > 1 else None
        })

    # Bulk-insert, ignoring rows that already exist.
    if info_list:
        log.info(f"获取商品详情成功, 共 {len(info_list)} 条数据")
        sql_pool.insert_many(table="courtyard_list_record", data_list=info_list, ignore=True)
def convert_time_format(time_str):
    """
    Convert a display-format timestamp to a standard datetime string.

    :param time_str: raw string such as "December 3, 2025 at 4:29 PM"
    :return: "%Y-%m-%d %H:%M:%S" string, or None for empty/unparseable input
    """
    if not time_str:
        return None
    try:
        parsed = datetime.strptime(time_str, "%B %d, %Y at %I:%M %p")
    except ValueError as e:
        # Unparseable input is logged and mapped to None rather than raised.
        logger.warning(f"时间转换失败: {time_str}, 错误: {e}")
        return None
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sale_detail_single_page(log, page, sql_id, detail_id, sql_pool=None):
    """
    Scrape one asset page and save its activity history (Burn/Sale/Mint).

    Loads https://courtyard.io/asset/<detail_id> in the shared browser
    page, extracts a correlation id plus the Burn/Sale/Mint rows of the
    "Activity history" section, and inserts one row into
    courtyard_detail_record. Raises on an empty page so the retry
    decorator reloads it.

    :param log: logger object
    :param page: DrissionPage ChromiumPage object (shared browser tab)
    :param sql_id: database id of the list record (used only in logging)
    :param detail_id: asset proof_of_integrity hash, forms the page URL
    :param sql_pool: MySQL connection pool object
    :return: None
    """
    log.info(f"========================== 获取商品 <sale> 详情, sql_id: {sql_id} ==========================")
    page_url = f"https://courtyard.io/asset/{detail_id}"
    page.get(page_url)
    page.wait.load_start()
    log.debug(f'{inspect.currentframe().f_code.co_name} -> 页面加载成功, url: {page_url}')
    html = page.html
    if not html:
        # Empty HTML means the load failed; raise so @retry reloads the page.
        log.error(f'{inspect.currentframe().f_code.co_name} -> 页面加载失败...........')
        raise Exception('页面加载失败, 重新加载........')
    selector = Selector(text=html)
    # Method 1 (preferred): match spans whose text contains ":".
    # NOTE(review): this loop keeps the LAST matching span — earlier
    # commented-out code broke on the first regex match. Confirm last-match
    # is the intended behavior.
    correlation_spans = selector.xpath('//span[contains(text(), ":")]/text()')
    correlation_id = None
    for text_selector in correlation_spans:
        correlation_id = text_selector.get()
    # Method 2 (fallback): locate the span preceding the cgccards.com link.
    if not correlation_id:
        correlation_span = selector.xpath('//a[contains(@href, "cgccards.com")]/preceding-sibling::span[1]/text()')
        correlation_id = correlation_span.get()
    # Initialize every possible field to None; missing activity rows simply
    # leave their columns NULL.
    data_dict = {"detail_id": detail_id, "correlation_id": correlation_id, "burn_from": None,
                 "burn_to": None, "burn_time": None, "sale_price": None, "sale_from": None, "sale_to": None,
                 "sale_time": None, "mint_price": None, "mint_from": None, "mint_to": None, "mint_time": None}
    # Walk the divs following the "Activity history" heading; each child div
    # is one activity row whose first span is its tag name.
    activity_div = selector.xpath('//h6[text()="Activity history"]/following-sibling::div[1]/div')
    for tag_div in activity_div:
        tag_name = tag_div.xpath('./div[1]/div/span/text()').get()
        if not tag_name:
            continue
        if tag_name == "Burn":
            data_dict["burn_from"] = tag_div.xpath('./div[2]/div[1]//h6/text()').get()
            data_dict["burn_to"] = tag_div.xpath('./div[2]/div[2]//h6/text()').get()
            data_dict["burn_time"] = tag_div.xpath('./div[2]/div[3]/span/@aria-label').get()
            # Normalize "December 3, 2025 at 4:29 PM" to "%Y-%m-%d %H:%M:%S".
            data_dict["burn_time"] = convert_time_format(data_dict["burn_time"])
        elif tag_name == "Sale":
            # Price like "$1,234" -> "1234" (kept as a string).
            sale_price = tag_div.xpath('./div[2]/span/text()').get()
            if sale_price:
                sale_price = sale_price.replace("$", "").replace(",", "")
            data_dict["sale_price"] = sale_price
            data_dict["sale_from"] = tag_div.xpath('./div[3]/div[1]//h6/text()').get()
            data_dict["sale_to"] = tag_div.xpath('./div[3]/div[2]//h6/text()').get()
            data_dict["sale_time"] = tag_div.xpath('./div[3]/div[3]/span/@aria-label').get()
            # Normalize "December 3, 2025 at 4:29 PM" to "%Y-%m-%d %H:%M:%S".
            data_dict["sale_time"] = convert_time_format(data_dict["sale_time"])
        elif tag_name == "Mint":
            # Price like "$1,234" -> "1234" (kept as a string).
            mint_price = tag_div.xpath('./div[2]/span/text()').get()
            if mint_price:
                mint_price = mint_price.replace("$", "").replace(",", "")
            data_dict["mint_price"] = mint_price
            data_dict["mint_from"] = tag_div.xpath('./div[3]/div[1]//h6/text()').get()
            data_dict["mint_to"] = tag_div.xpath('./div[3]/div[2]//h6/text()').get()
            data_dict["mint_time"] = tag_div.xpath('./div[3]/div[3]/span/@aria-label').get()
            # Normalize "December 3, 2025 at 4:29 PM" to "%Y-%m-%d %H:%M:%S".
            data_dict["mint_time"] = convert_time_format(data_dict["mint_time"])
    # Persist one row per asset, ignoring duplicates.
    sql_pool.insert_one_or_dict(table="courtyard_detail_record", data=data_dict, ignore=True)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sale_detail_list(log, sql_pool=None):
    """
    Crawl the sale/activity detail page for every unprocessed list record.

    Opens one Chromium browser, walks all courtyard_list_record rows with
    state = 0 and marks each row 1 (done) or 2 (failed). The browser is
    always closed on exit.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :return: None
    :raises RuntimeError: when the record query / overall crawl fails
    """
    log.info(f"========================== 获取商品 <sale> 详情 LIST ==========================")
    options = ChromiumOptions()
    options.set_paths(local_port=9138, user_data_path=r'D:\Drissionpage_temp\courtyard_port_9138')
    options.no_imgs(True)
    options.set_argument("--start-maximized")
    options.set_argument("--disable-gpu")
    options.set_argument("-accept-lang=en-US")
    page = ChromiumPage(options)
    try:
        sql_detail_id_list = sql_pool.select_all("SELECT id, detail_id FROM courtyard_list_record WHERE state = 0")
        for sql_id, detail_id in sql_detail_id_list:
            try:
                get_sale_detail_single_page(log, page, sql_id, detail_id, sql_pool)
                sql_pool.update_one("UPDATE courtyard_list_record SET state = 1 WHERE id = %s", (sql_id,))
            except Exception as e:
                # Per-row failure: mark state = 2 and continue with the rest.
                log.error(f'get_sale_detail_single_page error: {e}')
                sql_pool.update_one("UPDATE courtyard_list_record SET state = 2 WHERE id = %s", (sql_id,))
    except Exception as e:
        log.error(f'get_response error: {e}')
        # BUG FIX: the original did `raise 'get_response error'`; raising a
        # str is a TypeError in Python 3. Raise a real exception, chained,
        # so the @retry decorator sees a proper failure.
        raise RuntimeError('get_response error') from e
    finally:
        # Always release the browser, even when the crawl fails.
        page.quit()
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def list_main(log):
    """
    Entry point for one goods-list crawl round.

    :param log: logger object
    :return: elapsed wall-clock seconds of this round
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')
    start = time.time()
    # Set up the MySQL connection pool; abort this round if it is unhealthy
    # (the retry decorator schedules the next attempt an hour later).
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        try:
            log.debug('------------------- 开始获取商品列表 -------------------')
            get_goods_list(log, sql_pool)
        except Exception as e:
            log.error(f'get_goods_list error: {e}')
    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')
    elapsed_time = time.time() - start
    log.info(f'============================== 本次爬虫运行时间:{elapsed_time:.2f} 秒 ===============================')
    return elapsed_time
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def detail_main(log):
    """
    Entry point for one detail crawl round.

    Always sets the module-level ``detail_first_run_completed`` flag on
    exit so the scheduler knows the first run has finished.

    :param log: logger object
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool; abort this round if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    global detail_first_run_completed
    try:
        try:
            log.debug('------------------- 获取商品 detail 数据 -------------------')
            get_sale_detail_list(log, sql_pool)
        except Exception as e:
            log.error(f'get_sale_detail_list error: {e}')
    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        detail_first_run_completed = True
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')
def control_list_mask(log):
    """
    Drive the list crawl in an endless loop, at most once every 5 minutes.

    :param log: logger object
    """
    job_name = inspect.currentframe().f_code.co_name
    while True:
        log.info(
            f'--------------------- 开始运行 {job_name} 新一轮的爬虫任务 ---------------------')
        elapsed_time = list_main(log)
        # Pad short rounds so consecutive rounds start at least 300 s apart.
        wait_time = max(0, 300 - int(elapsed_time))
        if wait_time <= 0:
            log.info("程序运行时间大于等于5分钟,直接开始下一轮任务")
            continue
        log.info(f"程序运行时间{elapsed_time:.2f}秒, 小于 5 分钟,等待 {wait_time:.2f} 秒后再开始下一轮任务")
        time.sleep(wait_time)
def scheduled_detail_main(log):
    """
    Scheduler wrapper: run detail_main only after the first run finished.

    :param log: logger object
    """
    global detail_first_run_completed
    if not detail_first_run_completed:
        log.info("Skipping scheduled task as first run is not completed yet")
        return
    detail_main(log)
def run_threaded(job_func, *args, **kwargs):
    """
    Run the given callable in a new, non-joined thread.

    :param job_func: target callable
    :param args: positional arguments forwarded to the callable
    :param kwargs: keyword arguments forwarded to the callable
    """
    worker = threading.Thread(target=job_func, args=args, kwargs=kwargs)
    worker.start()
def schedule_task():
    """
    Start both crawler threads and run the daily scheduler loop forever.
    """
    # List crawl: loops continuously in its own daemon thread (dies with
    # the main process).
    list_thread = threading.Thread(target=control_list_mask, args=(logger,), daemon=True)
    list_thread.start()
    # Detail crawl: first run starts immediately in a daemon thread.
    detail_thread = threading.Thread(target=detail_main, args=(logger,), daemon=True)
    detail_thread.start()
    # Re-run the detail crawl daily at 00:01; the wrapper skips runs until
    # the first one has completed.
    schedule.every().day.at("00:01").do(run_threaded, scheduled_detail_main, logger)
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == '__main__':
    # Entry point: start the crawler threads and the scheduler loop.
    schedule_task()
|