# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/9/8 11:56
import requests
from parsel import Selector
import ninja_settings
import inspect
import time
import schedule
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed
from mysql_pool import MySQLConnectionPool

"""
Year extraction on the second-level page:
/html/body/div[3]/span[position() >= 4 and position() <= 18]
"""

# logger.remove()
# logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="7 days")


def after_log(retry_state):
    """
    tenacity retry callback
    :param retry_state: RetryCallState object
    """
    # use the logger passed as the first positional argument, if any
    if retry_state.args:
        log = retry_state.args[0]
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel proxy configuration.
    :return: proxies dict for requests
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise
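
# Hedged sketch of the note in the module docstring: on the second-level page the
# year links are assumed to sit in spans 4 through 18 of the third top-level <div>.
# With a parsel Selector over that page's HTML this would be roughly (illustrative
# only, not wired into the crawler):
#
#     years = Selector(response.text).xpath(
#         '/html/body/div[3]/span[position() >= 4 and position() <= 18]//text()').getall()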

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_series(log, category_name, category_url, sql_pool):
    """
    Fetch the series list for one category.
    :param log: logger object
    :param category_name: category name
    :param category_url: category URL
    :param sql_pool: database connection pool
    :return:
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> category_name: {category_name}")
    # url = "https://www.breakninja.com/baseball_box_break_group_checklists.html"
    response = requests.get(category_url, headers=ninja_settings.headers, proxies=get_proxys(log), timeout=22)
    response.raise_for_status()
    selector = Selector(response.text)
    tag_detail_url_list = selector.xpath('//p[@align="left"]/a')
    log.debug(f'{inspect.currentframe().f_code.co_name} tag_detail_url_list size: {len(tag_detail_url_list)}')
    # skip_until_target = True  # whether to skip entries until the target URL is reached
    # target_url = '2017-Topps-Inception-Baseball-Most-Popular-Cards-With-Large-Pictures.html'
    for tag_detail_url in tag_detail_url_list:
        series = tag_detail_url.xpath('./text()').get()
        series_url = tag_detail_url.xpath('./@href').get()
        # # skip entries until the target URL is reached
        # if skip_until_target and series_url != target_url:
        #     log.info(f"Skipping {series_url}")
        #     continue
        #
        # # once the target URL is found, clear the skip flag
        # if series_url == target_url:
        #     skip_until_target = False
        #     log.info(f"Resuming from {series_url}")
        #     continue  # keep this line to skip the target URL itself; remove it to include the target URL
        if not series_url.startswith('http'):
            series_url = ninja_settings.base_url + series_url
        # print(series, series_url)
        try:
            if 'page_no=' in series_url:
                log.debug(f'page_no in series_url, request turn_page.........')
                turn_page(log, category_name, series, series_url, sql_pool)
            else:
                get_table_list(log, category_name, series, series_url, sql_pool)
        except Exception as e:
            log.error(f"Error processing series: {e}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_table_list(log, category_name, series, series_url, sql_pool):
    """
    Fetch the team list for one series.
    :param log: logger object
    :param category_name: category name
    :param series: series name
    :param series_url: series URL
    :param sql_pool: database connection pool
    :return:
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    # url = "https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist.php"
    response = requests.get(series_url, headers=ninja_settings.headers, proxies=get_proxys(log), timeout=22)
    response.raise_for_status()
    selector = Selector(response.text)
    tag_team_list = selector.xpath('//div[@class="col-7 col-s-7"]/a | //div[@class="col-3 col-s-3 menu"]/a')
    log.debug(f'{inspect.currentframe().f_code.co_name} tag_team_list size: {len(tag_team_list)}')
    for tag_team in tag_team_list:
        team = tag_team.xpath('./text()').get()
        if team in ninja_settings.exclusion_words:
            log.debug(f"Skipping <{team}> due to exclusion word")
            continue
        team_url = tag_team.xpath('./@href').get()
        if not team_url.startswith('http'):
            team_url = ninja_settings.base_url + team_url
        # print(team, team_url)
        log.debug(f'no page_no in team_url, request get_table_list_2.........')
        get_table_list_2(log, category_name, series, sql_pool, team_url)


def turn_page(log, category_name, series, series_url, sql_pool):
    """
    Paginate through a series listing.
    :param log: logger object
    :param category_name: category name
    :param series: series name
    :param series_url: series URL
    :param sql_pool: database connection pool
    :return:
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    turn_base_url = series_url.split('page_no=')[0] + 'page_no='
    page = 1
    max_page = 50
    while page <= max_page:
        # url = "https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist.php?page_no=2"
        # build a new URL with page_no set to the current page
        current_url = turn_base_url + str(page)
        log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> page: {page}')
        len_data, selector = turn_single_page(log, category_name, series, current_url, sql_pool)
        if len_data < 250:
            log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> len_data: {len_data}, fewer than 250 rows, stop paging')
            break
        max_page_ = selector.xpath('/html/body/div[3]/div[4]/strong[1]/text()').get()
        if max_page_:
            max_page = int(max_page_) // 250 + 1
            log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> max_page: {max_page}')
        page += 1
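
# Worked example of the paging math in turn_page, assuming the <strong> tag holds
# the total row count and the site serves 250 rows per page: a total of 1234 rows
# gives max_page = 1234 // 250 + 1 = 5, so pages 1..5 are fetched; the loop also
# stops early as soon as a page returns fewer than 250 rows.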

@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def turn_single_page(log, category_name, series, current_url, sql_pool):
    """
    Fetch and parse a single page during pagination.
    :param log:
    :param category_name:
    :param series:
    :param current_url:
    :param sql_pool:
    :return:
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    response = requests.get(current_url, headers=ninja_settings.headers, proxies=get_proxys(log), timeout=60)
    # print(response.text)
    response.raise_for_status()
    selector = Selector(response.text)
    tag_table = selector.xpath('//table[@class="table table-striped table-bordered"]')  # this table style requires pagination
    log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size: {len(tag_table)}')
    title = selector.xpath('//h1/text()').get()
    if tag_table:
        log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> tag_table size 111: {len(tag_table)}')
        # len_data = parse_one_team(log, tag_table, category_name, series, sql_pool)
        len_data = parse_turn_single_team(log, tag_table, category_name, series, current_url, sql_pool, title)
    else:
        len_data = 0
    return len_data, selector


def parse_turn_single_team(log, tag_table, category_name, series, series_url, sql_pool, title):
    # header row
    head = tag_table.xpath('./thead/tr/th/text()').getall()
    # normalize header names: map "#" to "Number", "Name" to "Player", "Run" to "Print Run"
    head = ['Number' if h.strip() == '#' else h for h in head]
    head = ['Player' if h.strip() == 'Name' else h for h in head]
    head = ['Print Run' if h.strip() == 'Run' else h for h in head]
    # all data rows
    rows = tag_table.xpath('./tbody/tr')
    data = []
    for row in rows:
        cells = row.xpath('./td/text()').getall()
        cells = [cell.strip() for cell in cells if cell.strip()]
        if not cells:
            cells = [''] * len(head)  # pad with empty strings when the row has no data
        # zip the row with the header into a dict
        row_data = dict(zip(head, cells))
        # keep the row only if at least one field is non-empty
        if any(value.strip() for value in row_data.values()):
            data.append(row_data)
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> len data size: {len(data)}')
    for item in data:
        item.update({'category_name': category_name, 'series': series, 'series_url': series_url, 'title': title})
        # print(item)
        # normalize keys: lower-case, replace spaces and special characters with underscores
        processed_item = {}
        for key, value in item.items():
            new_key = key.lower().replace(' ', '_').replace('-', '_').replace('/', '_').replace('(', '').replace(')', '')
            processed_item[new_key] = value
        print(processed_item)
        # try:
        #     sql_pool.insert_one_or_dict(table='breakninja_checklist_record', data=processed_item)
        # except Exception as e:
        #     log.error(f"insert_one_or_dict data: {e}")
    return len(data)


def parse_one_team(log, tag_table, category_name, series, sql_pool, team_url, title):
    # header row
    head = tag_table.xpath('./thead/tr/th/text()').getall()
    # normalize header names: map "#" to "Number", "Name" to "Player", "Run" to "Print Run"
    head = ['Number' if h.strip() == '#' else h for h in head]
    head = ['Player' if h.strip() == 'Name' else h for h in head]
    head = ['Print Run' if h.strip() == 'Run' else h for h in head]
    # title = tag_table.xpath('./thead/tr/th/text()').getall()
    # all data rows
    rows = tag_table.xpath('./tbody/tr')
    data = []
    for row in rows:
        cells = row.xpath('./td/text()').getall()
        cells = [cell.strip() for cell in cells if cell.strip()]
        if not cells:
            cells = [''] * len(head)  # pad with empty strings when the row has no data
        # zip the row with the header into a dict
        row_data = dict(zip(head, cells))
        # keep the row only if at least one field is non-empty
        if any(value.strip() for value in row_data.values()):
            data.append(row_data)
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> len data size: {len(data)}')
    for item in data:
        item.update({'category_name': category_name, 'series': series, 'team_url': team_url, 'title': title})
        # print(item)
        # normalize keys: lower-case, replace spaces and special characters with underscores
        processed_item = {}
        for key, value in item.items():
            new_key = key.lower().replace(' ', '_').replace('-', '_').replace('/', '_').replace('(', '').replace(')', '')
            processed_item[new_key] = value
        try:
            # print(processed_item)
            sql_pool.insert_one_or_dict(table='breakninja_checklist_record', data=processed_item)
        except Exception as e:
            log.error(f"insert_one_or_dict data: {e}")
    return len(data)
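
# The parse_*_team functions in this module all repeat the same header-normalization
# step; a minimal consolidation sketch (the helper name `normalize_head` is
# hypothetical and is not wired into the parsers):
def normalize_head(head):
    """Map raw table headers to the canonical field names used downstream."""
    aliases = {'#': 'Number', 'Name': 'Player', 'Run': 'Print Run'}  # same mapping as the inline list comprehensions
    return [aliases.get(h.strip(), h) for h in head]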

def parse_two_team(log, tag_table, category_name, series, sql_pool, team_url, title):
    # header row
    head = tag_table.xpath('.//th/text()').getall()
    # normalize header names: map "#" to "Number", "Name" to "Player", "Run" to "Print Run"
    head = ['Number' if h.strip() == '#' else h for h in head]
    head = ['Player' if h.strip() == 'Name' else h for h in head]
    head = ['Print Run' if h.strip() == 'Run' else h for h in head]
    # all data rows
    rows = tag_table.xpath('.//tr')
    data = []
    for row in rows:
        cells = row.xpath('./td/text()').getall()
        cells = [cell.strip() for cell in cells if cell.strip()]
        if not cells:
            cells = [''] * len(head)  # pad with empty strings when the row has no data
        # zip the row with the header into a dict
        row_data = dict(zip(head, cells))
        # keep the row only if at least one field is non-empty
        if any(value.strip() for value in row_data.values()):
            data.append(row_data)
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> len data size: {len(data)}')
    for item in data:
        item.update({'category_name': category_name, 'series': series, 'team_url': team_url, 'title': title})
        # normalize keys: lower-case, replace spaces and special characters with underscores
        processed_item = {}
        for key, value in item.items():
            new_key = key.lower().replace(' ', '_').replace('-', '_').replace('/', '_').replace('(', '').replace(')', '')
            processed_item[new_key] = value
        try:
            sql_pool.insert_one_or_dict(table='breakninja_checklist_record', data=processed_item)
        except Exception as e:
            log.error(f"insert_one_or_dict data: {e}")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_table_list_2(log, category_name, series, sql_pool, team_url):
    # url = 'https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist-Arizona-Diamondbacks.php'
    # url = 'https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist-One-Of-Ones.php'
    # url = 'https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist-No-Team.php'
    # url = 'https://www.breakninja.com/basketballcards/21-22-Bowman-University-Basketball-Checklist.php?page_no=1'
    # url = 'https://www.breakninja.com/baseball/2018-Topps-Archives-Baseball-Checklist-Arizona-Diamondbacks.php'
    # url = 'https://www.breakninja.com/basketball/22-23-Panini-Contenders-Basketball-Checklist-Miami-Heat.php'
    # response = requests.get(url_, headers=ninja_settings.headers, proxies=get_proxys(log))
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> team_url: {team_url}')
    response = requests.get(team_url, headers=ninja_settings.headers, proxies=get_proxys(log), timeout=22)
    response.raise_for_status()
    selector = Selector(response.text)
    title = selector.xpath('//h1/text()').get()
    # print(title)
    # several page layouts to handle
    tag_table = selector.xpath('//table[@class="table table-striped table-bordered"]')  # this table style requires pagination
    if tag_table:
        log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size 111: {len(tag_table)}')
        parse_one_team(log, tag_table, category_name, series, sql_pool, team_url, title)
    else:
        tag_table = selector.xpath('//table[@class="table-fill"]')
        if tag_table:
            log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size 222: {len(tag_table)}')
            parse_one_team(log, tag_table, category_name, series, sql_pool, team_url, title)
    if not tag_table:
        tag_table = selector.xpath('//table[1]')
        if tag_table:
            log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size 333: {len(tag_table)}')
            parse_two_team(log, tag_table, category_name, series, sql_pool, team_url, title)


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def ninja_main(log):
    """
    Main entry point.
    :param log: logger object
    """
    log.info(f'Starting the {inspect.currentframe().f_code.co_name} crawler task....................................................')
    # set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        for ca in ninja_settings.category_list:
            category_name = ca.get('category_name')
            category_url = ca.get('category_url')
            try:
                get_series(log, category_name, category_url, sql_pool)
            except Exception as e:
                log.error(f"Request get_series error: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Crawler {inspect.currentframe().f_code.co_name} finished; waiting for the next collection round............')
        # EmailSender().send(subject="[Crawler notice] Today's task is complete",
        #                    content="Data collection and processing are done; please check the results.\n\n ------ from the Python crawler system.")


# def schedule_task():
#     """
#     Entry point for running the crawler module on a schedule.
#     """
#     # run the task once immediately
#     ninja_main(log=logger)
#
#     # then schedule a daily run
#     schedule.every().day.at("00:01").do(ninja_main, log=logger)
#
#     while True:
#         schedule.run_pending()
#         time.sleep(1)

if __name__ == '__main__':
    # get_table_list(logger)
    # get_table_list_2(logger,
    #                  'https://www.breakninja.com/basketball/22-23-Panini-Contenders-Basketball-Checklist-Miami-Heat.php')
    #
    # ninja_main(logger)
    # turn_page(logger, None, None,
    #           "https://www.breakninja.com/baseballcards/2024-Topps-Athletes-Unlimited-Pro-Softball-Checklist.php?page_no=4",
    #           None)
    turn_single_page(logger, None, None,
                     "https://www.breakninja.com/baseballcards/2024-Topps-Athletes-Unlimited-Pro-Softball-Checklist.php?page_no=4",
                     None)