# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.8.10
# Date: 2024-09-30 13:29
"""Crawler for sold auction items on api.weikajia.com.

Pages through the sold-items search API, enriches every item not yet stored
with its bid history, auction (agent) details and cabinet details, and inserts
one row per item into the ``weikajia_sold`` MySQL table.
"""
import json
import random
import time
from datetime import datetime

import requests
from retrying import retry

from mysq_pool import MySQLConnectionPool

# Optional loguru file sink (enable when running standalone):
# logger.remove()
# logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="1 day")

# Shared request headers. NOTE: get_bid() overwrites "token" in place, so every
# request issued afterwards (by any function in this module) carries that token.
headers = {
    "appVersion": "1.6.5",
    "osVersion": "9",
    "deviceModel": "M2007J22C",
    "appVersionCode": "168",
    "deviceBrand": "xiaomi",
    "platform": "android",
    "token": "",
    "user-agent": "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36",
    "Content-Type": "application/json",
    "Connection": "Keep-Alive"
}
base_url = "https://api.weikajia.com"

# Timeout (seconds) for every HTTP call so a stalled connection cannot hang the crawler.
REQUEST_TIMEOUT = 5

# Agent-info fields copied from the auction-detail response, in table-column order.
_AGENT_FIELDS = (
    "nickName", "following", "certifyStatus", "ipRegion", "credit", "agentLevel", "agentId",
)

# Fields copied from the cabinet-detail response, in table-column order.
_CABINET_FIELDS = (
    "rmbPrice", "brand", "status", "switchSt", "cardNo", "barcodeId",
    "year", "grade", "setName", "player", "onSaleExpireTs", "authenticNumber",
)

# Full column list of weikajia_sold. The insert tuple built in _build_row() must follow this order.
_SOLD_COLUMNS = (
    "cabinetId", "auctionItemId", "imgs", "title", "bidIndex", "price",
    "lastBidPrice", "auctionStart", "auctionEnd", "bid_list",
) + _AGENT_FIELDS + _CABINET_FIELDS

# Built from constant column names only (no external input), so placeholder
# count always matches the column count.
_INSERT_SOLD_SQL = "INSERT INTO weikajia_sold({}) VALUES ({})".format(
    ", ".join(_SOLD_COLUMNS), ", ".join(["%s"] * len(_SOLD_COLUMNS))
)


def transform_timestamp(timestamp):
    """
    Convert a Unix timestamp (seconds) to a local-time '%Y-%m-%d %H:%M:%S' string.

    :param timestamp: seconds since the epoch; anything int() accepts, or None
    :return: formatted_time, or None when timestamp is None
    """
    if timestamp is None:
        # A missing timestamp used to raise TypeError and abort the whole crawl.
        return None
    return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')


def _sold_search_payload(page):
    """Build the searchAuctionItem request body for one page (10 items) of sold auctions."""
    return {
        "page": page,
        "pageSize": 10,
        "orderStatus": "3",
        "sortType": "auction_end",
        "ascSort": "desc"
    }


@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_action(log, auctionId):
    """
    Fetch the agent info for one auction item.

    :param log: logger
    :param auctionId: auctionItemId
    :return: the "agentUserInfo" dict with "agentId" added, or {} when the API
             reports a non-200 resultCode or the payload is missing/malformed
    """
    log.debug(f'正在查询auctionId为: {auctionId}的信息..............')
    url = f"{base_url}/api/v2/auction/detail"
    params = {"auctionId": auctionId}
    response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    payload = response.json()  # parse once instead of on every access
    if payload["resultCode"] != 200:
        # NOTE: despite the log text, nothing is retried here; {} is returned.
        log.debug("get_action 请求失败,重试中...........")
        return {}
    try:
        data = payload["data"]
        agentUserInfo = data.get("agentUserInfo")
        agentUserInfo["agentId"] = data.get("agentId")
        return agentUserInfo
    except (KeyError, TypeError, AttributeError) as e:
        # Missing "data", or agentUserInfo is None / not a dict.
        log.error(f"get_action agentUserInfo , error: {e}")
        return {}


@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_cabinet(log, cabinetId):
    """
    Fetch the cabinet (card) details for one item.

    :param log: logger
    :param cabinetId: cabinet id
    :return: cab_dict with exactly the keys in _CABINET_FIELDS (None when absent)
    :raises Exception: on a non-200 resultCode (re-raised by @retry after 3 attempts)
    """
    log.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
    url = f"{base_url}/api/v2/cabinet/detail"
    params = {"cabinetId": cabinetId}
    response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    payload = response.json()
    if payload["resultCode"] != 200:
        log.debug("get_cabinet 请求失败,重试中...........")
        raise Exception("请求失败")
    data = payload["data"]
    return {field: data.get(field) for field in _CABINET_FIELDS}


@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_sold_xhr_page(log):
    """
    Return the total number of sold items reported by the search API.

    :return: total
    :raises Exception: when total is missing/0 (re-raised by @retry after 3 attempts)
    """
    log.info("开始获取总页数....")
    url = f"{base_url}/search/searchAuctionItem"
    response = requests.post(url, headers=headers, json=_sold_search_payload(1),
                             timeout=REQUEST_TIMEOUT)
    total = response.json()['data']['total']
    if total:
        return total
    log.error("get_sold_xhr_page, error")
    raise Exception("获取get_sold_xhr_page数据失败")


@retry(stop_max_attempt_number=3, wait_fixed=1000)
def _fetch_sold_page(page):
    """Return the "cardCabinet" list for one search page ([] when the field is null)."""
    response = requests.post(f"{base_url}/search/searchAuctionItem", headers=headers,
                             json=_sold_search_payload(page), timeout=REQUEST_TIMEOUT)
    return response.json()['data']['cardCabinet'] or []


def fetch_all_pages(log):
    """
    Yield the list of sold items on every search page, one page at a time.

    :return: generator of page_data lists
    """
    log.info("开始获取所有页数据....")
    total = get_sold_xhr_page(log)
    pages = (total + 9) // 10  # ceil(total / pageSize)
    log.info(f"一共有{total}条已售数据, 总页数: {pages}..................................")
    for page in range(1, pages + 1):
        yield _fetch_sold_page(page)
        time.sleep(1)


@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_bid(log, aid, page, token):
    """
    Fetch one page (10 records) of bid records for an auction item.

    :param log: logger
    :param aid: auctionItemId
    :param page: 1-based page number
    :param token: row whose first element is the API token
                  (presumably the row from select_one in sold_main)
    :return: recordList (non-empty)
    :raises Exception: on HTTP error or empty recordList (re-raised by @retry after 3 attempts)
    """
    url = f"{base_url}/api/v2/auction/record"
    params = {
        "auctionItemId": aid,
        "pageNumber": str(page),
        "pageSize": "10"
    }
    log.debug(f'正在获取竞价相关第{page}页的数据..................')
    # Mutates the shared module-level headers: later requests from every
    # function (get_action, get_cabinet, ...) will also send this token.
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        log.error(f"请求失败,状态码: {response.status_code}")
        raise Exception("请求失败")
    time.sleep(1)
    recordList = response.json()['data']['recordList']
    if recordList:
        return recordList
    log.error("get_bid, error")
    raise Exception("获取get_bid数据失败")


def get_bid_list(log, aid, bidIndex, token):
    """
    Collect an auction's bid records and keep each user's latest bid.

    :param log: logger
    :param aid: auctionItemId
    :param bidIndex: total number of bids (None is treated as 0)
    :param token: forwarded to get_bid
    :return: list of {'nickName', 'bidPrice', 'bidTime'} dicts, one per nickName,
             bidTime formatted by transform_timestamp
    """
    log.info(f"开始获取第{aid}的get_bid_list数据, 一共{bidIndex}条")
    # A missing bid count used to raise TypeError (None + 9) and abort the whole crawl.
    bid_count = int(bidIndex or 0)
    pages = (bid_count + 9) // 10  # ceil(bid_count / pageSize)

    records = []
    for page in range(1, pages + 1):
        try:
            records.extend(get_bid(log, aid, page, token))
        except Exception as e:
            # Best effort: stop paging at the first failing page, keep what we have.
            log.error(f"recordList get_bid error: {e}")
            break

    # nickName -> record with the greatest bidTime seen so far
    latest_bids = {}
    for bid in records:
        nick_name = bid['nickName']
        previous = latest_bids.get(nick_name)
        if previous is None or bid['bidTime'] > previous['bidTime']:
            latest_bids[nick_name] = bid

    return [
        {'nickName': b['nickName'], 'bidPrice': b['bidPrice'],
         'bidTime': transform_timestamp(b['bidTime'])}
        for b in latest_bids.values()
    ]


def save_data(sql_pool, info):
    """
    Insert one row into weikajia_sold.

    :param sql_pool: MySQLConnectionPool
    :param info: tuple of values in _SOLD_COLUMNS order
    :return: None
    """
    sql_pool.insert_one(_INSERT_SOLD_SQL, info)


def _build_row(log, auction, token):
    """Fetch bid/agent/cabinet details for one sold auction and return its insert tuple."""
    aid = auction.get('auctionItemId')
    cabinetId = auction.get("cabinetId")
    bidIndex = auction.get("currBidIndex")
    auctionStart = transform_timestamp(auction.get("auctionStart"))
    auctionEnd = transform_timestamp(auction.get("auctionEnd"))

    bid_list = get_bid_list(log, aid, bidIndex, token)
    act_dict = get_action(log, aid)
    cab_dict = get_cabinet(log, cabinetId)

    return (
        cabinetId, aid, auction.get("imgs"), auction.get("title"), bidIndex,
        auction.get("price"), auction.get("lastBidPrice"), auctionStart, auctionEnd,
        json.dumps(bid_list, ensure_ascii=False),
        *(act_dict.get(field) for field in _AGENT_FIELDS),
        *(cab_dict.get(field) for field in _CABINET_FIELDS),
    )


# NOTE: run() catches every Exception itself, so this @retry only fires for
# exceptions that are not Exception subclasses.
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def run(sql_pool, log, token):
    """
    Crawl all sold items and store the ones whose cabinetId is not yet in weikajia_sold.

    Any Exception is logged and ends the crawl early.
    """
    try:
        log.info("开始运行 sold_spider 爬虫任务............................................................")
        # A set gives O(1) duplicate checks (a list was scanned once per auction).
        seen_cabinet_ids = {row[0] for row in sql_pool.select_all("select cabinetId from weikajia_sold")}

        for page_data in fetch_all_pages(log):
            for auction in page_data:
                cabinetId = auction.get("cabinetId")
                if cabinetId in seen_cabinet_ids:
                    log.info(
                        f"{cabinetId}已存在,跳过............................................................")
                    continue
                seen_cabinet_ids.add(cabinetId)

                save_data(sql_pool, _build_row(log, auction, token))
                time.sleep(random.randint(1, 3))
    except Exception as e:
        log.error(f'Error: {e}')
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")


@retry(stop_max_attempt_number=100, wait_fixed=3600000)
def sold_main(log):
    """
    Open the DB pool, load the API token and run one crawl.

    Retried up to 100 times, one hour apart, when an exception escapes, e.g. from
    pool creation or the token query. run() itself swallows Exceptions.
    """
    sql_pool = MySQLConnectionPool(log=log)
    # NOTE(review): only meaningful if MySQLConnectionPool defines __bool__/__len__; verify.
    if not sql_pool:
        log.error("数据库连接失败")
        raise Exception("数据库连接失败")
    token = sql_pool.select_one("select token from wkj_token")
    run(sql_pool, log, token)


if __name__ == '__main__':
    from loguru import logger

    sold_main(log=logger)