# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/8/20 14:50
import inspect

import requests
import user_agent
from loguru import logger
from parsel import Selector
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool  # project-local connection-pool helper
import us_setting

# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 days")

headers = {
    # "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    # "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
    # "priority": "u=0, i",
    # "referer": "https://www.pokemon.com/us/pokemon-tcg/pokemon-cards?cardName=&cardText=&evolvesFrom=&rsv10pt5=on&hitPointsMin=0&hitPointsMax=340&retreatCostMin=0&retreatCostMax=5&totalAttackCostMin=0&totalAttackCostMax=5&particularArtist=&advancedSubmit=",
    "user-agent": user_agent.generate_user_agent()
}

crawler_language = "us"


def after_log(retry_state):
    """
    tenacity retry callback.
    :param retry_state: RetryCallState object
    """
    # Prefer the logger passed as the wrapped function's first argument.
    if retry_state.args:
        log = retry_state.args[0]
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel-proxy configuration for requests.
    :return: proxies dict
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
        proxies = {"http": proxy_url, "https": proxy_url}
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise


def get_category_list(log) -> list:
    """
    Fetch the expansion/category list from the card browser page.
    """
    url = "https://www.pokemon.com/us/pokemon-tcg/pokemon-cards"
    response = requests.get(url, headers=headers, timeout=10, proxies=get_proxys(log))
    # print(response.text)
    response.raise_for_status()
    selector = Selector(response.text)
    cate_list = []
    category_list = selector.xpath('//*[@id="filterExpansions"]/div/fieldset')
    for category in category_list:
        major_category_name = category.xpath('.//div/h2/text()').get()
        tag_li_list = category.xpath('.//ul/li')
        for tag_li in tag_li_list:
            category_id = tag_li.xpath('.//input/@id').get()
            category_name = tag_li.xpath('./label/span/text()').get()
            data_dict = {
                "major_category_name": major_category_name,
                "category_id": category_id,
                "category_name": category_name
            }
            # print(data_dict)
            cate_list.append(data_dict)
    return cate_list
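
# NOTE: us_pokemon_main() falls back to us_setting.POKEMON_SETS when the live
# category parse comes back empty. That constant lives in the project-local
# us_setting module; judging only from how it is consumed below, each entry is
# assumed to mirror the dicts built in get_category_list(), e.g. (hypothetical
# values, not taken from the real module):
#
# POKEMON_SETS = [
#     {
#         "major_category_name": "Scarlet & Violet Series",  # assumed example
#         "category_id": "sv1",                              # assumed example
#         "category_name": "Scarlet & Violet",               # assumed example
#     },
# ]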


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_info_single_page(log, major_category_name, category_id, category_name, sql_pool, page):
    """
    Fetch one result page for a category and insert the cards found.
    :return: number of cards on the page
    """
    url = "https://www.pokemon.com/us/pokemon-tcg/pokemon-cards"
    params = {
        # "cardName": "",
        # "cardText": "",
        # "evolvesFrom": "",
        # "zsv10pt5": "on",
        f"{category_id}": "on",
        # "hitPointsMin": "0",
        # "hitPointsMax": "340",
        # "retreatCostMin": "0",
        # "retreatCostMax": "5",
        # "totalAttackCostMin": "0",
        # "totalAttackCostMax": "5",
        # "particularArtist": "",
        # "advancedSubmit": "",
        # "sort": ["number", "number"],
        "page": str(page)
    }
    response = requests.get(url, headers=headers, params=params, timeout=10, proxies=get_proxys(log))
    # print(response.text)
    response.raise_for_status()
    selector = Selector(response.text)
    tag_li_list = selector.xpath('//*[@id="cardResults"]/li')
    info_list = []
    for tag_li in tag_li_list:
        tag_a = tag_li.xpath('./a/@href').get()
        if not tag_a:
            continue
        # card_id is built from the last two path segments of the detail link
        parts = tag_a.strip('/').split('/')
        card_id = f"{parts[-2]}/{parts[-1]}"
        detail_url = f"https://www.pokemon.com{tag_a}"
        img = tag_li.xpath('.//img/@src').get()
        # print(card_id, category_id, category_name, detail_url, img)
        data_dict = {
            "card_id": card_id,
            "major_category_name": major_category_name,
            "pg_value": category_id,
            "pg_label": category_name,
            "detail_url": detail_url,
            "img": img,
            "crawler_language": crawler_language
        }
        info_list.append(data_dict)
    if info_list:
        sql_pool.insert_many(table="pokemon_card_record", data_list=info_list, ignore=True)
    return len(tag_li_list)


def get_card_info_list(log, major_category_name, category_id, category_name, sql_pool):
    page = 1
    # max_page = 5000
    max_page = 50
    while page <= max_page:
        log.debug(f"Fetching page {page}, category: {category_name} .........")
        try:
            log.debug(
                f'--------------- {inspect.currentframe().f_code.co_name}, page {page}, category_name {category_name} start ---------------')
            len_items = get_card_info_single_page(log, major_category_name, category_id, category_name, sql_pool, page)
        except Exception as e:
            log.error(
                f"{inspect.currentframe().f_code.co_name} Request get_card_info_single_page for page:{page}, {e}")
            len_items = 0
        # A full result page holds 12 cards; a shorter page is the last one.
        if len_items < 12:
            log.debug(f'--------------- page {page} has {len_items} items, break ---------------')
            break
        page += 1


# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_details(log, sql_id_detail_url: tuple, sql_pool):
    sql_id = sql_id_detail_url[0]
    detail_url = sql_id_detail_url[1]
    log.debug(f"Fetching card details, row id: {sql_id}, detail url: {detail_url}")
    response = requests.get(detail_url, headers=headers, timeout=10, proxies=get_proxys(log))
    response.raise_for_status()
    selector = Selector(response.text)
    card_name = selector.xpath('//div[@class="card-description"]/div/h1/text()').get()
    card_name = card_name.strip() if card_name else ''
    card_no_rarity = selector.xpath('//div[@class="stats-footer"]/span/text()').get()
    data_dict = {
        "card_name": card_name,
        "card_no_rarity": card_no_rarity
    }
    # log.debug(f'data_dict:\n{data_dict}')
    sql_pool.update_one_or_dict(
        table="pokemon_card_record",
        data=data_dict,
        condition={"id": sql_id}
    )
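

# A minimal standalone check (hypothetical helper, not invoked by the scheduled
# crawl): fetch the live category list and crawl the first category end to end,
# to verify the XPath selectors and the DB layer before a full run. Assumes the
# proxy credentials above are valid and the database is reachable.
def smoke_test():
    pool = MySQLConnectionPool(log=logger)
    cate_list = get_category_list(logger)
    if not cate_list:
        logger.error("no categories parsed; the page layout may have changed")
        return
    cate = cate_list[0]
    get_card_info_list(logger, cate["major_category_name"],
                       cate["category_id"], cate["category_name"], pool)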


# On an unhandled failure, tenacity re-runs the whole task hourly, up to 100 attempts.
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def us_pokemon_main(log):
    """
    Main entry point.
    """
    log.info(
        f'Starting crawler task {inspect.currentframe().f_code.co_name} ....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        # Fetch the category list
        log.debug(".......... Fetching category list ..........")
        cate_list = get_category_list(log)
        if not cate_list:
            cate_list = us_setting.POKEMON_SETS
        for cate in cate_list:
            major_category_name = cate.get("major_category_name")
            category_id = cate.get("category_id")
            category_name = cate.get("category_name")
            log.info(f"{major_category_name} | {category_id} | {category_name}")
            try:
                # Fetch the card list for this category
                get_card_info_list(log, major_category_name, category_id, category_name, sql_pool)
            except Exception as e:
                log.error(f"{inspect.currentframe().f_code.co_name} Request get_card_info_list error: {e}")

        # Fetch details for rows that are still missing a card name
        log.debug("........... Fetching card details ..........")
        sql_item_id_list = sql_pool.select_all(
            f"SELECT id, detail_url FROM pokemon_card_record WHERE card_name IS NULL AND crawler_language='{crawler_language}'")
        for row in sql_item_id_list:
            try:
                get_details(log, row, sql_pool)
            except Exception as e:
                log.error(f"Request get_details error: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Crawler {inspect.currentframe().f_code.co_name} finished; waiting for the next collection round ............')


if __name__ == '__main__':
    # get_card_info_single_page(logger)
    # get_category_list(logger)
    us_pokemon_main(logger)
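
# NOTE: the @retry decorator on us_pokemon_main only re-fires when the function
# raises, and the broad try/except above swallows most errors, so a successful
# pass ends the process rather than looping. If a true recurring schedule is
# wanted, one option is an explicit loop (sketch, standard library only):
#
# if __name__ == '__main__':
#     import time
#     while True:
#         us_pokemon_main(logger)
#         time.sleep(3600)  # re-crawl hourly even after a clean run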