| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/9/8 11:56
- import re
- import requests
- from parsel import Selector
- import add_settings
- import inspect
- from loguru import logger
- from tenacity import retry, stop_after_attempt, wait_fixed
- from mysql_pool import MySQLConnectionPool
- """
- 第二个 页面 年份获取 /html/body/div[3]/span[position() >= 4 and position() <= 18]
- """
# Replace loguru's default stderr sink with a daily-rotating file sink:
# one file per day, rotated at midnight, DEBUG level and up.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")  # NOTE(review): loguru docs spell this "7 days" — confirm "7 day" parses
def after_log(retry_state):
    """Tenacity after-callback: log the outcome of each retry attempt.

    :param retry_state: tenacity RetryCallState for the finished attempt
    """
    # The decorated functions in this module all take a logger as their
    # first positional argument; fall back to the module logger otherwise.
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
def turn_page(log, category_name, year, series, series_url, sql_pool):
    """Walk through every page of a paginated series checklist.

    :param log: logger object
    :param category_name: category name
    :param year: year string
    :param series: series name
    :param series_url: series URL containing a ``page_no=`` query parameter
    :param sql_pool: database connection pool
    :return: None
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    # Rebuild the URL up to and including the page_no= parameter so the
    # page number can be appended each iteration.
    turn_base_url = series_url.split('page_no=')[0] + 'page_no='
    page, max_page = 1, 50
    while page <= max_page:
        current_url = turn_base_url + str(page)
        log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> current_url: {current_url}')
        len_data, selector = turn_single_page(log, category_name, year, series, current_url, sql_pool)
        # A short page (fewer than 250 rows) is the last page.
        if len_data < 250:
            log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> len_data: {len_data}, 少于250条, break!!!')
            break
        footer_text = selector.xpath('/html/body/div[3]/div[4]/strong[1]/text()').get()
        if footer_text:
            # The footer reads "... of <total>"; derive the page count
            # from the total row count at 250 rows per page.
            m = re.search(r'of\s+(\d+)', footer_text)
            if m:
                total_rows = int(m.group(1))
                max_page = (total_rows // 250) + 1
                log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> max_page: {max_page}')
        page += 1
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def turn_single_page(log, category_name, year, series, current_url, sql_pool):
    """Fetch a single checklist page and persist its table rows.

    Retried up to 5 times (1 s apart) on any exception via tenacity.

    :param log: logger object
    :param category_name: category name
    :param year: year string
    :param series: series name
    :param current_url: URL of the page to fetch
    :param sql_pool: database connection pool
    :return: (number of parsed rows, page Selector)
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    response = requests.get(current_url, headers=add_settings.headers, timeout=60)
    response.raise_for_status()
    selector = Selector(response.text)
    # Paginated checklists render their rows in this striped table.
    tag_table = selector.xpath('//table[@class="table table-striped table-bordered"]')
    log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size: {len(tag_table)}')
    title = selector.xpath('//h1/text()').get()
    if not tag_table:
        # No table on the page: nothing to parse.
        return 0, selector
    log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> tag_table size 111: {len(tag_table)}')
    len_data = parse_turn_single_team(log, tag_table, category_name, year, series, current_url, sql_pool, title)
    return len_data, selector
def parse_turn_single_team(log, tag_table, category_name, year, series, series_url, sql_pool, title):
    """Parse a checklist table and bulk-insert its rows into the database.

    :param log: logger object
    :param tag_table: parsel Selector for the checklist <table>
    :param category_name: category name
    :param year: year string
    :param series: series name
    :param series_url: URL the table was scraped from
    :param sql_pool: database connection pool (must expose ``insert_many``)
    :param title: page <h1> title text (may be None)
    :return: number of parsed data rows
    """
    # Header row, normalized to the column names used by the database.
    head = [h.strip() for h in tag_table.xpath('./thead/tr/th/text()').getall()]
    rename = {'#': 'Number', 'Name': 'Player', 'Run': 'Print Run'}
    head = [rename.get(h, h) for h in head]
    rows = tag_table.xpath('./tbody/tr')
    data = []
    for row in rows:
        # BUG FIX: extract text per <td> so empty cells keep their column
        # position. The old './td/text()' + strip-and-filter approach
        # shifted every value after a blank cell one column to the left,
        # pairing data with the wrong header. Using './/text()' also picks
        # up text wrapped in child tags (e.g. links).
        cells = [''.join(td.xpath('.//text()').getall()).strip() for td in row.xpath('./td')]
        if not cells:
            cells = [''] * len(head)  # row without <td> cells: pad with blanks
        row_data = dict(zip(head, cells))
        # Keep the row only if at least one field carries a value.
        if any(value.strip() for value in row_data.values()):
            data.append(row_data)
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> len data size: {len(data)}')
    # Columns that exist in the target table; everything else is dropped.
    database_fields = {
        'player', 'number', 'card_set', 'copies', 'team', 'print_run',
        'category_name', 'year', 'series', 'series_url', 'team_url', 'title'
    }
    info_list = []
    for item in data:
        item.update({
            'category_name': category_name,
            'year': year,
            'series': series,
            'series_url': series_url,
            'title': title,
        })
        # Normalize keys: lower-case, spaces/specials become underscores.
        processed_item = {}
        for key, value in item.items():
            new_key = key.lower().replace(' ', '_').replace('-', '_').replace('/', '_').replace('(', '').replace(')', '')
            processed_item[new_key] = value
        # Unify the record shape so every insert carries every column.
        info_list.append({field: processed_item.get(field, None) for field in database_fields})
    try:
        if info_list:
            sql_pool.insert_many(table='breakninja_checklist_record_202511', data_list=info_list)
    except Exception as e:
        # Best-effort persistence: log and keep crawling.
        log.error(f"insert_many data: {e}")
    return len(data)
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def ninja_main(log):
    """Crawler entry point: walk the configured series list and scrape each.

    Retried up to 100 times, one hour apart, so the crawler keeps running
    as a long-lived periodic task.

    :param log: logger object
    :raises RuntimeError: when the MySQL connection pool is unhealthy
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Configure the MySQL connection pool and fail fast if it is unhealthy
    # (raising triggers the tenacity retry an hour later).
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        for ca in add_settings.series_url_list:
            category_name = ca.get('category_name')
            years = ca.get('years')
            for ye in years:
                year = ye.get('year')
                series = ye.get('series')
                for ser in series:
                    series_name = ser.get('series_name')
                    series_url = ser.get('series_url')
                    log.debug(
                        f'category_name: {category_name}, year: {year}, series_name: {series_name}, series_url: {series_url}')
                    try:
                        if '?page_no=' in series_url:
                            log.debug(f'page_no in series_url, request turn_page.........')
                            turn_page(log, category_name, year, series_name, series_url, sql_pool)
                        else:
                            log.debug(f'page_no not in series_url, request turn_single_page.........')
                            # BUG FIX: this branch previously only logged and
                            # never fetched, silently skipping every
                            # non-paginated series. Fetch the page directly.
                            turn_single_page(log, category_name, year, series_name, series_url, sql_pool)
                    except Exception as e:
                        # One bad series must not abort the whole crawl.
                        log.error(f"Error processing series: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
if __name__ == '__main__':
    # Script entry point: run the crawler with the module-level loguru logger.
    ninja_main(logger)
|