# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/8/14 14:27
"""Crawler for "premier" goods on the Hoopi marketplace API.

The job runs three stages against ``cp.hoopi.xyz``:

1. page through ``goods/listGoods`` and insert the item ids into
   ``hoopi_auction_record``;
2. fetch ``goods/getGoodsInfo/<id>`` for every record whose ``state`` is not 1
   and store the detail fields;
3. fetch ``goodsAuction/getBidRecordByGoodsId/<id>`` for every record whose
   ``bid_state`` is not 1 and store each user's highest bid in
   ``hoopi_auction_bid_record``.
"""
import inspect

import requests
from loguru import logger
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

category = "premier"
max_page = 50
country_name = 'Indonesia'
# Page size sent to the list endpoint; also used to detect the last (short) page.
PAGE_SIZE = 15

# Base headers for every request. The bid endpoint additionally needs an
# "x-access-token", which is read from the database at runtime; never commit
# a real token to this file.
# NOTE(review): "Content-Type" and "content-type" are both present with
# different values; HTTP header names are case-insensitive - confirm which
# value is intended and drop the other.
headers = {
    "User-Agent": "okhttp/4.10.0",
    "Accept-Encoding": "gzip",
    "Content-Type": "application/json",
    "country": "1875086144853712897",
    "lang": "zh",
    "platform": "Android",
    "content-type": "application/json; charset=UTF-8"
}


def after_log(retry_state):
    """Tenacity ``after`` callback: log the outcome of the attempt.

    :param retry_state: tenacity ``RetryCallState`` of the attempt
    """
    # Every retried function in this module receives the logger as its first
    # positional argument; fall back to the global logger when there is none.
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")


def _update_record(sql_pool, item_id, fields):
    """Write ``fields`` to the ``hoopi_auction_record`` row of ``item_id``."""
    sql_pool.update_one_or_dict(table="hoopi_auction_record", data=fields,
                                condition={"item_id": item_id})


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_premier_single_page(log, page_no):
    """Fetch one page of the premier goods list.

    :param log: logger object
    :param page_no: 1-based page number to request
    :return: the ``result`` object of the response, or ``None`` when the API
        reports ``success`` as falsy or answers with a non-200 status
    :raises: HTTP / network / JSON errors propagate so tenacity can retry
        (5 attempts, 1 second apart)
    """
    url = "https://cp.hoopi.xyz/hoopiserver/hoopi/api/goods/listGoods"
    payload = {
        "isTopStatus": "1",
        "sortType": "6",
        "containType": [
            "2"
        ],
        "pageNo": page_no,
        "premierGoods": "1",
        "pageSize": PAGE_SIZE,
        "isSellOutShow": "1",
        "status": "2"
    }
    response = requests.post(url, headers=headers, json=payload, timeout=22)
    response.raise_for_status()

    if response.status_code != 200:
        log.warning(
            f" {inspect.currentframe().f_code.co_name} Request failed with status code: {response.status_code}")
        return None

    result = response.json()
    if result["success"]:
        return result["result"]
    # .get() so that a missing "message" cannot raise and trigger a retry.
    log.warning(f"result_message: {result.get('message')}")
    return None


def parse_list_items(log, items, sql_pool):
    """Insert the ids of the listed items into ``hoopi_auction_record``.

    :param log: logger object
    :param items: list of item dicts from the list endpoint
    :param sql_pool: MySQL connection pool
    """
    log.info(f"{inspect.currentframe().f_code.co_name} Start parsing items")
    if not items:
        log.warning(f" {inspect.currentframe().f_code.co_name} No items found")
        return

    # NOTE(review): rows are inserted without country_name, while the detail
    # and bid queries filter on country_name - confirm the column has a
    # suitable default, otherwise new rows are never picked up.
    info_list = [{"item_id": item.get('id'), "category": category} for item in items]
    sql_pool.insert_many(table="hoopi_auction_record", data_list=info_list, ignore=True)


def get_premier_list(log, sql_pool):
    """Page through the premier goods list and store every item id.

    Stops at ``max_page``, or earlier when a page is empty or short, when the
    reported page count is reached, or when the reported total is collected.

    :param log: logger object
    :param sql_pool: MySQL connection pool
    """
    page = 1
    total_items = 0
    while page <= max_page:
        result = get_premier_single_page(log, page)
        if result is None:
            break

        items = result.get("list", [])
        if not items:
            # loguru formats with str.format, so %-style placeholders are not
            # rendered; use f-strings instead.
            log.debug(f"No items found on page {page}")
            break

        try:
            parse_list_items(log, items, sql_pool)
        except Exception as e:
            log.error(f"Error parsing items on page {page}: {e}")

        total_items += len(items)
        pages = result.get("pages")
        total = result.get("total")

        # Stop condition 1: the reported page count has been reached.
        if pages is not None and page >= pages:
            log.debug(f"已爬取 {page} 页,共 {pages} 页")
            break

        # Stop condition 2: a short page means this was the last one.
        if len(items) < PAGE_SIZE:
            log.debug("已获取数据量小于15,停止爬取......................")
            break

        # Stop condition 3: the reported total has been collected.
        if total is not None and total_items >= total:
            log.debug("已获取数据量已满足要求,停止爬取......................")
            break

        page += 1
        # No delay between pages; add one here if the API starts throttling.


# ----------------------------------------------------------------------------------------------------------------------
def parse_detail(log, item, sql_pool, item_id):
    """Store the detail fields of one item and mark it as processed.

    On success the record gets ``state = 1``; on any failure ``state = 3``
    (the driver re-selects every record whose state is not 1).

    :param log: logger object
    :param item: ``result`` dict from the detail endpoint
    :param sql_pool: MySQL connection pool
    :param item_id: id of the record to update
    """
    log.debug("开始解析详情页数据........................")
    try:
        data_dict = {
            'title': item.get('name'),
            "shop_id": item.get('shopId'),
            'shop_name': item.get('shopName'),
            'shop_app_user_id': item.get('shopAppUserId'),
            'info_imgs': item.get('infoImgs'),  # detail images, comma separated
            'card_type_name': item.get('cardTypeName'),  # card type
            'explain_introduce': item.get('explainIntroduce'),  # description
            'price': item.get('price'),
            'freight_price': item.get('freightPrice'),  # shipping fee
            'currency': item.get('currency'),
            'sold_count': item.get('soldCount'),
            'sell_off_count': item.get('sellOffCount'),
            'status': item.get('status'),  # 2: sold out
            'finish_time': item.get('finishTime'),
            'condition_type_name': item.get('conditionTypeName'),  # grading / condition
            'country_name': item.get('countryName'),
            'shop_sold_count': item.get('shopSoldCount'),
            'bid_count': item.get('bidCount'),  # number of bids
            'state': 1
        }
    except Exception as e:
        # e.g. ``item`` is not a dict
        log.error(f'解析详情页数据error, {e}')
        _update_record(sql_pool, item_id, {"state": 3})
        return

    try:
        _update_record(sql_pool, item_id, data_dict)
        log.success(f"----------------------- 更新成功, item_id: {item_id} -----------------------")
    except Exception as e:
        log.error(f'解析详情页数据 update_one_or_dict 报错:{e}')
        _update_record(sql_pool, item_id, {"state": 3})


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_detail(log, item_id, sql_pool):
    """Fetch the detail of one item and hand it to :func:`parse_detail`.

    :param log: logger object
    :param item_id: id of the item to fetch
    :param sql_pool: MySQL connection pool
    :raises: HTTP / network / JSON errors propagate so tenacity can retry
        (5 attempts, 1 second apart)
    """
    log.debug(f"开始获取详情页数据, item_id: {item_id}........................")
    url = f"https://cp.hoopi.xyz/hoopiserver/hoopi/api/goods/getGoodsInfo/{item_id}"
    response = requests.post(url, headers=headers, timeout=10)
    response.raise_for_status()

    data = response.json()
    if data.get('code') == 200:
        parse_detail(log, data.get("result", {}), sql_pool, item_id)
    else:
        log.error(f"获取详情页数据失败, item_id: {item_id}, msg:{data.get('message')}")
        _update_record(sql_pool, item_id, {"state": 3})


# -------------------------------------------------------------------------------------------------------------------
def _highest_bid_per_user(biddings):
    """Return one record per ``appUserName``: that user's highest bid.

    Prices are compared as floats; on a tie the earlier record is kept.
    """
    highest = {}
    for record in biddings:
        username = record['appUserName']
        best = highest.get(username)
        if best is None or float(record['bidPrice']) > float(best['bidPrice']):
            highest[username] = record
    return list(highest.values())


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_bid_list(log, item_id, sql_pool, token):
    """Fetch the bid records of one item and store each user's highest bid.

    The outcome is written to ``bid_state`` - the column the driver uses to
    select bid work - so that the detail stage's ``state`` is not overwritten:
    1 = bids stored, 2 = no bids returned, 3 = request failed.

    :param log: logger object
    :param item_id: id of the item to fetch
    :param sql_pool: MySQL connection pool
    :param token: value for the ``x-access-token`` request header
    :raises: HTTP / network / JSON errors propagate so tenacity can retry
        (5 attempts, 1 second apart)
    """
    log.debug(f"开始获取竞拍记录数据, item_id: {item_id}........................")
    request_headers = {**headers, "x-access-token": token}
    url = f"https://cp.hoopi.xyz/hoopiserver/hoopi/api/goodsAuction/getBidRecordByGoodsId/{item_id}"
    # Without a timeout a stalled connection would block the whole job.
    response = requests.post(url, headers=request_headers, timeout=10)
    response.raise_for_status()

    if response.status_code != 200:
        log.warning(
            f" {inspect.currentframe().f_code.co_name} Request failed with status code: {response.status_code}")
        _update_record(sql_pool, item_id, {"bid_state": 3})
        return

    # "result" may be missing or null when the item has no bids.
    biddings = response.json().get("result") or []
    biddings_list = [
        {
            'item_id': item_id,
            'bid_id': record['id'],
            'user_id': record['appUserId'],
            'username': record['appUserName'],
            'bid_price': record['bidPrice'],
            'bid_time': record['bidTime'],
        }
        for record in _highest_bid_per_user(biddings)
    ]

    if biddings_list:
        sql_pool.insert_many(table='hoopi_auction_bid_record', data_list=biddings_list)
        _update_record(sql_pool, item_id, {"bid_state": 1})
        log.success(f"----------------------- 添加成功, item_id: {item_id} -----------------------")
    else:
        log.warning(f"----------------------- 添加失败, item_id: {item_id} -----------------------")
        _update_record(sql_pool, item_id, {"bid_state": 2})


def _select_pending_ids(sql_pool, state_column):
    """Return the item ids of this category/country whose ``state_column`` is not 1."""
    # Only module-level constants are interpolated here, no external input.
    rows = sql_pool.select_all(
        f"SELECT item_id FROM hoopi_auction_record WHERE {state_column} != 1 AND category = '{category}' AND country_name = '{country_name}'")
    return [row[0] for row in rows or []]


def _collect_details(log, sql_pool):
    """Fetch the detail of every pending record; one failure does not stop the rest."""
    for item_id in _select_pending_ids(sql_pool, "state"):
        try:
            get_detail(log, item_id, sql_pool)
        except Exception as e:
            log.error(f"Request get_detail error: {e}")


def _collect_bids(log, sql_pool):
    """Fetch the bids of every pending record using the active token from ``hoopi_token``."""
    token_row = sql_pool.select_one("SELECT token FROM hoopi_token WHERE state = 1")
    if not token_row:
        log.error("No active token (state = 1) found in hoopi_token, skip bid collection")
        return
    token = token_row[0]

    for item_id in _select_pending_ids(sql_pool, "bid_state"):
        try:
            get_bid_list(log, item_id, sql_pool, token)
        except Exception as e:
            log.error(f"Request get_bid_list error: {e}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def hoopi_premier_main(log):
    """Run one full crawl: item list, item details, bid records.

    Errors inside the crawl are logged and swallowed; only an unhealthy
    connection pool raises, which makes tenacity retry the whole job
    (up to 100 attempts, one hour apart).

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {fn_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        # Stage 1: list of sold goods.
        log.debug(f"开始获取已售出商品列表, category: {category}........................")
        get_premier_list(log, sql_pool)

        # Stage 2: goods details.
        log.debug(f"开始获取商品详情, category: {category}........................")
        _collect_details(log, sql_pool)

        # Stage 3: bid records.
        log.debug(f"开始获取商品出价列表, category: {category}........................")
        _collect_bids(log, sql_pool)
    except Exception as e:
        log.error(f'{fn_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {fn_name} 运行结束,等待下一轮的采集任务............')
        # An e-mail notification on completion (EmailSender) is currently disabled.


if __name__ == '__main__':
    hoopi_premier_main(logger)