| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.8.10
- # Date: 2024-09-30 13:29
- import json
- import random
- import time
- import requests
- from retrying import retry
- from datetime import datetime
- from mysq_pool import MySQLConnectionPool
- # logger.remove()
- # logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="1 day")
# Request headers shared by every API call below, mimicking the weikajia
# Android app (v1.6.5) running in a WebView.
# NOTE(review): get_bid() assigns headers["token"] at runtime, so this dict is
# mutable shared state, not a pure constant.
headers = {
    "appVersion": "1.6.5",
    "osVersion": "9",
    "deviceModel": "M2007J22C",
    "appVersionCode": "168",
    "deviceBrand": "xiaomi",
    "platform": "android",
    "token": "",  # empty until get_bid() writes the DB token into it
    "user-agent": (
        "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36"
    ),
    "Content-Type": "application/json",
    "Connection": "Keep-Alive",
}

# Root of the weikajia API; every endpoint URL in this module is built from it.
base_url = "https://api.weikajia.com"
def transform_timestamp(timestamp):
    """
    Render a Unix timestamp as a 'YYYY-MM-DD HH:MM:SS' string.

    The value goes through int() first, so numeric strings are accepted and
    floats are truncated toward zero; it is then interpreted as seconds since
    the epoch. datetime.fromtimestamp is called without a tz argument, so the
    result is in the machine's local time zone.

    :param timestamp: seconds since the epoch (int, float or numeric str)
    :return: the formatted local-time string
    """
    return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_action(log, auctionId):
    """
    Fetch the agent information attached to an auction.

    :param log: logger exposing ``debug`` / ``error``
    :param auctionId: value sent as the ``auctionId`` query parameter
    :return: a copy of ``data.agentUserInfo`` from the response with an extra
        ``agentId`` key (taken from ``data.agentId``). If ``agentUserInfo`` is
        missing or null, the dict contains only ``agentId``. Returns ``{}``
        when ``resultCode`` is not 200 or the payload cannot be read.
    """
    log.debug(f'正在查询auctionId为: {auctionId}的信息..............')
    url = f"{base_url}/api/v2/auction/detail"
    params = {"auctionId": auctionId}
    response = requests.get(url, headers=headers, params=params, timeout=5)
    # Decode the body once instead of re-parsing it for every field access.
    payload = response.json()
    if payload["resultCode"] != 200:
        # Deliberately best-effort: returning {} (instead of raising) means the
        # retry decorator does NOT re-run this call despite the log wording.
        log.debug("get_action 请求失败,重试中...........")
        return {}
    try:
        data = payload["data"] or {}
        # agentUserInfo may be null; keep agentId anyway, and copy so the
        # parsed payload is not mutated.
        agentUserInfo = dict(data.get("agentUserInfo") or {})
        agentUserInfo["agentId"] = data.get("agentId")
        return agentUserInfo
    except Exception as e:
        log.error(f"get_action agentUserInfo , error: {e}")
        return {}
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_cabinet(log, cabinetId):
    """
    Fetch the card-cabinet detail fields stored in the sold table.

    :param log: logger exposing ``debug``
    :param cabinetId: value sent as the ``cabinetId`` query parameter
    :return: dict with exactly the keys rmbPrice, brand, status, switchSt,
        cardNo, barcodeId, year, grade, setName, player, onSaleExpireTs,
        authenticNumber (a key is None when the response lacks it)
    :raises Exception: when ``resultCode`` is not 200; the retry decorator
        makes up to 3 attempts, 1 s apart, before letting it propagate
    """
    log.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
    url = f"{base_url}/api/v2/cabinet/detail"
    params = {"cabinetId": cabinetId}
    response = requests.get(url, headers=headers, params=params, timeout=5)
    # Decode the body once rather than once per use.
    payload = response.json()
    if payload["resultCode"] != 200:
        log.debug("get_cabinet 请求失败,重试中...........")
        raise Exception("请求失败")
    data = payload["data"]
    fields = (
        "rmbPrice", "brand", "status", "switchSt", "cardNo", "barcodeId",
        "year", "grade", "setName", "player", "onSaleExpireTs", "authenticNumber",
    )
    return {field: data.get(field) for field in fields}
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_sold_xhr_page(log):
    """
    Return the total number of sold auction items reported by the search API.

    :param log: logger exposing ``info`` / ``error``
    :return: ``data.total`` from the first page of the sold search
    :raises Exception: when ``total`` is 0/empty (or the payload lacks it);
        the retry decorator makes up to 3 attempts, 1 s apart
    """
    log.info("开始获取总页数....")
    url = f"{base_url}/search/searchAuctionItem"
    data = {
        "page": 1,
        "pageSize": 10,
        "orderStatus": "3",
        "sortType": "auction_end",
        "ascSort": "desc",
    }
    # Bounded timeout: without one a stalled connection blocks forever and the
    # retry decorator never gets a chance to run.
    response = requests.post(url, headers=headers, json=data, timeout=10)
    total = response.json()['data']['total']
    if not total:
        log.error("get_sold_xhr_page, error")
        raise Exception("获取get_sold_xhr_page数据失败")
    return total
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def _fetch_sold_page(page):
    """Fetch the ``cardCabinet`` list of one sold-search page (10 items), with retries."""
    data = {
        "page": page,
        "pageSize": 10,
        "orderStatus": "3",
        "sortType": "auction_end",
        "ascSort": "desc",
    }
    response = requests.post(f"{base_url}/search/searchAuctionItem", headers=headers, json=data, timeout=10)
    return response.json()['data']['cardCabinet']


def fetch_all_pages(log):
    """
    Yield the sold-auction search results one page at a time.

    Each page is requested with a timeout and retried (3 attempts, 1 s apart)
    so a single transient failure does not end the whole iteration.

    :param log: logger exposing ``info`` (also passed to get_sold_xhr_page)
    :yield: list of the ``cardCabinet`` entries of one page; an empty list
        when the API returns null for that page
    """
    log.info("开始获取所有页数据....")
    total = get_sold_xhr_page(log)
    pages = (total + 9) // 10  # ceil(total / 10): the search uses pageSize 10
    log.info(f"一共有{total}条已售数据, 总页数: {pages}..................................")
    for page in range(1, pages + 1):
        # A null page would break the caller's `for auction in page_data`.
        yield _fetch_sold_page(page) or []
        time.sleep(1)
@retry(stop_max_attempt_number=3, wait_fixed=1000)
def get_bid(log, aid, page, token):
    """
    Fetch one page of the bid history of an auction item.

    :param log: logger exposing ``debug`` / ``error``
    :param aid: auctionItemId
    :param page: 1-based page number (the request asks for 10 records per page)
    :param token: DB row; its first element (``token[0]``) is sent as the API token
    :return: the non-empty ``data.recordList`` of that page
    :raises Exception: on a non-200 HTTP status or an empty/missing
        ``recordList``; the retry decorator makes up to 3 attempts, 1 s apart
    """
    url = f"{base_url}/api/v2/auction/record"
    params = {
        "auctionItemId": aid,
        "pageNumber": str(page),
        "pageSize": "10",
    }
    log.debug(f'正在获取竞价相关第{page}页的数据..................')
    # NOTE(review): this writes into the module-level ``headers`` dict, so the
    # token is also sent by every later request that uses it (get_action,
    # get_cabinet, the search calls). Kept as-is since those calls may rely on it.
    headers["token"] = token[0]
    # Timeout added so a stalled connection cannot hang the crawl.
    response = requests.get(url, headers=headers, params=params, timeout=5)
    if response.status_code != 200:
        log.error(f"请求失败,状态码: {response.status_code}")
        raise Exception("请求失败")
    time.sleep(1)  # pause 1 s after each bid-history request
    recordList = response.json()['data']['recordList']
    if not recordList:
        log.error("get_bid, error")
        raise Exception("获取get_bid数据失败")
    return recordList
def get_bid_list(log, aid, bidIndex, token):
    """
    Collect an auction item's bid history and keep each bidder's latest bid.

    Pages of 10 records are fetched with get_bid until ``bidIndex`` records
    are covered. Fetching stops at the first page that fails (after get_bid's
    own retries), and the bids gathered so far are still used.

    :param log: logger exposing ``info`` / ``error``
    :param aid: auctionItemId
    :param bidIndex: total number of bids reported for the item; None or a
        non-positive value means "no bids" and no request is made
    :param token: DB row forwarded to get_bid
    :return: list of {'nickName', 'bidPrice', 'bidTime'} dicts, one per
        nickName, with bidTime formatted by transform_timestamp
    """
    log.info(f"开始获取第{aid}的get_bid_list数据, 一共{bidIndex}条")
    # currBidIndex may be absent (None) in the search result; treat it as 0
    # instead of letting `None + 9` raise and abort the whole crawl.
    total = int(bidIndex or 0)
    pages = (total + 9) // 10  # ceil(total / 10); <= 0 yields an empty range

    records = []
    for page in range(1, pages + 1):
        try:
            records.extend(get_bid(log, aid, page, token))
        except Exception as e:
            # Best-effort: keep whatever pages were already fetched.
            log.error(f"recordList get_bid error: {e}")
            break

    # Keep only the most recent bid (largest bidTime) per nickName.
    latest_bids = {}
    for bid in records:
        nick_name = bid['nickName']
        if nick_name not in latest_bids or bid['bidTime'] > latest_bids[nick_name]['bidTime']:
            latest_bids[nick_name] = bid

    return [
        {'nickName': b['nickName'], 'bidPrice': b['bidPrice'], 'bidTime': transform_timestamp(b['bidTime'])}
        for b in latest_bids.values()
    ]
def save_data(sql_pool, info):
    """
    Insert one sold-auction row into the ``weikajia_sold`` table.

    :param sql_pool: pool object exposing ``insert_one(sql, params)``
    :param info: 29-value sequence in the same order as ``columns`` below
    :return: None
    """
    columns = (
        "cabinetId", "auctionItemId", "imgs", "title", "bidIndex", "price",
        "lastBidPrice", "auctionStart", "auctionEnd", "bid_list", "nickName",
        "following", "certifyStatus", "ipRegion", "credit", "agentLevel",
        "agentId", "rmbPrice", "brand", "status", "switchSt", "cardNo",
        "barcodeId", "year", "grade", "setName", "player", "onSaleExpireTs",
        "authenticNumber",
    )
    # One DB-API "%s" placeholder per column; values are bound by the driver.
    placeholders = ", ".join("%s" for _ in columns)
    statement = f"INSERT INTO weikajia_sold({', '.join(columns)}) VALUES ({placeholders})"
    sql_pool.insert_one(statement, info)
def _build_sold_record(log, auction, token):
    """
    Turn one search-result entry into the 29-value row that save_data inserts.

    Fetches the bid history, the auction's agent info and the cabinet detail
    for the entry. Value order follows the column list of the INSERT in
    save_data.
    """
    aid = auction.get('auctionItemId')
    cabinetId = auction.get("cabinetId")
    bidIndex = auction.get("currBidIndex")
    auctionStart = transform_timestamp(auction.get("auctionStart"))
    auctionEnd = transform_timestamp(auction.get("auctionEnd"))
    bid_list = get_bid_list(log, aid, bidIndex, token) or []
    act_dict = get_action(log, aid)
    cab_dict = get_cabinet(log, cabinetId)
    agent_fields = ("nickName", "following", "certifyStatus", "ipRegion", "credit", "agentLevel", "agentId")
    cabinet_fields = (
        "rmbPrice", "brand", "status", "switchSt", "cardNo", "barcodeId",
        "year", "grade", "setName", "player", "onSaleExpireTs", "authenticNumber",
    )
    return (
        cabinetId, aid, auction.get("imgs"), auction.get("title"), bidIndex,
        auction.get("price"), auction.get("lastBidPrice"), auctionStart, auctionEnd,
        json.dumps(bid_list, ensure_ascii=False),
        *(act_dict.get(field) for field in agent_fields),
        *(cab_dict.get(field) for field in cabinet_fields),
    )


@retry(stop_max_attempt_number=3, wait_fixed=1000)
def run(sql_pool, log, token):
    """
    Main crawl: walk every page of sold auctions and store the ones whose
    cabinetId is not yet in ``weikajia_sold``.

    A failure while assembling one record is logged and that auction is
    skipped. Examples are a missing timestamp, or a detail call that exhausts
    its retries. Such a failure no longer aborts the remaining pages. Any
    other Exception (e.g. a DB error) is logged and ends the run. Because
    Exceptions are caught here, the retry decorator rarely has anything to
    retry.

    :param sql_pool: DB pool exposing ``select_all`` / ``insert_one``
    :param log: logger exposing ``info`` / ``error``
    :param token: DB row holding the API token, forwarded to get_bid_list
    """
    try:
        log.info("开始运行 sold_spider 爬虫任务............................................................")
        # A set gives O(1) membership tests; a list made this O(n) per auction.
        seen = {row[0] for row in sql_pool.select_all("select cabinetId from weikajia_sold") or ()}
        for page_data in fetch_all_pages(log):
            for auction in page_data:
                cabinetId = auction.get("cabinetId")
                if cabinetId in seen:
                    log.info(
                        f"{cabinetId}已存在,跳过............................................................")
                    continue
                try:
                    info = _build_sold_record(log, auction, token)
                except Exception as e:
                    # Isolate per-record failures so one bad auction does not
                    # abort the rest of the crawl.
                    log.error(f'{cabinetId} Error: {e}')
                    continue
                save_data(sql_pool, info)
                seen.add(cabinetId)
                time.sleep(random.randint(1, 3))
    except Exception as e:
        log.error(f'Error: {e}')
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
@retry(stop_max_attempt_number=100, wait_fixed=3600000)
def sold_main(log):
    """
    Entry point: open the MySQL pool, read the API token and run one crawl.

    If the pool object is falsy an Exception is raised. The retry decorator
    then calls this function again after 3 600 000 ms (one hour), for at most
    100 attempts.

    :param log: logger handed to the pool and to the crawl
    """
    pool = MySQLConnectionPool(log=log)
    if pool:
        # The row from wkj_token is forwarded as-is; get_bid reads token[0].
        run(pool, log, pool.select_one("select token from wkj_token"))
        return
    log.error("数据库连接失败")
    raise Exception("数据库连接失败")
if __name__ == '__main__':
    # loguru is only needed when this module is executed directly.
    import loguru

    sold_main(log=loguru.logger)
|