weika_sold_spider.py 13 KB


  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.8.10
  4. # Date: 2024-09-30 13:29
  5. import json
  6. import random
  7. import time
  8. import requests
  9. from datetime import datetime
  10. from mysql_pool import MySQLConnectionPool
  11. from tenacity import retry, stop_after_attempt, wait_fixed
  12. # logger.remove()
  13. # logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  14. # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  15. # level="DEBUG", retention="1 day")
# Default HTTP headers sent with every API request; the values identify the
# client as the Android app (version 2.0.4) running on a Pixel 5.
# NOTE: this dict is shared module state - get_action, get_cabinet and get_bid
# overwrite the "token" entry in place before each request.
headers = {
    "appVersion": "2.0.4",
    "osVersion": "11",
    "deviceModel": "Pixel 5",
    "appVersionCode": "196",
    "deviceBrand": "google",
    "platform": "android",
    "token": "",
    "User-Agent": "Mozilla/5.0 (Linux; Android 11; Pixel 5 Build/RQ3A.211001.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.106 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/52.727272)",
    "Content-Type": "application/json",
    "Connection": "Keep-Alive"
}
# Root URL that every endpoint path in this module is appended to.
base_url = "https://api.weikajia.com"
  29. def after_log(retry_state):
  30. """
  31. retry 回调
  32. :param retry_state: RetryCallState 对象
  33. """
  34. # 检查 args 是否存在且不为空
  35. if retry_state.args and len(retry_state.args) > 0:
  36. log = retry_state.args[0] # 获取传入的 logger
  37. else:
  38. log = logger # 使用全局 logger
  39. if retry_state.outcome.failed:
  40. log.warning(
  41. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  42. else:
  43. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  44. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  45. def get_proxys(log):
  46. """
  47. 获取代理
  48. :return: 代理
  49. """
  50. tunnel = "x371.kdltps.com:15818"
  51. kdl_username = "t13753103189895"
  52. kdl_password = "o0yefv6z"
  53. try:
  54. proxies = {
  55. "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
  56. "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
  57. }
  58. return proxies
  59. except Exception as e:
  60. log.error(f"Error getting proxy: {e}")
  61. raise e
  62. def transform_timestamp(timestamp):
  63. """
  64. 将 timestamp 格式转换为 %Y-%m-%d %H:%M:%S
  65. :param timestamp:
  66. :return: formatted_time
  67. """
  68. # 将Unix时间戳转换为datetime对象
  69. dt_object = datetime.fromtimestamp(int(timestamp))
  70. # 格式化时间
  71. formatted_time = dt_object.strftime('%Y-%m-%d %H:%M:%S')
  72. return formatted_time
  73. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  74. def get_action(log, auctionId, token):
  75. """
  76. 获取 auction 信息
  77. :param log:
  78. :param auctionId:
  79. :param token:
  80. :return: agentUserInfo
  81. """
  82. log.debug(f'正在查询auctionId为: {auctionId}的信息..............')
  83. url = f"{base_url}/api/v2/auction/detail"
  84. params = {
  85. "auctionId": auctionId
  86. }
  87. headers["token"] = token[0]
  88. response = requests.get(url, headers=headers, params=params, timeout=5)
  89. # print(f'get_action: {response.json()}')
  90. if response.json()["resultCode"] == 200:
  91. try:
  92. agentUserInfo = response.json()["data"].get("agentUserInfo")
  93. if agentUserInfo:
  94. agentId = response.json()["data"].get("agentId")
  95. agentUserInfo["agentId"] = agentId
  96. return agentUserInfo
  97. else:
  98. log.warning("get_action agentUserInfo 为空, 跳过...........")
  99. return {}
  100. except Exception as e:
  101. log.error(f"get_action agentUserInfo , error: {e}")
  102. return {}
  103. else:
  104. log.debug("get_action 请求失败,重试中...........")
  105. # raise Exception("请求失败")
  106. return {}
  107. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  108. def get_cabinet(log, cabinetId, token):
  109. """
  110. 获取 cabinet 信息
  111. :param log:
  112. :param cabinetId:
  113. :param token:
  114. :return: cab_dict
  115. """
  116. log.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
  117. url = f"{base_url}/api/v2/cabinet/detail"
  118. params = {
  119. "cabinetId": cabinetId
  120. }
  121. headers["token"] = token[0]
  122. response = requests.get(url, headers=headers, params=params, timeout=5)
  123. # print(f'get_cabinet: {response.json()}')
  124. if response.json()["resultCode"] == 200:
  125. data = response.json()["data"]
  126. cab_dict = {"rmbPrice": data.get("rmbPrice"), "brand": data.get("brand"), "status": data.get("status"),
  127. "switchSt": data.get("switchSt"), "cardNo": data.get("cardNo"),
  128. "barcodeId": data.get("barcodeId"), "year": data.get("year"), "grade": data.get("grade"),
  129. "setName": data.get("setName"), "player": data.get("player"),
  130. "onSaleExpireTs": data.get("onSaleExpireTs"), "authenticNumber": data.get("authenticNumber")
  131. }
  132. return cab_dict
  133. else:
  134. log.debug("get_cabinet 请求失败,重试中...........")
  135. raise Exception("请求失败")
  136. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  137. def get_sold_xhr_page(log):
  138. """
  139. 获取已售数据页数
  140. :return: total
  141. """
  142. log.info("开始获取总页数....")
  143. url = f"{base_url}/search/searchAuctionItem"
  144. data = {
  145. "page": 1,
  146. "pageSize": 10,
  147. "orderStatus": "3",
  148. "sortType": "auction_end",
  149. "ascSort": "desc"
  150. }
  151. response = requests.post(url, headers=headers, json=data)
  152. total = response.json()['data']['total']
  153. if total:
  154. return total
  155. else:
  156. log.error("get_sold_xhr_page, error")
  157. raise Exception("获取get_sold_xhr_page数据失败")
  158. def fetch_all_pages(log):
  159. """
  160. 查询所有页数的数据
  161. :return: page_data 每页的数据
  162. """
  163. log.info("开始获取所有页数据....")
  164. total = get_sold_xhr_page(log)
  165. pages = (total + 9) // 10 # 计算页码
  166. log.info(f"一共有{total}条已售数据, 总页数: {pages}..................................")
  167. for page in range(1, pages + 1):
  168. data = {
  169. "page": page,
  170. "pageSize": 10,
  171. "orderStatus": "3",
  172. "sortType": "auction_end",
  173. "ascSort": "desc"
  174. }
  175. response = requests.post(f"{base_url}/search/searchAuctionItem", headers=headers, json=data)
  176. page_data = response.json()['data']['cardCabinet']
  177. # all_data.extend(page_data)
  178. yield page_data
  179. # time.sleep(1)
  180. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  181. def get_bid(log, aid, page, token):
  182. """
  183. 获取竞价相关数据 每个用户的最后一条竞价信息
  184. :param token:
  185. :param log:
  186. :param aid: auctionItemId
  187. :param page:
  188. :return: result: recordList
  189. """
  190. url = f"{base_url}/api/v2/auction/record"
  191. params = {
  192. "auctionItemId": aid,
  193. "pageNumber": str(page),
  194. "pageSize": "10"
  195. }
  196. log.debug(f'正在获取竞价相关第{page}页的数据..................')
  197. headers["token"] = token[0]
  198. response = requests.get(url, headers=headers, params=params)
  199. # print(f'get_bid: {response.json()}')
  200. if response.status_code != 200:
  201. log.error(f"请求失败,状态码: {response.status_code}")
  202. raise Exception("请求失败")
  203. # time.sleep(1)
  204. recordList = response.json()['data']['recordList']
  205. if recordList:
  206. return recordList
  207. else:
  208. log.error(f"get_bid, error")
  209. raise Exception("获取get_bid数据失败")
  210. def get_bid_list(log, aid, bidIndex, token):
  211. """
  212. 获取竞价相关数据 每个用户的最后一条竞价信息
  213. :param token:
  214. :param log:
  215. :param aid: auctionItemId
  216. :param bidIndex: 竞价总条数
  217. :return: result: JSON列表格式
  218. """
  219. # if bidIndex <= 0:
  220. # bidIndex = 1
  221. log.info(f"开始获取第{aid}的get_bid_list数据, 一共{bidIndex}条")
  222. pages = (bidIndex + 9) // 10 # 计算页码
  223. resp_list = []
  224. # for page in range(1, int(pages) + 1):
  225. # recordList = get_bid(log, aid, page, token)
  226. # resp_list.extend(recordList)
  227. for page in range(1, int(pages) + 1):
  228. try:
  229. recordList = get_bid(log, aid, page, token)
  230. resp_list.extend(recordList)
  231. except Exception as e:
  232. log.error(f"recordList get_bid error: {e}")
  233. break
  234. # 创建一个字典来存储每个用户的最新竞价信息
  235. latest_bids = {}
  236. for bid in resp_list:
  237. nick_name = bid['nickName']
  238. if nick_name not in latest_bids or bid['bidTime'] > latest_bids[nick_name]['bidTime']:
  239. latest_bids[nick_name] = {'nickName': nick_name, 'bidPrice': bid['bidPrice'], 'bidTime': bid['bidTime']}
  240. result = [
  241. {'nickName': i['nickName'], 'bidPrice': i['bidPrice'], 'bidTime': transform_timestamp(i['bidTime'])}
  242. for i in latest_bids.values()
  243. ]
  244. return result
  245. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  246. def run(log, sql_pool, token):
  247. """
  248. 主运行函数
  249. """
  250. try:
  251. log.info("开始运行 sold_spider 爬虫任务............................................................")
  252. for page_data in fetch_all_pages(log):
  253. info_list = []
  254. for auction in page_data:
  255. aid = auction.get('auctionItemId')
  256. cabinetId = auction.get("cabinetId")
  257. imgs = auction.get("imgs")
  258. title = auction.get("title")
  259. bidIndex = auction.get("currBidIndex")
  260. price = auction.get("price")
  261. lastBidPrice = auction.get("lastBidPrice")
  262. auctionStart_ = auction.get("auctionStart")
  263. auctionStart = transform_timestamp(auctionStart_)
  264. auctionEnd_ = auction.get("auctionEnd")
  265. auctionEnd = transform_timestamp(auctionEnd_)
  266. bid_list = get_bid_list(log, aid, bidIndex, token)
  267. # 获取详情页数据
  268. act_dict = get_action(log, aid, token)
  269. # time.sleep(random.randint(5, 10))
  270. cab_dict = get_cabinet(log, cabinetId, token)
  271. # follows = act_dict.get("follows")
  272. if not bid_list:
  273. bid_list = []
  274. info_dict = {
  275. "cabinetId": cabinetId,
  276. "auctionItemId": aid,
  277. "imgs": imgs,
  278. "title": title,
  279. "bidIndex": bidIndex,
  280. "price": price,
  281. "lastBidPrice": lastBidPrice,
  282. "auctionStart": auctionStart,
  283. "auctionEnd": auctionEnd,
  284. "bid_list": json.dumps(bid_list, ensure_ascii=False),
  285. "nickName": act_dict.get("nickName"),
  286. "following": act_dict.get("following"),
  287. "certifyStatus": act_dict.get("certifyStatus"),
  288. "ipRegion": act_dict.get("ipRegion"),
  289. "credit": act_dict.get("credit"),
  290. "agentLevel": act_dict.get("agentLevel"),
  291. "agentId": act_dict.get("agentId"),
  292. "rmbPrice": cab_dict.get("rmbPrice"),
  293. "brand": cab_dict.get("brand"),
  294. "status": cab_dict.get("status"),
  295. "switchSt": cab_dict.get("switchSt"),
  296. "cardNo": cab_dict.get("cardNo"),
  297. "barcodeId": cab_dict.get("barcodeId"),
  298. "year": cab_dict.get("year"),
  299. "grade": cab_dict.get("grade"),
  300. "setName": cab_dict.get("setName"),
  301. "player": cab_dict.get("player"),
  302. "onSaleExpireTs": cab_dict.get("onSaleExpireTs"),
  303. "authenticNumber": cab_dict.get("authenticNumber")
  304. }
  305. info_list.append(info_dict)
  306. # 保存数据
  307. sql_pool.insert_many(table="weikajia_sold", data_list=info_list, ignore=True)
  308. # time.sleep(random.randint(1, 3))
  309. except Exception as e:
  310. log.error(f'Error: {e}')
  311. finally:
  312. log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
  313. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  314. def sold_main(log):
  315. # 配置 MySQL 连接池
  316. sql_pool = MySQLConnectionPool(log=log)
  317. if not sql_pool.check_pool_health():
  318. log.error("数据库连接池异常")
  319. raise RuntimeError("数据库连接池异常")
  320. token = sql_pool.select_one("select token from wkj_token")
  321. run(log, sql_pool, token)
# Script entry point.
if __name__ == '__main__':
    # Imported here rather than at the top of the file; this also creates the
    # module-level name `logger` that after_log falls back to.
    from loguru import logger
    # The logger is passed by keyword to the retry-wrapped entry point.
    sold_main(log=logger)