| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.8.10
- # Date: 2024-09-30 13:29
- import json
- import random
- import time
- import requests
- from datetime import datetime
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
- # logger.remove()
- # logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="1 day")
# Default request headers imitating the Android uni-app client (app 2.0.4, Pixel 5, Android 11).
# NOTE: "token" starts empty; the request helpers assign headers["token"] in place
# before their authenticated calls, so this dict is shared, mutable state.
headers = dict([
    ("appVersion", "2.0.4"),
    ("osVersion", "11"),
    ("deviceModel", "Pixel 5"),
    ("appVersionCode", "196"),
    ("deviceBrand", "google"),
    ("platform", "android"),
    ("token", ""),
    ("User-Agent", "Mozilla/5.0 (Linux; Android 11; Pixel 5 Build/RQ3A.211001.001; wv) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 "
                   "Chrome/83.0.4103.106 Mobile Safari/537.36 "
                   "uni-app Html5Plus/1.0 (Immersed/52.727272)"),
    ("Content-Type", "application/json"),
    ("Connection", "Keep-Alive"),
])
# API root; every endpoint path is appended to this.
base_url = "https://api.weikajia.com"
def after_log(retry_state):
    """
    tenacity ``after`` callback: log the outcome of each attempt.

    The logger is resolved in this order:
      1. the wrapped call's first positional argument (every decorated function
         in this file takes ``log`` first);
      2. the wrapped call's ``log`` keyword argument (e.g. ``sold_main(log=logger)``);
      3. a module-level ``logger`` global, which exists only when the file is run
         as a script;
      4. the stdlib ``logging`` logger for this module, so the callback never raises
         NameError when the module is imported.

    :param retry_state: tenacity RetryCallState for the attempt that just finished
    """
    if retry_state.args:
        log = retry_state.args[0]
    elif retry_state.kwargs.get("log") is not None:
        log = retry_state.kwargs["log"]
    else:
        log = globals().get("logger")
        if log is None:
            import logging
            log = logging.getLogger(__name__)
    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the ``proxies`` mapping for ``requests`` that routes through the kdl tunnel proxy.

    Tunnel and credentials may be overridden via the ``KDL_TUNNEL``, ``KDL_USERNAME``
    and ``KDL_PASSWORD`` environment variables. Without them, the previously
    hard-coded values are used, so behavior is unchanged.
    NOTE(review): credentials should not live in source control; drop the defaults
    once the environment variables are configured.

    :param log: logger (kept for interface compatibility; tenacity's after_log reads it)
    :return: dict with identical "http" and "https" proxy URLs
    """
    import os

    tunnel = os.environ.get("KDL_TUNNEL", "x371.kdltps.com:15818")
    kdl_username = os.environ.get("KDL_USERNAME", "t13753103189895")
    kdl_password = os.environ.get("KDL_PASSWORD", "o0yefv6z")
    # Both schemes go through the same HTTP tunnel endpoint.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
def transform_timestamp(timestamp):
    """
    Convert a Unix timestamp to a ``%Y-%m-%d %H:%M:%S`` string in local time.

    :param timestamp: seconds since the epoch, as an int, float or integer string.
                      ``None`` or ``""`` (e.g. a field missing from the API payload)
                      yields ``None`` instead of raising.
    :return: formatted_time, or None when no timestamp is given
    """
    if timestamp is None or timestamp == "":
        return None
    # datetime.fromtimestamp without tz interprets the value in the local timezone.
    return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d %H:%M:%S')
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_action(log, auctionId, token):
    """
    Fetch an auction's detail and return its agent (seller) info.

    :param log: logger
    :param auctionId: auction item id
    :param token: row from wkj_token; token[0] is sent as the ``token`` header
    :return: the ``agentUserInfo`` dict with ``agentId`` merged in, or {} when the
             request reports a non-200 resultCode, the info is empty, or the payload
             cannot be read
    """
    log.debug(f'正在查询auctionId为: {auctionId}的信息..............')
    url = f"{base_url}/api/v2/auction/detail"
    params = {
        "auctionId": auctionId
    }
    # NOTE: mutates the module-level headers dict shared by every request helper.
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params, timeout=5)
    # Decode the body once instead of re-parsing it for every field access.
    payload = response.json()
    if payload["resultCode"] != 200:
        # Best-effort: nothing is raised here, so tenacity does NOT actually retry
        # despite the log text; the caller gets an empty dict.
        log.debug("get_action 请求失败,重试中...........")
        return {}
    try:
        data = payload["data"]
        agentUserInfo = data.get("agentUserInfo")
        if not agentUserInfo:
            log.warning("get_action agentUserInfo 为空, 跳过...........")
            return {}
        agentUserInfo["agentId"] = data.get("agentId")
        return agentUserInfo
    except Exception as e:
        # e.g. missing or null "data"; treat as "no agent info" rather than failing the run.
        log.error(f"get_action agentUserInfo , error: {e}")
        return {}
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_cabinet(log, cabinetId, token):
    """
    Fetch a cabinet's detail and return the subset of fields stored per sold item.

    :param log: logger
    :param cabinetId: cabinet id taken from the auction list entry
    :param token: row from wkj_token; token[0] is sent as the ``token`` header
    :return: cab_dict with the keys listed in ``fields`` below (values may be None)
    :raises Exception: when resultCode is not 200, so tenacity retries
    """
    log.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
    url = f"{base_url}/api/v2/cabinet/detail"
    params = {
        "cabinetId": cabinetId
    }
    # NOTE: mutates the module-level headers dict shared by every request helper.
    headers["token"] = token[0]
    response = requests.get(url, headers=headers, params=params, timeout=5)
    # Decode the body once instead of twice.
    payload = response.json()
    if payload["resultCode"] != 200:
        log.debug("get_cabinet 请求失败,重试中...........")
        raise Exception("请求失败")
    data = payload["data"]
    # Order matters: it defines the key order of the returned dict.
    fields = ("rmbPrice", "brand", "status", "switchSt", "cardNo", "barcodeId",
              "year", "grade", "setName", "player", "onSaleExpireTs", "authenticNumber")
    return {field: data.get(field) for field in fields}
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sold_xhr_page(log):
    """
    Query the sold-auction search endpoint and return the total record count.

    :param log: logger
    :return: total number of sold records reported by the API
    :raises Exception: when ``total`` is missing/0, so tenacity retries
    """
    log.info("开始获取总页数....")
    url = f"{base_url}/search/searchAuctionItem"
    data = {
        "page": 1,
        "pageSize": 10,
        "orderStatus": "3",
        "sortType": "auction_end",
        "ascSort": "desc"
    }
    # requests has no default timeout; without one a stalled connection hangs the crawl
    # forever and tenacity never gets a chance to retry.
    response = requests.post(url, headers=headers, json=data, timeout=10)
    total = response.json()['data']['total']
    if not total:
        log.error("get_sold_xhr_page, error")
        raise Exception("获取get_sold_xhr_page数据失败")
    return total
def fetch_all_pages(log):
    """
    Yield the sold-auction list one page at a time.

    The total count is fetched first to compute the page count. Each page is then
    requested lazily, so the caller finishes processing one page before the next
    request is sent.

    :param log: logger
    :return: generator of each page's ``cardCabinet`` list
    """
    log.info("开始获取所有页数据....")
    page_size = 10
    total = get_sold_xhr_page(log)
    pages = (total + page_size - 1) // page_size  # ceiling division
    log.info(f"一共有{total}条已售数据, 总页数: {pages}..................................")
    url = f"{base_url}/search/searchAuctionItem"
    for page in range(1, pages + 1):
        data = {
            "page": page,
            "pageSize": page_size,
            "orderStatus": "3",
            "sortType": "auction_end",
            "ascSort": "desc"
        }
        # Explicit timeout: requests would otherwise wait indefinitely on a stalled socket.
        response = requests.post(url, headers=headers, json=data, timeout=10)
        yield response.json()['data']['cardCabinet']
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_bid(log, aid, page, token):
    """
    Fetch one page of raw bid records for an auction item.

    :param log: logger
    :param aid: auctionItemId
    :param page: page number (callers iterate from 1); pageSize is fixed at 10
    :param token: row from wkj_token; token[0] is sent as the ``token`` header
    :return: the non-empty ``recordList`` of that page
    :raises Exception: on a non-200 HTTP status or an empty page, so tenacity retries
    """
    url = f"{base_url}/api/v2/auction/record"
    params = {
        "auctionItemId": aid,
        "pageNumber": str(page),
        "pageSize": "10"
    }
    log.debug(f'正在获取竞价相关第{page}页的数据..................')
    # NOTE: mutates the module-level headers dict shared by every request helper.
    headers["token"] = token[0]
    # Explicit timeout, matching the other detail requests; requests has no default.
    response = requests.get(url, headers=headers, params=params, timeout=5)
    if response.status_code != 200:
        log.error(f"请求失败,状态码: {response.status_code}")
        raise Exception("请求失败")
    recordList = response.json()['data']['recordList']
    if not recordList:
        log.error("get_bid, error")
        raise Exception("获取get_bid数据失败")
    return recordList
def get_bid_list(log, aid, bidIndex, token):
    """
    Collect every bid page for an auction and keep each bidder's latest bid.

    Pages are fetched with get_bid until all are read or one fails. On failure, the
    records gathered so far are still used.

    :param log: logger
    :param aid: auctionItemId
    :param bidIndex: total number of bids (``currBidIndex``); None, "" or a numeric
                     string are accepted, and a missing value means "no bids"
    :param token: row from wkj_token, forwarded to get_bid
    :return: list of {'nickName', 'bidPrice', 'bidTime' (formatted string)} dicts,
             one per nickName, in first-seen order
    """
    log.info(f"开始获取第{aid}的get_bid_list数据, 一共{bidIndex}条")
    # currBidIndex comes from auction.get(...) and may be None or a string;
    # the original `bidIndex + 9` raised TypeError and aborted the whole run.
    total_bids = int(bidIndex or 0)
    pages = (total_bids + 9) // 10  # 10 records per page, ceiling division
    resp_list = []
    for page in range(1, pages + 1):
        try:
            recordList = get_bid(log, aid, page, token)
            resp_list.extend(recordList)
        except Exception as e:
            # Stop paging on the first failure but keep what was already fetched.
            log.error(f"recordList get_bid error: {e}")
            break
    # Latest bid per nickName (largest bidTime wins).
    latest_bids = {}
    for bid in resp_list:
        nick_name = bid['nickName']
        current = latest_bids.get(nick_name)
        if current is None or bid['bidTime'] > current['bidTime']:
            latest_bids[nick_name] = bid
    return [
        {'nickName': nick_name, 'bidPrice': bid['bidPrice'], 'bidTime': transform_timestamp(bid['bidTime'])}
        for nick_name, bid in latest_bids.items()
    ]
# Agent fields copied from get_action's result, in output-column order.
_AGENT_FIELDS = ("nickName", "following", "certifyStatus", "ipRegion", "credit",
                 "agentLevel", "agentId")
# Cabinet fields copied from get_cabinet's result, in output-column order.
_CABINET_FIELDS = ("rmbPrice", "brand", "status", "switchSt", "cardNo", "barcodeId",
                   "year", "grade", "setName", "player", "onSaleExpireTs", "authenticNumber")


def _build_sold_record(log, auction, token):
    """
    Assemble one weikajia_sold row from an auction list entry plus its bid,
    auction-detail and cabinet-detail lookups.

    :param log: logger
    :param auction: one entry of a search page's ``cardCabinet`` list
    :param token: row from wkj_token, forwarded to the request helpers
    :return: dict whose key order matches the original insert payload
    """
    aid = auction.get('auctionItemId')
    cabinetId = auction.get("cabinetId")
    bidIndex = auction.get("currBidIndex")
    record = {
        "cabinetId": cabinetId,
        "auctionItemId": aid,
        "imgs": auction.get("imgs"),
        "title": auction.get("title"),
        "bidIndex": bidIndex,
        "price": auction.get("price"),
        "lastBidPrice": auction.get("lastBidPrice"),
        "auctionStart": transform_timestamp(auction.get("auctionStart")),
        "auctionEnd": transform_timestamp(auction.get("auctionEnd")),
    }
    # Same request order as before: bids, then auction detail, then cabinet detail.
    bid_list = get_bid_list(log, aid, bidIndex, token)
    act_dict = get_action(log, aid, token)
    cab_dict = get_cabinet(log, cabinetId, token)
    record["bid_list"] = json.dumps(bid_list or [], ensure_ascii=False)
    record.update((key, act_dict.get(key)) for key in _AGENT_FIELDS)
    record.update((key, cab_dict.get(key)) for key in _CABINET_FIELDS)
    return record


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def run(log, sql_pool, token):
    """
    Main crawl: walk every sold-auction page, enrich each auction and insert the rows.

    Any exception is logged and swallowed here, so the @retry decorator only
    matters if logging itself fails; the outer sold_main schedule drives re-runs.

    :param log: logger
    :param sql_pool: MySQLConnectionPool providing insert_many
    :param token: row from wkj_token, forwarded to the request helpers
    """
    try:
        log.info("开始运行 sold_spider 爬虫任务............................................................")
        for page_data in fetch_all_pages(log):
            # A null cardCabinet would otherwise raise TypeError and end the whole run.
            info_list = [_build_sold_record(log, auction, token) for auction in page_data or []]
            if not info_list:
                continue  # nothing to insert for an empty page
            sql_pool.insert_many(table="weikajia_sold", data_list=info_list, ignore=True)
    except Exception as e:
        log.error(f'Error: {e}')
    finally:
        log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def sold_main(log):
    """
    Entry point: set up the MySQL pool, load the API token and run one crawl.

    Raising here (unhealthy pool, missing token) makes tenacity wait 3600 s and try
    again, up to 100 attempts.

    :param log: logger
    :raises RuntimeError: when the pool is unhealthy or no token row is found
    """
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    token = sql_pool.select_one("select token from wkj_token")
    # Without a row, every request helper would fail on token[0] deep inside run(),
    # where the error is swallowed; fail early so the hourly retry applies instead.
    if not token:
        log.error("未获取到 token, 请检查 wkj_token 表")
        raise RuntimeError("未获取到 token")
    run(log, sql_pool, token)
if __name__ == '__main__':
    # Binds `logger` as a module-level global, but only when run as a script.
    # after_log falls back to this global when the wrapped call receives its logger
    # by keyword, as sold_main does below; do not move this into a function.
    from loguru import logger
    sold_main(log=logger)
|