# -*- coding: utf-8 -*-
# File   : weika_sold_spider.py
# Author : Charley
# Python : 3.8.10
# Date   : 2024-09-30 13:29
import json
import random
import time
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

import requests
from retrying import retry

from mysq_pool import MySQLConnectionPool
# logger.remove()
# logger.add("sold_logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
#            format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
#            level="DEBUG", retention="1 day")
  16. headers = {
  17. "appVersion": "1.6.5",
  18. "osVersion": "9",
  19. "deviceModel": "M2007J22C",
  20. "appVersionCode": "168",
  21. "deviceBrand": "xiaomi",
  22. "platform": "android",
  23. "token": "",
  24. "user-agent": "Mozilla/5.0 (Linux; Android 9; M2007J22C Build/QP1A.190711.020; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/92.0.4515.131 Mobile Safari/537.36",
  25. "Content-Type": "application/json",
  26. "Connection": "Keep-Alive"
  27. }
  28. base_url = "https://api.weikajia.com"
  29. def transform_timestamp(timestamp):
  30. """
  31. 将 timestamp 格式转换为 %Y-%m-%d %H:%M:%S
  32. :param timestamp:
  33. :return: formatted_time
  34. """
  35. # 将Unix时间戳转换为datetime对象
  36. dt_object = datetime.fromtimestamp(int(timestamp))
  37. # 格式化时间
  38. formatted_time = dt_object.strftime('%Y-%m-%d %H:%M:%S')
  39. return formatted_time
  40. @retry(stop_max_attempt_number=3, wait_fixed=1000)
  41. def get_action(log, auctionId):
  42. """
  43. 获取auction信息
  44. :param log:
  45. :param auctionId:
  46. :return: agentUserInfo
  47. """
  48. log.debug(f'正在查询auctionId为: {auctionId}的信息..............')
  49. url = f"{base_url}/api/v2/auction/detail"
  50. params = {
  51. "auctionId": auctionId
  52. }
  53. response = requests.get(url, headers=headers, params=params, timeout=5)
  54. # print(f'get_action: {response.json()}')
  55. if response.json()["resultCode"] == 200:
  56. try:
  57. agentUserInfo = response.json()["data"].get("agentUserInfo")
  58. agentId = response.json()["data"].get("agentId")
  59. agentUserInfo["agentId"] = agentId
  60. return agentUserInfo
  61. except Exception as e:
  62. log.error(f"get_action agentUserInfo , error: {e}")
  63. return {}
  64. else:
  65. log.debug("get_action 请求失败,重试中...........")
  66. # raise Exception("请求失败")
  67. return {}
  68. @retry(stop_max_attempt_number=3, wait_fixed=1000)
  69. def get_cabinet(log, cabinetId):
  70. """
  71. 获取cabinet信息
  72. :param log:
  73. :param cabinetId:
  74. :return: cab_dict
  75. """
  76. log.debug(f'正在查询cabinetId为: {cabinetId}的信息..............')
  77. url = f"{base_url}/api/v2/cabinet/detail"
  78. params = {
  79. "cabinetId": cabinetId
  80. }
  81. response = requests.get(url, headers=headers, params=params, timeout=5)
  82. # print(f'get_cabinet: {response.json()}')
  83. if response.json()["resultCode"] == 200:
  84. data = response.json()["data"]
  85. cab_dict = {"rmbPrice": data.get("rmbPrice"), "brand": data.get("brand"), "status": data.get("status"),
  86. "switchSt": data.get("switchSt"), "cardNo": data.get("cardNo"),
  87. "barcodeId": data.get("barcodeId"), "year": data.get("year"), "grade": data.get("grade"),
  88. "setName": data.get("setName"), "player": data.get("player"),
  89. "onSaleExpireTs": data.get("onSaleExpireTs"), "authenticNumber": data.get("authenticNumber")
  90. }
  91. return cab_dict
  92. else:
  93. log.debug("get_cabinet 请求失败,重试中...........")
  94. raise Exception("请求失败")
  95. @retry(stop_max_attempt_number=3, wait_fixed=1000)
  96. def get_sold_xhr_page(log):
  97. """
  98. 获取已售数据页数
  99. :return: total
  100. """
  101. log.info("开始获取总页数....")
  102. url = f"{base_url}/search/searchAuctionItem"
  103. data = {
  104. "page": 1,
  105. "pageSize": 10,
  106. "orderStatus": "3",
  107. "sortType": "auction_end",
  108. "ascSort": "desc"
  109. }
  110. response = requests.post(url, headers=headers, json=data)
  111. total = response.json()['data']['total']
  112. if total:
  113. return total
  114. else:
  115. log.error("get_sold_xhr_page, error")
  116. raise Exception("获取get_sold_xhr_page数据失败")
  117. def fetch_all_pages(log):
  118. """
  119. 查询所有页数的数据
  120. :return: page_data 每页的数据
  121. """
  122. log.info("开始获取所有页数据....")
  123. total = get_sold_xhr_page(log)
  124. pages = (total + 9) // 10 # 计算页码
  125. log.info(f"一共有{total}条已售数据, 总页数: {pages}..................................")
  126. for page in range(1, pages + 1):
  127. data = {
  128. "page": page,
  129. "pageSize": 10,
  130. "orderStatus": "3",
  131. "sortType": "auction_end",
  132. "ascSort": "desc"
  133. }
  134. response = requests.post(f"{base_url}/search/searchAuctionItem", headers=headers, json=data)
  135. page_data = response.json()['data']['cardCabinet']
  136. # all_data.extend(page_data)
  137. yield page_data
  138. time.sleep(1)
  139. @retry(stop_max_attempt_number=3, wait_fixed=1000)
  140. def get_bid(log, aid, page, token):
  141. """
  142. 获取竞价相关数据 每个用户的最后一条竞价信息
  143. :param token:
  144. :param log:
  145. :param aid: auctionItemId
  146. :param page:
  147. :return: result: recordList
  148. """
  149. url = f"{base_url}/api/v2/auction/record"
  150. params = {
  151. "auctionItemId": aid,
  152. "pageNumber": str(page),
  153. "pageSize": "10"
  154. }
  155. log.debug(f'正在获取竞价相关第{page}页的数据..................')
  156. headers["token"] = token[0]
  157. response = requests.get(url, headers=headers, params=params)
  158. # print(f'get_bid: {response.json()}')
  159. if response.status_code != 200:
  160. log.error(f"请求失败,状态码: {response.status_code}")
  161. raise Exception("请求失败")
  162. time.sleep(1)
  163. recordList = response.json()['data']['recordList']
  164. if recordList:
  165. return recordList
  166. else:
  167. log.error(f"get_bid, error")
  168. raise Exception("获取get_bid数据失败")
  169. def get_bid_list(log, aid, bidIndex, token):
  170. """
  171. 获取竞价相关数据 每个用户的最后一条竞价信息
  172. :param token:
  173. :param log:
  174. :param aid: auctionItemId
  175. :param bidIndex: 竞价总条数
  176. :return: result: JSON列表格式
  177. """
  178. # if bidIndex <= 0:
  179. # bidIndex = 1
  180. log.info(f"开始获取第{aid}的get_bid_list数据, 一共{bidIndex}条")
  181. pages = (bidIndex + 9) // 10 # 计算页码
  182. resp_list = []
  183. # for page in range(1, int(pages) + 1):
  184. # recordList = get_bid(log, aid, page, token)
  185. # resp_list.extend(recordList)
  186. for page in range(1, int(pages) + 1):
  187. try:
  188. recordList = get_bid(log, aid, page, token)
  189. resp_list.extend(recordList)
  190. except Exception as e:
  191. log.error(f"recordList get_bid error: {e}")
  192. break
  193. # 创建一个字典来存储每个用户的最新竞价信息
  194. latest_bids = {}
  195. for bid in resp_list:
  196. nick_name = bid['nickName']
  197. if nick_name not in latest_bids or bid['bidTime'] > latest_bids[nick_name]['bidTime']:
  198. latest_bids[nick_name] = {'nickName': nick_name, 'bidPrice': bid['bidPrice'], 'bidTime': bid['bidTime']}
  199. result = [
  200. {'nickName': i['nickName'], 'bidPrice': i['bidPrice'], 'bidTime': transform_timestamp(i['bidTime'])}
  201. for i in latest_bids.values()
  202. ]
  203. return result
  204. def save_data(sql_pool, info):
  205. """
  206. 保存数据
  207. :param sql_pool:
  208. :param info:
  209. :return:
  210. """
  211. sql = """INSERT INTO weikajia_sold(cabinetId, auctionItemId, imgs, title, bidIndex, price, lastBidPrice, auctionStart, auctionEnd, bid_list, nickName, following, certifyStatus, ipRegion, credit, agentLevel, agentId, rmbPrice, brand, status, switchSt, cardNo, barcodeId, year, grade, setName, player, onSaleExpireTs, authenticNumber) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
  212. sql_pool.insert_one(sql, info)
  213. @retry(stop_max_attempt_number=3, wait_fixed=1000)
  214. def run(sql_pool, log, token):
  215. """
  216. 主运行函数
  217. """
  218. try:
  219. log.info("开始运行 sold_spider 爬虫任务............................................................")
  220. # token = sql_pool.select_one("select token from wkj_token")
  221. # headers["token"] = token[0]
  222. sql_cabinetId_list = sql_pool.select_all("select cabinetId from weikajia_sold")
  223. cabinetId_list = [i[0] for i in sql_cabinetId_list]
  224. # print(cabinetId_list)
  225. for page_data in fetch_all_pages(log):
  226. # info_list = []
  227. for auction in page_data:
  228. aid = auction.get('auctionItemId')
  229. cabinetId = auction.get("cabinetId")
  230. # 判断cabid是否在库中
  231. if cabinetId in cabinetId_list:
  232. log.info(
  233. f"{cabinetId}已存在,跳过............................................................")
  234. continue
  235. else:
  236. cabinetId_list.append(cabinetId)
  237. imgs = auction.get("imgs")
  238. title = auction.get("title")
  239. bidIndex = auction.get("currBidIndex")
  240. price = auction.get("price")
  241. lastBidPrice = auction.get("lastBidPrice")
  242. auctionStart_ = auction.get("auctionStart")
  243. auctionStart = transform_timestamp(auctionStart_)
  244. auctionEnd_ = auction.get("auctionEnd")
  245. auctionEnd = transform_timestamp(auctionEnd_)
  246. bid_list = get_bid_list(log, aid, bidIndex, token)
  247. # 获取详情页数据
  248. act_dict = get_action(log, aid)
  249. # time.sleep(random.randint(5, 10))
  250. cab_dict = get_cabinet(log, cabinetId)
  251. # follows = act_dict.get("follows")
  252. if not bid_list:
  253. bid_list = []
  254. info = (
  255. cabinetId, aid, imgs, title, bidIndex, price, lastBidPrice, auctionStart, auctionEnd,
  256. json.dumps(bid_list, ensure_ascii=False), act_dict.get("nickName"),
  257. act_dict.get("following"), act_dict.get("certifyStatus"), act_dict.get("ipRegion"),
  258. act_dict.get("credit"), act_dict.get("agentLevel"), act_dict.get("agentId"),
  259. cab_dict.get("rmbPrice"), cab_dict.get("brand"), cab_dict.get("status"),
  260. cab_dict.get("switchSt"), cab_dict.get("cardNo"), cab_dict.get("barcodeId"),
  261. cab_dict.get("year"), cab_dict.get("grade"), cab_dict.get("setName"),
  262. cab_dict.get("player"), cab_dict.get("onSaleExpireTs"), cab_dict.get("authenticNumber")
  263. )
  264. # info_list.append(info)
  265. # 保存每页的数据
  266. # logger.info(info)
  267. save_data(sql_pool, info)
  268. time.sleep(random.randint(1, 3))
  269. cabinetId_list.clear()
  270. except Exception as e:
  271. log.error(f'Error: {e}')
  272. finally:
  273. log.info("爬虫程序运行结束,等待下一轮的采集任务.............")
  274. @retry(stop_max_attempt_number=100, wait_fixed=3600000)
  275. def sold_main(log):
  276. sql_pool = MySQLConnectionPool(log=log)
  277. if not sql_pool:
  278. log.error("数据库连接失败")
  279. raise Exception("数据库连接失败")
  280. token = sql_pool.select_one("select token from wkj_token")
  281. run(sql_pool, log, token)
  282. if __name__ == '__main__':
  283. from loguru import logger
  284. sold_main(log=logger)