# -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2025/8/12 16:59 import random import time import inspect import requests import schedule from loguru import logger from mysql_pool import MySQLConnectionPool from tenacity import retry, stop_after_attempt, wait_fixed # logger.remove() # logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", # level="DEBUG", retention="7 day") category = "潮玩" max_page = 50 def after_log(retry_state): """ retry 回调 :param retry_state: RetryCallState 对象 """ # 检查 args 是否存在且不为空 if retry_state.args and len(retry_state.args) > 0: log = retry_state.args[0] # 获取传入的 logger else: log = logger # 使用全局 logger if retry_state.outcome.failed: log.warning( f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times") else: log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded") def get_single_page(log, page_no): headers = { "User-Agent": "okhttp/4.10.0", "Accept-Encoding": "gzip", "Content-Type": "application/json", # "x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiIxIiwiZXhwIjoxNzU1MTkyNDI2LCJ1c2VybmFtZSI6ImNoYXJsZXlfbGVvQDE2My5jb20ifQ.byBS1zj-LyD1mKHrCx9eLy5X2d0QzTO0FwApj2egSVI", "country": "1", "lang": "zh", "platform": "Android", "content-type": "application/json; charset=UTF-8" } url = "https://cp.hoopi.xyz/hoopiserver/hoopi/api/toy/listToys" data = { "sortType": "2", "containType": [ "0" ], "pageNo": page_no, "pageSize": 10 } response = requests.post(url, headers=headers, json=data) print(response.text) response.raise_for_status() if response.status_code == 200: result = response.json() if result["success"]: return result["result"] else: log.warning(f"result_message: {result['message']}") else: log.warning(f" {inspect.currentframe().f_code.co_name} Request failed with status code: {response.status_code}") return None def parse_list_items(log, items, sql_pool): log.info(f"{inspect.currentframe().f_code.co_name} Start parsing items") if items: info_list = [] for item in items: item_id = item.get('id') info_list.append(item_id) if info_list: sql_pool.insert_many(table="hoopi_mall_record", data_list=info_list, ignore=True) else: log.warning(f" {inspect.currentframe().f_code.co_name} No items found") def get_mall_sold_list(log, sql_pool): page = 1 total_items = 0 # while True: while page <= max_page: result = get_single_page(log, page) if result is None: break items = result.get("list", []) if not items: log.warning("No items found on page %s", page) break try: parse_list_items(log, items, sql_pool) except Exception as e: log.error("Error parsing items on page %s: %s", page, e) total_items += len(items) pages = result.get("pages") total = result.get("total") # 判断条件 1: 根据 pages 判断 if pages is not None and page >= pages: log.debug("已爬取 %s 页,共 %s 页" % (page, pages)) break # 判断条件 2: 根据 list 的长度判断 if len(items) < 10: # pageSize 为 10 log.debug("已获取数据量小于15,停止爬取......................") break # 判断条件 3: 根据 total 和已获取数据量判断 if total is not None and total_items >= total: log.debug("已获取数据量已满足要求,停止爬取......................") break page += 1 # time.sleep(random.uniform(0.1, 0.5)) # 添加延时,避免频繁请求 def parse_detail(log, item, sql_pool, item_id): log.debug("开始解析详情页数据........................") try: title = item.get('name') shopId = item.get('shopId') shopAppUserId = item.get('shopAppUserId') shopName = item.get('shopName') infoImgs = item.get('infoImgs') # 详情图片, 多图, 逗号分隔 cardTypeName = item.get('cardTypeName') # 卡类型 explainIntroduce = item.get('explainIntroduce') # 描述 price = item.get('price') freightPrice = item.get('freightPrice') # 运费 currency = item.get('currency') # 币种 soldCount = item.get('soldCount') # 售出计数 sellOffCount = item.get('sellOffCount') # 抛售计数 status = item.get('status') # 2:售罄 finishTime = item.get('finishTime') conditionTypeName = item.get('conditionTypeName') # 评级/状况 countryName = item.get('countryName') # 国家 shopSoldCount = item.get('shopSoldCount') # 店铺已售 data_dict = { 'title': title, "shop_id": shopId, 'shop_name': shopName, 'shop_app_user_id': shopAppUserId, 'info_imgs': infoImgs, 'card_type_name': cardTypeName, 'explain_introduce': explainIntroduce, 'price': price, 'freight_price': freightPrice, 'currency': currency, 'sold_count': soldCount, 'sell_off_count': sellOffCount, 'status': status, 'finish_time': finishTime, 'condition_type_name': conditionTypeName, 'country_name': countryName, 'shop_sold_count': shopSoldCount } print(data_dict) # try: # sql_pool.update_one_or_dict(table='hoopi_mall_record', data=data_dict, condition={'item_id': item_id}) # except Exception as e: # log.error(f'解析详情页数据 update_one_or_dict 报错:{e[:500]}') except Exception as e: log.error(f'解析详情页数据error, {e[:500]}') def get_detail(log, item_id, sql_pool): headers = { "User-Agent": "okhttp/4.10.0", "Accept-Encoding": "gzip", # "x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiIxIiwiZXhwIjoxNzU1MTkyNDI2LCJ1c2VybmFtZSI6ImNoYXJsZXlfbGVvQDE2My5jb20ifQ.byBS1zj-LyD1mKHrCx9eLy5X2d0QzTO0FwApj2egSVI", "country": "1", "lang": "zh", "platform": "Android", "content-length": "0" } url = "https://cp.hoopi.xyz/hoopiserver/hoopi/api/goods/getGoodsInfo/1954822331293704194" # url = f"https://cp.hoopi.xyz/hoopiserver/hoopi/api/goods/getGoodsInfo/{item_id}" response = requests.post(url, headers=headers) print(response.text) response.raise_for_status() data = response.json() result = data.get("result", {}) parse_detail(log, result, sql_pool, item_id) @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def hoopi_mall_main(log): """ 主函数 :param log: logger对象 """ log.info( f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................') # 配置 MySQL 连接池 sql_pool = MySQLConnectionPool(log=log) if not sql_pool.check_pool_health(): log.error("数据库连接池异常") raise RuntimeError("数据库连接池异常") try: try: pass except Exception as e: log.error(f"Request get_shop_data_list error: {e}") except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} error: {e}') finally: log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............') # EmailSender().send(subject="【千岛 拍卖 - 爬虫通知】今日任务已完成", # content="数据采集和处理已全部完成,请查收结果。\n\n ------ 来自 Python 爬虫系统。") def schedule_task(): """ 爬虫模块 定时任务 的启动文件 """ # 立即运行一次任务 # hoopi_mall_main(log=logger) # 设置定时任务 schedule.every().day.at("01:06").do(hoopi_mall_main, log=logger) while True: schedule.run_pending() time.sleep(1) if __name__ == '__main__': # get_mall_sold_list(logger, None) get_detail(logger, "1954822331293704194", None)