# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.8.10
# Date: 2024-09-30 13:29
import json
import random
import time
from datetime import datetime

import requests
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool

# logger.remove()
# logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="1 day")

headers = {
    "appVersion": "2.0.4",
    "osVersion": "11",
    "deviceModel": "Pixel 5",
    "appVersionCode": "196",
    "deviceBrand": "google",
    "platform": "android",
    "token": "",
    "User-Agent": "Mozilla/5.0 (Linux; Android 11; Pixel 5 Build/RQ3A.211001.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.106 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/52.727272)",
    "Content-Type": "application/json",
    "Connection": "Keep-Alive"
}

base_url = "https://api.weikajia.com"


def after_log(retry_state):
    """
    Retry callback for tenacity.
    :param retry_state: RetryCallState object
    """
    # Use the logger passed as the first positional argument if present,
    # otherwise fall back to the module-level loguru logger.
    if retry_state.args:
        log = retry_state.args[0]
    else:
        log = logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(
            f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the proxy configuration (KDL tunnel proxy).
    :return: proxies dict usable by requests
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {
                "user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {
                "user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e


def transform_timestamp(timestamp):
    """
    Convert a Unix timestamp to a '%Y-%m-%d %H:%M:%S' string.
    :param timestamp: Unix timestamp in seconds
    :return: formatted_time
    """
    # Convert the Unix timestamp to a datetime object (local time)
    dt_object = datetime.fromtimestamp(int(timestamp))
    # Format the time
    formatted_time = dt_object.strftime('%Y-%m-%d %H:%M:%S')
    return formatted_time
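
# Illustrative usage note (not part of the crawl flow): transform_timestamp uses the
# host's local timezone via datetime.fromtimestamp, so the exact string depends on
# where the script runs. For example, assuming a host on UTC+8:
#   transform_timestamp(1727654400)  # -> '2024-09-30 08:00:00'
# The function assumes second-precision timestamps; a millisecond value would need to
# be divided by 1000 first.
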
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_action(log, auctionId, token):
    """
    Fetch auction detail info.
    :param log:
    :param auctionId:
    :param token:
    :return: agentUserInfo
    """
    log.debug(f'Querying info for auctionId: {auctionId}..............')
    url = f"{base_url}/api/v2/auction/detail"
    params = {
        "auctionId": auctionId
    }
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params, timeout=5)
    # print(f'get_action: {response.json()}')
    if response.json()["resultCode"] == 200:
        try:
            agentUserInfo = response.json()["data"].get("agentUserInfo")
            if agentUserInfo:
                agentId = response.json()["data"].get("agentId")
                agentUserInfo["agentId"] = agentId
                return agentUserInfo
            else:
                log.warning("get_action agentUserInfo is empty, skipping...........")
                return {}
        except Exception as e:
            log.error(f"get_action agentUserInfo error: {e}")
            return {}
    else:
        log.debug("get_action request failed, skipping...........")
        # raise Exception("Request failed")
        return {}


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_cabinet(log, cabinetId, token):
    """
    Fetch cabinet (card) detail info.
    :param log:
    :param cabinetId:
    :param token:
    :return: cab_dict
    """
    log.debug(f'Querying info for cabinetId: {cabinetId}..............')
    url = f"{base_url}/api/v2/cabinet/detail"
    params = {
        "cabinetId": cabinetId
    }
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params, timeout=5)
    # print(f'get_cabinet: {response.json()}')
    if response.json()["resultCode"] == 200:
        data = response.json()["data"]
        cab_dict = {
            "rmbPrice": data.get("rmbPrice"),
            "brand": data.get("brand"),
            "status": data.get("status"),
            "switchSt": data.get("switchSt"),
            "cardNo": data.get("cardNo"),
            "barcodeId": data.get("barcodeId"),
            "year": data.get("year"),
            "grade": data.get("grade"),
            "setName": data.get("setName"),
            "player": data.get("player"),
            "onSaleExpireTs": data.get("onSaleExpireTs"),
            "authenticNumber": data.get("authenticNumber")
        }
        return cab_dict
    else:
        log.debug("get_cabinet request failed, retrying...........")
        raise Exception("Request failed")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sold_xhr_page(log):
    """
    Get the total number of sold records.
    :return: total
    """
    log.info("Fetching total record count....")
    url = f"{base_url}/search/searchAuctionItem"
    data = {
        "page": 1,
        "pageSize": 10,
        "orderStatus": "3",
        "sortType": "auction_end",
        "ascSort": "desc"
    }
    response = requests.post(url, headers=headers, json=data)
    total = response.json()['data']['total']
    if total:
        return total
    else:
        log.error("get_sold_xhr_page, error")
        raise Exception("Failed to fetch get_sold_xhr_page data")


def fetch_all_pages(log):
    """
    Iterate over every result page.
    :return: page_data, one page of records at a time
    """
    log.info("Fetching all pages....")
    total = get_sold_xhr_page(log)
    pages = (total + 9) // 10  # number of pages, 10 records per page
    log.info(f"{total} sold records in total, {pages} pages..................................")
    for page in range(1, pages + 1):
        data = {
            "page": page,
            "pageSize": 10,
            "orderStatus": "3",
            "sortType": "auction_end",
            "ascSort": "desc"
        }
        response = requests.post(f"{base_url}/search/searchAuctionItem", headers=headers, json=data)
        page_data = response.json()['data']['cardCabinet']
        yield page_data
        # time.sleep(1)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_bid(log, aid, page, token):
    """
    Fetch one page of bid records for an auction item.
    :param token:
    :param log:
    :param aid: auctionItemId
    :param page:
    :return: recordList
    """
    url = f"{base_url}/api/v2/auction/record"
    params = {
        "auctionItemId": aid,
        "pageNumber": str(page),
        "pageSize": "10"
    }
    log.debug(f'Fetching bid records, page {page}..................')
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params)
    # print(f'get_bid: {response.json()}')
    if response.status_code != 200:
        log.error(f"Request failed, status code: {response.status_code}")
        raise Exception("Request failed")
    # time.sleep(1)
    recordList = response.json()['data']['recordList']
    if recordList:
        return recordList
    else:
        log.error("get_bid, error")
        raise Exception("Failed to fetch get_bid data")


def get_bid_list(log, aid, bidIndex, token):
    """
    Collect all bid records and keep only the latest bid per user.
    :param token:
    :param log:
    :param aid: auctionItemId
    :param bidIndex: total number of bids
    :return: result: list of dicts (JSON-serializable)
    """
    # if bidIndex <= 0:
    #     bidIndex = 1
    log.info(f"Fetching get_bid_list data for {aid}, {bidIndex} bids in total")
    pages = (bidIndex + 9) // 10  # number of pages, 10 records per page
    resp_list = []
    for page in range(1, int(pages) + 1):
        try:
            recordList = get_bid(log, aid, page, token)
            resp_list.extend(recordList)
        except Exception as e:
            log.error(f"recordList get_bid error: {e}")
            break
    # Keep only the most recent bid for each user
    latest_bids = {}
    for bid in resp_list:
        nick_name = bid['nickName']
        if nick_name not in latest_bids or bid['bidTime'] > latest_bids[nick_name]['bidTime']:
            latest_bids[nick_name] = {'nickName': nick_name,
                                      'bidPrice': bid['bidPrice'],
                                      'bidTime': bid['bidTime']}
    result = [
        {'nickName': i['nickName'],
         'bidPrice': i['bidPrice'],
         'bidTime': transform_timestamp(i['bidTime'])}
        for i in latest_bids.values()
    ]
    return result
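
# Illustrative only: how get_bid_list collapses the raw record list. With made-up input
#   [{'nickName': 'A', 'bidPrice': 100, 'bidTime': 1727654400},
#    {'nickName': 'A', 'bidPrice': 120, 'bidTime': 1727658000},
#    {'nickName': 'B', 'bidPrice': 110, 'bidTime': 1727656200}]
# only the latest bid per nickName survives, so the result holds one entry for 'A'
# (bidPrice 120) and one for 'B', with each bidTime already formatted by transform_timestamp.
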
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def run(log, sql_pool, token):
    """
    Main run function.
    """
    try:
        log.info("Starting the sold_spider crawl task............................................................")
        for page_data in fetch_all_pages(log):
            info_list = []
            for auction in page_data:
                aid = auction.get('auctionItemId')
                cabinetId = auction.get("cabinetId")
                imgs = auction.get("imgs")
                title = auction.get("title")
                bidIndex = auction.get("currBidIndex")
                price = auction.get("price")
                lastBidPrice = auction.get("lastBidPrice")
                auctionStart_ = auction.get("auctionStart")
                auctionStart = transform_timestamp(auctionStart_)
                auctionEnd_ = auction.get("auctionEnd")
                auctionEnd = transform_timestamp(auctionEnd_)
                bid_list = get_bid_list(log, aid, bidIndex, token)
                # Fetch detail-page data
                act_dict = get_action(log, aid, token)
                # time.sleep(random.randint(5, 10))
                cab_dict = get_cabinet(log, cabinetId, token)
                # follows = act_dict.get("follows")
                if not bid_list:
                    bid_list = []
                info_dict = {
                    "cabinetId": cabinetId,
                    "auctionItemId": aid,
                    "imgs": imgs,
                    "title": title,
                    "bidIndex": bidIndex,
                    "price": price,
                    "lastBidPrice": lastBidPrice,
                    "auctionStart": auctionStart,
                    "auctionEnd": auctionEnd,
                    "bid_list": json.dumps(bid_list, ensure_ascii=False),
                    "nickName": act_dict.get("nickName"),
                    "following": act_dict.get("following"),
                    "certifyStatus": act_dict.get("certifyStatus"),
                    "ipRegion": act_dict.get("ipRegion"),
                    "credit": act_dict.get("credit"),
                    "agentLevel": act_dict.get("agentLevel"),
                    "agentId": act_dict.get("agentId"),
                    "rmbPrice": cab_dict.get("rmbPrice"),
                    "brand": cab_dict.get("brand"),
                    "status": cab_dict.get("status"),
                    "switchSt": cab_dict.get("switchSt"),
                    "cardNo": cab_dict.get("cardNo"),
                    "barcodeId": cab_dict.get("barcodeId"),
                    "year": cab_dict.get("year"),
                    "grade": cab_dict.get("grade"),
                    "setName": cab_dict.get("setName"),
                    "player": cab_dict.get("player"),
                    "onSaleExpireTs": cab_dict.get("onSaleExpireTs"),
                    "authenticNumber": cab_dict.get("authenticNumber")
                }
                info_list.append(info_dict)
            # Save one page worth of data
            sql_pool.insert_many(table="weikajia_sold", data_list=info_list, ignore=True)
            # time.sleep(random.randint(1, 3))
    except Exception as e:
        log.error(f'Error: {e}')
    finally:
        log.info("Crawler run finished, waiting for the next collection cycle.............")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def sold_main(log):
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    token = sql_pool.select_one("select token from wkj_token")
    run(log, sql_pool, token)


if __name__ == '__main__':
    sold_main(log=logger)
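
# Note on the local mysql_pool dependency (not shown in this file). Inferred from the
# call sites above, MySQLConnectionPool is assumed to expose roughly this interface;
# this is an assumption for reference, not the actual module definition:
#   MySQLConnectionPool(log=...)                       -> pool instance
#   pool.check_pool_health() -> bool                   -> True when connections are available
#   pool.select_one(sql) -> row (indexable)            -> first row; token[0] is used above
#   pool.insert_many(table=..., data_list=..., ignore=True)
#                                                      -> bulk insert of dicts, presumably
#                                                         INSERT IGNORE semantics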