# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/12/2 14:08
import threading
import time
import inspect
import requests
import schedule
import user_agent
from loguru import logger
from parsel import Selector
from datetime import datetime
from mysql_pool import MySQLConnectionPool
from DrissionPage import ChromiumPage, ChromiumOptions
from tenacity import retry, stop_after_attempt, wait_fixed

"""
courtyard.io scraper: pulls the vending-machine list / recent pulls via the
public API, then scrapes each asset page's activity history with a Chromium
browser and persists everything through a MySQL connection pool.
"""

# Daily-rotated file log, kept for 7 days.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")

headers = {
    "accept": "application/json",
    "referer": "https://courtyard.io/",
    "user-agent": user_agent.generate_user_agent()
}

# Global flag: set True once the first detail_main run has finished, so the
# scheduled job does not overlap the initial run.
detail_first_run_completed = False


def after_log(retry_state):
    """
    tenacity retry callback: log the outcome of each attempt.
    :param retry_state: RetryCallState object
    """
    # All retried functions take a logger as their first positional argument;
    # fall back to the module-level logger when none was passed.
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]
    else:
        log = logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel-proxy mapping for requests.
    :param log: logger object
    :return: dict usable as requests' ``proxies=`` argument
    """
    # NOTE(review): credentials are hard-coded; consider moving them to
    # environment variables or a config file.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # Building a dict cannot fail, so the original try/except was dead code.
    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
    return {"http": proxy_url, "https": proxy_url}


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_goods_list(log, sql_pool):
    """
    Fetch the vending-machine list and hand each machine to get_goods_detail.
    :param log: logger object
    :param sql_pool: MySQL connection-pool object
    :return: None
    """
    log.info(f"========================== 开始获取商品列表 ==========================")
    url = "https://api.courtyard.io/vending-machines"
    response = requests.get(url, headers=headers, timeout=22)
    response.raise_for_status()
    vendingMachines = response.json().get("vendingMachines", [])
    for item in vendingMachines:
        data_dict = {
            "bag_id": item.get("id"),
            "bag_title": item.get("title"),
            "category": item.get("category", {}).get("title"),
            "price": item.get("saleDetails", {}).get("salePriceUsd"),
        }
        try:
            get_goods_detail(log, data_dict, sql_pool)
        except Exception as e:
            # One bad machine must not abort the whole listing run.
            log.error(f"Error processing item: {e}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_goods_detail(log, query_dict: dict, sql_pool=None):
    """
    Fetch the recent pulls of one vending machine and store them.
    :param log: logger object
    :param query_dict: dict with bag_id / bag_title / category / price
    :param sql_pool: MySQL connection-pool object
    :return: None
    """
    log.info(f"========================== 获取商品详情 ==========================")
    url = "https://api.courtyard.io/index/query/recent-pulls"
    params = {
        "limit": "250",
        "vendingMachineIds": query_dict["bag_id"],
    }
    response = requests.get(url, headers=headers, params=params, timeout=22)
    response.raise_for_status()
    pulls = response.json().get("assets", [])
    info_list = []
    for item in pulls:
        detail_title = item.get("title")
        detail_id = item.get("proof_of_integrity")
        if not detail_id:
            log.error(f"信息异常, detail_id: {detail_id}")
            continue
        # First picture is the front, second (if any) the back.
        asset_pictures = item.get("asset_pictures", [])
        img_front = asset_pictures[0] if len(asset_pictures) > 0 else None
        img_back = asset_pictures[1] if len(asset_pictures) > 1 else None
        info_list.append({
            "bag_id": query_dict["bag_id"],
            "bag_title": query_dict["bag_title"],
            "category": query_dict["category"],
            "price": query_dict["price"],
            "detail_id": detail_id,
            "detail_title": detail_title,
            "img_front": img_front,
            "img_back": img_back,
        })
    if info_list:
        log.info(f"获取商品详情成功, 共 {len(info_list)} 条数据")
        sql_pool.insert_many(table="courtyard_list_record", data_list=info_list, ignore=True)


def convert_time_format(time_str):
    """
    Convert a page timestamp string to the standard MySQL format.
    :param time_str: raw string, e.g. "December 3, 2025 at 4:29 PM"
    :return: "YYYY-MM-DD HH:MM:SS" string, or None on empty/unparseable input
    """
    if not time_str:
        return None
    try:
        # NOTE(review): %B assumes English month names / an English locale.
        dt_obj = datetime.strptime(time_str, "%B %d, %Y at %I:%M %p")
        return dt_obj.strftime("%Y-%m-%d %H:%M:%S")
    except ValueError as e:
        logger.warning(f"时间转换失败: {time_str}, 错误: {e}")
        return None


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sale_detail_single_page(log, page, sql_id, detail_id, sql_pool=None):
    """
    Scrape the activity history (Burn / Sale / Mint) of a single asset page.
    :param log: logger object
    :param page: DrissionPage ChromiumPage object
    :param sql_id: database row id (used for logging only)
    :param detail_id: asset detail id (proof_of_integrity hash)
    :param sql_pool: MySQL connection-pool object
    :return: None
    :raises Exception: when the page yields no HTML, so @retry reloads it
    """
    log.info(f"========================== 获取商品 详情, sql_id: {sql_id} ==========================")
    page_url = f"https://courtyard.io/asset/{detail_id}"
    page.get(page_url)
    page.wait.load_start()
    log.debug(f'{inspect.currentframe().f_code.co_name} -> 页面加载成功, url: {page_url}')
    html = page.html
    if not html:
        log.error(f'{inspect.currentframe().f_code.co_name} -> 页面加载失败...........')
        raise Exception('页面加载失败, 重新加载........')  # propagate so @retry re-runs
    selector = Selector(text=html)

    # Method 1 (preferred): spans whose text contains ":".
    # NOTE(review): this loop keeps the LAST matching span's text; if the
    # first match is wanted, a break is missing — confirm against the page.
    correlation_spans = selector.xpath('//span[contains(text(), ":")]/text()')
    correlation_id = None
    for text_selector in correlation_spans:
        correlation_id = text_selector.get()

    # Method 2 (fallback): locate by structure next to the cgccards.com link.
    if not correlation_id:
        correlation_span = selector.xpath('//a[contains(@href, "cgccards.com")]/preceding-sibling::span[1]/text()')
        correlation_id = correlation_span.get()

    # Initialise every possible field to None so the insert has a full row.
    data_dict = {"detail_id": detail_id, "correlation_id": correlation_id,
                 "burn_from": None, "burn_to": None, "burn_time": None,
                 "sale_price": None, "sale_from": None, "sale_to": None, "sale_time": None,
                 "mint_price": None, "mint_from": None, "mint_to": None, "mint_time": None}

    # Rows inside the div that follows the "Activity history" heading.
    activity_div = selector.xpath('//h6[text()="Activity history"]/following-sibling::div[1]/div')
    for tag_div in activity_div:
        tag_name = tag_div.xpath('./div[1]/div/span/text()').get()
        if not tag_name:
            continue
        if tag_name == "Burn":
            data_dict["burn_from"] = tag_div.xpath('./div[2]/div[1]//h6/text()').get()
            data_dict["burn_to"] = tag_div.xpath('./div[2]/div[2]//h6/text()').get()
            data_dict["burn_time"] = tag_div.xpath('./div[2]/div[3]/span/@aria-label').get()
            # e.g. "December 3, 2025 at 4:29 PM" -> "2025-12-03 16:29:00"
            data_dict["burn_time"] = convert_time_format(data_dict["burn_time"])
        elif tag_name == "Sale":
            sale_price = tag_div.xpath('./div[2]/span/text()').get()
            if sale_price:
                # Strip currency symbol and thousands separators.
                sale_price = sale_price.replace("$", "").replace(",", "")
            data_dict["sale_price"] = sale_price
            data_dict["sale_from"] = tag_div.xpath('./div[3]/div[1]//h6/text()').get()
            data_dict["sale_to"] = tag_div.xpath('./div[3]/div[2]//h6/text()').get()
            data_dict["sale_time"] = tag_div.xpath('./div[3]/div[3]/span/@aria-label').get()
            data_dict["sale_time"] = convert_time_format(data_dict["sale_time"])
        elif tag_name == "Mint":
            mint_price = tag_div.xpath('./div[2]/span/text()').get()
            if mint_price:
                mint_price = mint_price.replace("$", "").replace(",", "")
            data_dict["mint_price"] = mint_price
            data_dict["mint_from"] = tag_div.xpath('./div[3]/div[1]//h6/text()').get()
            data_dict["mint_to"] = tag_div.xpath('./div[3]/div[2]//h6/text()').get()
            data_dict["mint_time"] = tag_div.xpath('./div[3]/div[3]/span/@aria-label').get()
            data_dict["mint_time"] = convert_time_format(data_dict["mint_time"])

    sql_pool.insert_one_or_dict(table="courtyard_detail_record", data=data_dict, ignore=True)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sale_detail_list(log, sql_pool=None):
    """
    Scrape detail pages for all rows still in state 0.
    Rows are marked state 1 on success, state 2 on failure.
    :param log: logger object
    :param sql_pool: MySQL connection-pool object
    :return: None
    """
    log.info(f"========================== 获取商品 详情 LIST ==========================")
    options = ChromiumOptions()
    options.set_paths(local_port=9138, user_data_path=r'D:\Drissionpage_temp\courtyard_port_9138')
    options.no_imgs(True)  # skip images for speed
    options.set_argument("--start-maximized")
    options.set_argument("--disable-gpu")
    options.set_argument("-accept-lang=en-US")
    page = ChromiumPage(options)
    try:
        sql_detail_id_list = sql_pool.select_all("SELECT id, detail_id FROM courtyard_list_record WHERE state = 0")
        for sql_id, detail_id in sql_detail_id_list:
            try:
                get_sale_detail_single_page(log, page, sql_id, detail_id, sql_pool)
                sql_pool.update_one("UPDATE courtyard_list_record SET state = 1 WHERE id = %s", (sql_id,))
            except Exception as e:
                log.error(f'get_sale_detail_single_page error: {e}')
                # state 2 = failed, so the row is not retried blindly.
                sql_pool.update_one("UPDATE courtyard_list_record SET state = 2 WHERE id = %s", (sql_id,))
    except Exception as e:
        log.error(f'get_response error: {e}')
        # BUG FIX: the original did ``raise 'get_response error'`` — raising a
        # str is a TypeError in Python 3 and masks the real error from @retry.
        raise
    finally:
        page.quit()


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def detail_main(log):
    """
    Entry point for one detail-scraping round.
    :param log: logger object
    :raises RuntimeError: when the MySQL pool health check fails (retried)
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    global detail_first_run_completed
    try:
        try:
            log.debug('------------------- 获取商品 detail 数据 -------------------')
            get_sale_detail_list(log, sql_pool)
        except Exception as e:
            log.error(f'get_sale_detail_list error: {e}')
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        # Mark the first run done even on failure so the schedule can start.
        detail_first_run_completed = True
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def list_main(log):
    """
    Entry point for one listing round.
    :param log: logger object
    :return: elapsed wall-clock seconds of this round
    :raises RuntimeError: when the MySQL pool health check fails (retried)
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    start = time.time()
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        try:
            log.debug('------------------- 开始获取商品列表 -------------------')
            get_goods_list(log, sql_pool)
        except Exception as e:
            log.error(f'get_goods_list error: {e}')
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
    end = time.time()
    elapsed_time = end - start
    log.info(f'============================== 本次爬虫运行时间:{elapsed_time:.2f} 秒 ===============================')
    return elapsed_time


def control_list_mask(log):
    """
    Run list_main in an endless loop, keeping consecutive starts at least
    300 seconds apart.
    NOTE(review): the original docstring said "每10分钟" (every 10 minutes)
    but the code enforces a 5-minute spacing — confirm which is intended.
    :param log: logger object
    """
    while True:
        log.info(
            f'--------------------- 开始运行 {inspect.currentframe().f_code.co_name} 新一轮的爬虫任务 ---------------------')
        elapsed_time = list_main(log)
        # Top up to 300 s between round starts.
        wait_time = max(0, 300 - int(elapsed_time))
        if wait_time > 0:
            log.info(f"程序运行时间{elapsed_time:.2f}秒, 小于 5 分钟,等待 {wait_time:.2f} 秒后再开始下一轮任务")
            time.sleep(wait_time)
        else:
            log.info("程序运行时间大于等于5分钟,直接开始下一轮任务")


def scheduled_detail_main(log):
    """Scheduler wrapper: run detail_main only after the first run finished."""
    global detail_first_run_completed
    if detail_first_run_completed:
        detail_main(log)
    else:
        log.info("Skipping scheduled task as first run is not completed yet")


def run_threaded(job_func, *args, **kwargs):
    """
    Run the given function in a new thread, forwarding its arguments.
    :param job_func: target callable
    :param args: positional arguments
    :param kwargs: keyword arguments
    """
    job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs)
    job_thread.start()


def schedule_task():
    """
    Start the worker threads and run the daily schedule loop forever.
    """
    # Listing loop (daemon: dies with the main process).
    list_thread = threading.Thread(target=control_list_mask, args=(logger,))
    list_thread.daemon = True
    list_thread.start()

    # Initial detail run (daemon).
    detail_thread = threading.Thread(target=detail_main, args=(logger,))
    detail_thread.daemon = True
    detail_thread.start()

    # Daily detail run at 00:01, in its own thread so run_pending() never blocks.
    schedule.every().day.at("00:01").do(run_threaded, scheduled_detail_main, logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    schedule_task()