# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/9/8 11:56
import re
import inspect

import requests
from parsel import Selector
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

import add_settings
from mysql_pool import MySQLConnectionPool

"""
Year extraction on the second page:
/html/body/div[3]/span[position() >= 4 and position() <= 18]
"""

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")


def after_log(retry_state):
    """
    Retry callback.
    :param retry_state: RetryCallState object
    """
    # Use the logger passed as the first positional argument if there is one,
    # otherwise fall back to the global logger
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]
    else:
        log = logger
    if retry_state.outcome.failed:
        log.warning(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


def turn_page(log, category_name, year, series, series_url, sql_pool):
    """
    Paginate through a series checklist.
    :param log: logger object
    :param category_name: category name
    :param year: year
    :param series: series name
    :param series_url: series URL
    :param sql_pool: database connection pool
    :return:
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    # e.g. "https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist.php?page_no=2"
    turn_base_url = series_url.split('page_no=')[0] + 'page_no='
    page = 1
    max_page = 50
    while page <= max_page:
        # Rebuild the URL with the current value of the page_no parameter
        current_url = turn_base_url + str(page)
        log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> current_url: {current_url}')
        len_data, selector = turn_single_page(log, category_name, year, series, current_url, sql_pool)
        if len_data < 250:
            log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> len_data: {len_data}, '
                      f'fewer than 250 rows, last page reached, break!!!')
            break
        max_page_text = selector.xpath('/html/body/div[3]/div[4]/strong[1]/text()').get()
        if max_page_text:
            # Extract the total row count (the number after "of") with a regex
            match = re.search(r'of\s+(\d+)', max_page_text)
            if match:
                total_rows = int(match.group(1))
                # 250 rows per page, so this is the number of pages to visit
                max_page = (total_rows // 250) + 1
                log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> max_page: {max_page}')
        page += 1
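
# A worked example of the pagination arithmetic above (hypothetical numbers,
# for illustration only): if the <strong> tag's text contains "of 612", the
# regex captures 612 and, at 250 rows per page,
#     max_page = (612 // 250) + 1 = 3
# so the loop requests page_no=1..3 and also stops early on any page that
# returns fewer than 250 rows.
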
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def turn_single_page(log, category_name, year, series, current_url, sql_pool):
    """
    Fetch and parse a single page of a paginated checklist.
    :param log:
    :param category_name:
    :param year:
    :param series:
    :param current_url:
    :param sql_pool:
    :return: (number of parsed rows, page Selector)
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    response = requests.get(current_url, headers=add_settings.headers, timeout=60)
    response.raise_for_status()
    selector = Selector(response.text)
    # A table of this class means the checklist needs pagination
    tag_table = selector.xpath('//table[@class="table table-striped table-bordered"]')
    log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size: {len(tag_table)}')
    title = selector.xpath('//h1/text()').get()
    if tag_table:
        len_data = parse_turn_single_team(log, tag_table, category_name, year, series, current_url, sql_pool, title)
    else:
        len_data = 0
    return len_data, selector


def parse_turn_single_team(log, tag_table, category_name, year, series, series_url, sql_pool, title):
    # Get the header row (first row of the table)
    head = tag_table.xpath('./thead/tr/th/text()').getall()
    # Normalize header field names: "#" -> "Number", "Name" -> "Player", "Run" -> "Print Run"
    head = ['Number' if h.strip() == '#' else h for h in head]
    head = ['Player' if h.strip() == 'Name' else h for h in head]
    head = ['Print Run' if h.strip() == 'Run' else h for h in head]
    # Collect every data row; read each <td> separately so that empty cells
    # keep their position and stay aligned with the header when zipped
    rows = tag_table.xpath('./tbody/tr')
    data = []
    for row in rows:
        cells = [''.join(td.xpath('.//text()').getall()).strip() for td in row.xpath('./td')]
        if not cells:
            cells = [''] * len(head)  # no cells at all: pad with empty strings
        # Combine the current row with the header into a dict
        row_data = dict(zip(head, cells))
        # Only keep the row if at least one field is non-empty
        if any(value.strip() for value in row_data.values()):
            data.append(row_data)
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> len data size: {len(data)}')
    # Fields that exist in the database table
    database_fields = {
        'player', 'number', 'card_set', 'copies', 'team', 'print_run',
        'category_name', 'year', 'series', 'series_url', 'team_url', 'title'
    }
    info_list = []
    for item in data:
        item.update(
            {
                'category_name': category_name,
                'year': year,
                'series': series,
                'series_url': series_url,
                'title': title
            }
        )
        # Normalize field names: lowercase, replace spaces and special
        # characters with underscores
        processed_item = {}
        for key, value in item.items():
            new_key = key.lower().replace(' ', '_').replace('-', '_').replace('/', '_') \
                .replace('(', '').replace(')', '')
            processed_item[new_key] = value
        # Build a dict with a unified structure so every database field exists
        unified_item = {}
        for field in database_fields:
            unified_item[field] = processed_item.get(field, None)
        info_list.append(unified_item)
    try:
        if info_list:
            sql_pool.insert_many(table='breakninja_checklist_record_202511', data_list=info_list)
    except Exception as e:
        log.error(f"insert_many data: {e}")
    return len(data)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def ninja_main(log):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(
        f'Starting crawler task {inspect.currentframe().f_code.co_name}....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        for ca in add_settings.series_url_list:
            category_name = ca.get('category_name')
            years = ca.get('years')
            for ye in years:
                year = ye.get('year')
                series = ye.get('series')
                for ser in series:
                    series_name = ser.get('series_name')
                    series_url = ser.get('series_url')
                    log.debug(
                        f'category_name: {category_name}, year: {year}, '
                        f'series_name: {series_name}, series_url: {series_url}')
                    try:
                        if '?page_no=' in series_url:
                            log.debug('page_no in series_url, request turn_page.........')
                            turn_page(log, category_name, year, series_name, series_url, sql_pool)
                        else:
                            log.debug('page_no not in series_url, request turn_single_page.........')
                            # Non-paginated checklist: fetch the single page once
                            turn_single_page(log, category_name, year, series_name, series_url, sql_pool)
                    except Exception as e:
                        log.error(f"Error processing series: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Crawler {inspect.currentframe().f_code.co_name} finished, waiting for the next collection round............')


if __name__ == '__main__':
    ninja_main(logger)
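
# A minimal sketch of what this script assumes from the add_settings module,
# based on how ninja_main consumes it (hypothetical values for illustration;
# the real module is not shown here):
#
#   headers = {"User-Agent": "Mozilla/5.0"}
#   series_url_list = [
#       {
#           "category_name": "baseball",
#           "years": [
#               {
#                   "year": "2025",
#                   "series": [
#                       {
#                           "series_name": "2025 Topps All-Star Game Mega Box",
#                           "series_url": "https://www.breakninja.com/baseball/"
#                                         "2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist.php?page_no=1",
#                       },
#                   ],
#               },
#           ],
#       },
#   ]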