ninja_add_spider.py

# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/9/8 11:56
import inspect
import re

import requests
from loguru import logger
from parsel import Selector
from tenacity import retry, stop_after_attempt, wait_fixed

import add_settings
from mysql_pool import MySQLConnectionPool
  13. """
  14. 第二个 页面 年份获取 /html/body/div[3]/span[position() >= 4 and position() <= 18]
  15. """

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")  # rotate at midnight, keep one week of logs


def after_log(retry_state):
    """
    retry callback
    :param retry_state: RetryCallState object
    """
    # Use the logger passed to the wrapped function if one was given
    if retry_state.args and len(retry_state.args) > 0:
        log = retry_state.args[0]  # the logger passed in as the first positional argument
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")
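
# For reference: tenacity passes the same RetryCallState to every `after=` hook,
# so for a decorated call such as turn_single_page(log, ...) below,
# retry_state.args[0] is that `log` argument, and retry_state.outcome is the
# Future-like result of the attempt (hence the .failed check above).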


def turn_page(log, category_name, year, series, series_url, sql_pool):
    """
    Paginate through a series checklist
    :param log: logger object
    :param category_name: category name
    :param year: year
    :param series: series name
    :param series_url: series URL
    :param sql_pool: database connection pool
    :return:
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    turn_base_url = series_url.split('page_no=')[0] + 'page_no='
    page = 1
    max_page = 50
    while page < max_page + 1:
        # e.g. https://www.breakninja.com/baseball/2025-Topps-All-Star-Game-Mega-Box-Baseball-Checklist.php?page_no=2
        # Build the URL for the current page by appending the page_no value
        current_url = turn_base_url + str(page)
        log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> current_url: {current_url}')
        len_data, selector = turn_single_page(log, category_name, year, series, current_url, sql_pool)
        if len_data < 250:
            log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> len_data: {len_data}, fewer than 250 rows, break!!!')
            break
        max_page_text = selector.xpath('/html/body/div[3]/div[4]/strong[1]/text()').get()
        if max_page_text:
            # Extract the number after "of" with a regex
            match = re.search(r'of\s+(\d+)', max_page_text)
            if match:
                total_rows = int(match.group(1))
                max_page = (total_rows // 250) + 1  # maximum number of pages to fetch
                log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> max_page: {max_page}')
        page += 1
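
# Worked example of the pagination math above, with a hypothetical footer text
# like "Showing 1 to 250 of 1234":
#
#     match = re.search(r'of\s+(\d+)', 'Showing 1 to 250 of 1234')
#     total_rows = int(match.group(1))      # 1234
#     max_page = (total_rows // 250) + 1    # 4 + 1 = 5 pages of up to 250 rows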


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def turn_single_page(log, category_name, year, series, current_url, sql_pool):
    """
    Fetch and parse a single page of a series checklist
    :param log: logger object
    :param category_name: category name
    :param year: year
    :param series: series name
    :param current_url: URL of the page to fetch
    :param sql_pool: database connection pool
    :return: (number of parsed rows, parsel Selector for the page)
    """
    log.info(f"{inspect.currentframe().f_code.co_name} ->->-> series: {series}")
    response = requests.get(current_url, headers=add_settings.headers, timeout=60)
    response.raise_for_status()  # an HTTP error status raises here and triggers a retry
    selector = Selector(response.text)
    tag_table = selector.xpath('//table[@class="table table-striped table-bordered"]')  # this table layout requires pagination
    log.debug(f'{inspect.currentframe().f_code.co_name} tag_table size: {len(tag_table)}')
    title = selector.xpath('//h1/text()').get()
    if tag_table:
        log.debug(f'{inspect.currentframe().f_code.co_name} ->->-> tag_table size: {len(tag_table)}')
        len_data = parse_turn_single_team(log, tag_table, category_name, year, series, current_url, sql_pool, title)
    else:
        len_data = 0
    return len_data, selector


def parse_turn_single_team(log, tag_table, category_name, year, series, series_url, sql_pool, title):
    """
    Parse one checklist table and bulk-insert its rows into the database
    :return: number of parsed rows
    """
    # Table header (first row)
    head = tag_table.xpath('./thead/tr/th/text()').getall()
    # Normalize header names: "#" -> "Number", "Name" -> "Player", "Run" -> "Print Run"
    head = ['Number' if h.strip() == '#' else h for h in head]
    head = ['Player' if h.strip() == 'Name' else h for h in head]
    head = ['Print Run' if h.strip() == 'Run' else h for h in head]
    # All data rows
    rows = tag_table.xpath('./tbody/tr')
    data = []
    for row in rows:
        # Note: only direct text nodes are taken; cells whose content is wrapped in tags yield no text
        cells = row.xpath('./td/text()').getall()
        cells = [cell.strip() for cell in cells if cell.strip()]
        if not cells:
            cells = [''] * len(head)  # no cell text at all: pad with empty strings
        # Zip the row cells with the header into a dict
        row_data = dict(zip(head, cells))
        # Keep the row only if at least one field is non-empty
        if any(value.strip() for value in row_data.values()):
            data.append(row_data)
    log.info(f'{inspect.currentframe().f_code.co_name} ->->-> len data size: {len(data)}')
    # Columns that exist in the database table
    database_fields = {
        'player', 'number', 'card_set', 'copies', 'team', 'print_run',
        'category_name', 'year', 'series', 'series_url', 'team_url', 'title'
    }
    info_list = []
    for item in data:
        item.update(
            {
                'category_name': category_name,
                'year': year,
                'series': series,
                'series_url': series_url,
                'title': title
            }
        )
        # Normalize key names: lower-case, spaces/hyphens/slashes -> underscores, drop parentheses
        processed_item = {}
        for key, value in item.items():
            new_key = key.lower().replace(' ', '_').replace('-', '_').replace('/', '_').replace('(', '').replace(
                ')', '')
            processed_item[new_key] = value
        # Build a dict with a unified structure so every database column is present
        unified_item = {}
        for field in database_fields:
            unified_item[field] = processed_item.get(field, None)
        info_list.append(unified_item)
    try:
        if info_list:
            sql_pool.insert_many(table='breakninja_checklist_record_202511', data_list=info_list)
    except Exception as e:
        log.error(f"insert_many data: {e}")
    return len(data)
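
# Illustration of the normalization above (values are hypothetical): a scraped row
# {'Player': 'A. Judge', 'Print Run': '99', 'Card Set': 'Base'} becomes
# {'player': 'A. Judge', 'print_run': '99', 'card_set': 'Base'}, and the unified
# dict then fills every remaining database column (e.g. 'team_url') with None so
# that insert_many receives rows with an identical key set.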


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def ninja_main(log):
    """
    Main entry point
    :param log: logger object
    """
    log.info(
        f'Starting {inspect.currentframe().f_code.co_name} spider task....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        for ca in add_settings.series_url_list:
            category_name = ca.get('category_name')
            years = ca.get('years')
            for ye in years:
                year = ye.get('year')
                series = ye.get('series')
                for ser in series:
                    series_name = ser.get('series_name')
                    series_url = ser.get('series_url')
                    log.debug(
                        f'category_name: {category_name}, year: {year}, series_name: {series_name}, series_url: {series_url}')
                    try:
                        if '?page_no=' in series_url:
                            log.debug('page_no in series_url, request turn_page.........')
                            turn_page(log, category_name, year, series_name, series_url, sql_pool)
                        else:
                            log.debug('page_no not in series_url, request turn_single_page.........')
                            # The original branch only logged; calling the single-page
                            # fetch here matches the log message's intent
                            turn_single_page(log, category_name, year, series_name, series_url, sql_pool)
                    except Exception as e:
                        log.error(f"Error processing series: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Spider {inspect.currentframe().f_code.co_name} finished; waiting for the next crawl round............')


if __name__ == '__main__':
    ninja_main(logger)