# -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2025/8/14 15:55 import inspect import requests from loguru import logger from mysql_pool import MySQLConnectionPool from tenacity import retry, stop_after_attempt, wait_fixed logger.remove() logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day") country_name = "Malaysia" max_page = 50 def after_log(retry_state): """ retry 回调 :param retry_state: RetryCallState 对象 """ # 检查 args 是否存在且不为空 if retry_state.args and len(retry_state.args) > 0: log = retry_state.args[0] # 获取传入的 logger else: log = logger # 使用全局 logger if retry_state.outcome.failed: log.warning( f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times") else: log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded") @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_box_single_page(log, page_no, brand_id): log.debug(f"Start {inspect.currentframe().f_code.co_name}, page:{page_no}") headers = { "User-Agent": "okhttp/4.10.0", "Accept-Encoding": "gzip", "Content-Type": "application/json", # "x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiIxIiwiZXhwIjoxNzU1MTkyNDI2LCJ1c2VybmFtZSI6ImNoYXJsZXlfbGVvQDE2My5jb20ifQ.byBS1zj-LyD1mKHrCx9eLy5X2d0QzTO0FwApj2egSVI", "country": "1", "lang": "zh", "platform": "Android", "content-type": "application/json; charset=UTF-8" } url = "https://cp.hoopi.xyz/hoopiserver/hoopi/api/box/listBox" data = { "sortType": "6", # "pageNo": 1, "pageNo": page_no, # "brandId": "1934927046253977602", "brandId": brand_id, "pageSize": 20 } response = requests.post(url, headers=headers, json=data, timeout=22) # print(response.text) response.raise_for_status() if response.status_code == 200: result = response.json() if result["success"]: return result["result"] else: log.warning(f"result_message: {result['message']}") else: log.warning(f" {inspect.currentframe().f_code.co_name} Request failed with status code: {response.status_code}") return None def parse_list_items(log, items, sql_pool, brand_name): log.info(f"{inspect.currentframe().f_code.co_name} Start parsing items") if items: info_list = [] for item in items: item_id = item.get("id") title = item.get("name") releaseTime = item.get("releaseTime") soldCount = item.get("soldCount") sellOffCount = item.get("sellOffCount") price = item.get("price") # 原价 discountedPrice = item.get("discountedPrice") # 优惠价 currency = item.get("currency") # 币种 status = item.get("status") boxImg = item.get("boxImg") data_dict = { "item_id": item_id, "category": brand_name, "title": title, "release_time": releaseTime, "sold_count": soldCount, "sell_off_count": sellOffCount, "price": price, "discounted_price": discountedPrice, "currency": currency, "status": status, "box_img": boxImg, "country_name": country_name } # print('data_dict:', data_dict) info_list.append(data_dict) if info_list: sql_pool.insert_many(table="hoopi_box_record", data_list=info_list, ignore=True) else: log.warning(f" {inspect.currentframe().f_code.co_name} No items found") def get_box_list(log, sql_pool, brand_name, brand_id): page = 1 total_items = 0 # while True: while page <= max_page: result = get_box_single_page(log, page, brand_id) if result is None: break items = result.get("list", []) if not items: log.debug("No items found on page %s", page) break try: parse_list_items(log, items, sql_pool, brand_name) except Exception as e: log.error("Error parsing items on page %s: %s", page, e) total_items += len(items) pages = result.get("pages") total = result.get("total") # 判断条件 1: 根据 pages 判断 if pages is not None and page >= pages: log.debug("已爬取 %s 页,共 %s 页" % (page, pages)) break # 判断条件 2: 根据 list 的长度判断 if len(items) < 20: # pageSize 为 20 log.debug("已获取数据量小于 20,停止爬取......................") break # 判断条件 3: 根据 total 和已获取数据量判断 if total is not None and total_items >= total: log.debug("已获取数据量已满足要求,停止爬取......................") break page += 1 # time.sleep(random.uniform(0.1, 0.5)) # 添加延时,避免频繁请求 # ---------------------------------------------------------------------------------------------------------------------- def parse_detail(log, result, sql_pool, item_id): # log.debug(f"开始解析详情页数据, item_id:{item_id}........................") try: shopId = result.get("shopId") backImg = result.get("backImg") detailsImg = result.get("detailsImg") isRecycling = result.get("isRecycling") # 是否回收 str totalCount = result.get("totalCount") # 总数 int data_dict = { "shop_id": shopId, "back_img": backImg, "details_img": detailsImg, "is_recycling": isRecycling, "total_count": totalCount } # print('data_dict:',data_dict) try: sql_pool.update_one_or_dict(table='hoopi_box_record', data=data_dict, condition={'item_id': item_id}) log.success(f"----------------------- 更新成功, item_id: {item_id} -----------------------") except Exception as e: log.error(f'解析详情页数据 update_one_or_dict 报错:{e}') sql_pool.update_one_or_dict(table="hoopi_box_record", data={"state": 3}, condition={"item_id": item_id}) except Exception as e: log.error(f'解析详情页数据error, {e}') sql_pool.update_one_or_dict(table="hoopi_box_record", data={"state": 3}, condition={"item_id": item_id}) @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_detail(log, item_id, sql_pool): log.debug(f"开始获取详情页数据, item_id: {item_id}........................") headers = { "User-Agent": "okhttp/4.10.0", "Accept-Encoding": "gzip", # "x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiIxIiwiZXhwIjoxNzU1MTkyNDI2LCJ1c2VybmFtZSI6ImNoYXJsZXlfbGVvQDE2My5jb20ifQ.byBS1zj-LyD1mKHrCx9eLy5X2d0QzTO0FwApj2egSVI", "country": "1", "lang": "zh", "platform": "Android", "content-length": "0" } url = f"https://cp.hoopi.xyz/hoopiserver/hoopi/api/box/getBoxInfo/{item_id}" response = requests.post(url, headers=headers, timeout=10) # print(response.text) # time.sleep(11111) response.raise_for_status() data = response.json() if data['code'] == 200: result = data.get("result", {}) parse_detail(log, result, sql_pool, item_id) else: log.error(f"获取详情页数据失败, item_id: {item_id}, msg:{data['message']}") sql_pool.update_one_or_dict(table="hoopi_box_record", data={"state": 3}, condition={"item_id": item_id}) # ---------------------------------------------------------------------------------------------------------------------- # def parse_carousel_list(log, result_json, sql_pool, item_id): # for item in result_json: # result_id = item.get("id") # boxGradeId = item.get("boxGradeId") # gradeType = item.get("gradeType") # title = item.get("name") # coverImg = item.get("coverImg") # priceText = item.get("priceText") # data_dict = { # "item_id": item_id, # "result_id": result_id, # "boxGradeId": boxGradeId, # "gradeType": gradeType, # "title": title, # "coverImg": coverImg, # "priceText": priceText # } # print('data_dict:', data_dict) # # # def get_carousel_list(log, item_id, sql_pool): # """ # 获取轮播图列表 # :param log: # :param item_id: # :param sql_pool: # :return: # """ # headers = { # "User-Agent": "okhttp/4.10.0", # "Accept-Encoding": "gzip", # # "x-access-token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJsb2dpblR5cGUiOiIxIiwiZXhwIjoxNzU1MTkyNDI2LCJ1c2VybmFtZSI6ImNoYXJsZXlfbGVvQDE2My5jb20ifQ.byBS1zj-LyD1mKHrCx9eLy5X2d0QzTO0FwApj2egSVI", # "country": "1", # "lang": "zh", # "platform": "Android", # "content-length": "0" # } # url = "https://cp.hoopi.xyz/hoopiserver/hoopi/api/box/listBoxGradePrize" # data = { # "boxId": "1953726337084772353" # } # response = requests.post(url, headers=headers, json=data) # print(response.text) # response.raise_for_status() # # if response.status_code == 200: # result = response.json() # if result["success"]: # result_json = result["result"] # parse_carousel_list(log, result_json, sql_pool, item_id) # else: # log.warning(f"result_message: {result['message']}") # else: # log.warning(f" {inspect.currentframe().f_code.co_name} Request failed with status code: {response.status_code}") @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def hoopi_box_main(log): """ 主函数 :param log: logger对象 """ log.info( f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................') # 配置 MySQL 连接池 sql_pool = MySQLConnectionPool(log=log) if not sql_pool.check_pool_health(): log.error("数据库连接池异常") raise RuntimeError("数据库连接池异常") try: try: # 获取已售出商品列表 log.debug(f"开始获取已售出商品列表........................") brand_list = { "pokemon": "1934927046253977602", "mix": "1955185420300132353", "one piece": "1934927236365000706" } for brand_name, brand_id in brand_list.items(): log.info(f"开始处理品牌: {brand_name}, ID: {brand_id}") get_box_list(log, sql_pool, brand_name, brand_id) log.success(f"获取已售出商品列表 Finished") # 获取商品详情 log.debug(f"开始获取商品详情........................") sql_ietm_id_list = sql_pool.select_all(f"SELECT item_id FROM hoopi_box_record WHERE state != 1 AND country_name = '{country_name}'") # f"SELECT item_id FROM hoopi_box_record WHERE state != 1 AND category = '{category}'") sql_ietm_id_list = [item_id[0] for item_id in sql_ietm_id_list] for item_id in sql_ietm_id_list: try: get_detail(log, item_id, sql_pool) except Exception as e: log.error(f"Request get_detail error: {e}") log.success(f"获取商品详情 Finished") except Exception as e: log.error(f"Request get_shop_data_list error: {e}") except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} error: {e}') finally: log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............') # EmailSender().send(subject="【千岛 拍卖 - 爬虫通知】今日任务已完成", # content="数据采集和处理已全部完成,请查收结果。\n\n ------ 来自 Python 爬虫系统。") if __name__ == '__main__': # get_box_list(logger, None, None, None) hoopi_box_main(logger)