# live_add.py
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/7/8 16:51
  5. from datetime import datetime
  6. import utils
  7. import inspect
  8. from loguru import logger
  9. from mysql_pool import MySQLConnectionPool
  10. from tenacity import retry, stop_after_attempt, wait_fixed
  11. logger.remove()
  12. logger.add("./logs/add_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  13. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  14. level="DEBUG", retention="7 day")
  15. def after_log(retry_state):
  16. """
  17. retry 回调
  18. :param retry_state: RetryCallState 对象
  19. """
  20. # 检查 args 是否存在且不为空
  21. if retry_state.args and len(retry_state.args) > 0:
  22. log = retry_state.args[0] # 获取传入的 logger
  23. else:
  24. log = logger # 使用全局 logger
  25. if retry_state.outcome.failed:
  26. log.warning(
  27. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  28. else:
  29. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  30. # ----------------------------------------------------------------------------------------------------------------------
  31. # ----------------------------------------------------------------------------------------------------------------------
  32. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  33. def get_order_first_group(log, live_id, goods_id, sql_pool):
  34. """
  35. 获取第一种情况的购买记录 查询所需的 groupId
  36. :param log:
  37. :param live_id:
  38. :param goods_id:
  39. :param sql_pool:
  40. """
  41. log.debug(
  42. f"{inspect.currentframe().f_code.co_name} get groupId start, live_id:{live_id}, goods_id:{goods_id} ..........")
  43. url = "https://api.qiandao.cn/box/live-draw/group"
  44. params = {
  45. # "shelfId": "876650715072717042"
  46. "shelfId": goods_id
  47. }
  48. resp_json = utils.request_get_data(log, url, params)
  49. # print(resp_json)
  50. if resp_json.get("code") == 0:
  51. groupId = resp_json.get("data", {}).get("groupId")
  52. if groupId:
  53. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  54. max_trading_time = sql_pool.select_one(
  55. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  56. args=(groupId,))
  57. max_trading_time = max_trading_time[0] if max_trading_time else None
  58. get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time)
  59. else:
  60. log.warning(f"live_id:{live_id}, 未获取到groupId............")
  61. else:
  62. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  63. def get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time):
  64. """
  65. 获取第一种情况的购买记录 列表 翻页
  66. :param log:
  67. :param groupId:
  68. :param live_id:
  69. :param goods_id:
  70. :param sql_pool:
  71. :param max_trading_time:
  72. :return:
  73. """
  74. page = 1
  75. max_page = 50
  76. # total_count = 0
  77. while page <= max_page:
  78. len_item = get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time)
  79. if len_item < 20:
  80. log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
  81. break
  82. page += 1
  83. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  84. def get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time):
  85. """
  86. 获取第一种情况的购买记录 详情见图 商品点开 直接是购买页面 "goodsType": "LIVE_DRAW",
  87. :param log:
  88. :param groupId:
  89. :param live_id:
  90. :param goods_id:
  91. :param sql_pool:
  92. :param page:
  93. :param max_trading_time:
  94. """
  95. log.debug(f"{inspect.currentframe().f_code.co_name} get order start, groupId:{groupId} ..........")
  96. url = "https://api.qiandao.cn/box/live-draw/draw-records"
  97. params = {
  98. "limit": "20",
  99. # "offset": "0",
  100. "offset": str((page - 1) * 20),
  101. # "groupId": "883762283266720281"
  102. "groupId": groupId
  103. }
  104. resp_json = utils.request_get_data(log, url, params)
  105. print(resp_json)
  106. if resp_json.get("code") == 0:
  107. res_data = resp_json.get("data", {})
  108. records = res_data.get("records", [])
  109. if not records:
  110. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no records")
  111. return 0
  112. info_list = []
  113. for row in records:
  114. status = row.get("status")
  115. drawResult = row.get("drawResult") # 抽到的号码
  116. userName = row.get("userName")
  117. trading_time = row.get("createdAt", "")
  118. trading_time_str = utils.transform_ms(log, trading_time)
  119. if not trading_time_str:
  120. log.debug(f"Invalid trading time: {trading_time_str}")
  121. continue # 跳过无效时间
  122. # 字符串 -> datetime
  123. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  124. if max_trading_time and trading_time <= max_trading_time:
  125. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  126. break
  127. isGoodProduct = row.get("isGoodProduct") # 是否欧皇号码
  128. data_dict = {
  129. "live_id": live_id,
  130. "goods_id": goods_id,
  131. "group_id": groupId,
  132. "status": status,
  133. "draw_result": drawResult,
  134. "user_name": userName,
  135. # "created_at": createdAt,
  136. "trading_time": trading_time_str,
  137. "is_good_product": isGoodProduct
  138. }
  139. # print(data_dict)
  140. info_list.append(data_dict)
  141. if info_list:
  142. try:
  143. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  144. except Exception as e:
  145. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  146. return len(records)
  147. else:
  148. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  149. return 0
  150. # ----------------------------------------------------------------------------------------------------------------------
  151. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  152. def get_order_records_second(log, live_id, goods_id, sql_pool):
  153. """
  154. 获取第二种情况的购买记录 详情见图 "goodsType": "MALL",
  155. :param log:
  156. :param live_id:
  157. :param goods_id:
  158. :param sql_pool:
  159. """
  160. log.debug(
  161. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  162. url = "https://api.qiandao.cn/mall/dynamic-detail"
  163. params = {
  164. # "shelfId": "873540181645335865",
  165. "shelfId": goods_id,
  166. "deliverPatterns": "0"
  167. }
  168. resp_json = utils.request_get_data(log, url, params)
  169. # print(resp_json)
  170. if resp_json.get("code") == 0:
  171. res_data = resp_json.get("data", {})
  172. rows = res_data.get("recentPurchase", {}).get("rows", [])
  173. if not rows:
  174. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no rows")
  175. return
  176. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  177. max_trading_time = sql_pool.select_one(
  178. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  179. args=(live_id,))
  180. max_trading_time = max_trading_time[0] if max_trading_time else None
  181. info_list = []
  182. for row in rows:
  183. productName = row.get("productName")
  184. productCount = row.get("productCount")
  185. trading_time = row.get("paidAt")
  186. trading_time_str = utils.transform_ms(log, trading_time)
  187. if not trading_time_str:
  188. log.debug(f"Invalid trading time: {trading_time_str}")
  189. continue # 跳过无效时间
  190. # 字符串 -> datetime
  191. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  192. if max_trading_time and trading_time <= max_trading_time:
  193. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  194. break
  195. userNickname = row.get("userNickname")
  196. data_dict = {
  197. "live_id": live_id,
  198. "goods_id": goods_id,
  199. "product_name": productName,
  200. "product_count": productCount,
  201. "trading_time": trading_time_str,
  202. "user_name": userNickname
  203. }
  204. # print(data_dict)
  205. info_list.append(data_dict)
  206. if info_list:
  207. try:
  208. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  209. except Exception as e:
  210. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  211. else:
  212. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  213. # ----------------------------------------------------------------------------------------------------------------------
  214. def get_third_list(log, live_id, goods_id, sql_pool):
  215. """
  216. 获取第三种情况的购买记录列表 翻页
  217. :param log:
  218. :param live_id:
  219. :param goods_id:
  220. :param sql_pool:
  221. """
  222. page = 1
  223. max_page = 50
  224. # total_count = 0
  225. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  226. max_trading_time = sql_pool.select_one(
  227. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  228. args=(live_id,))
  229. max_trading_time = max_trading_time[0] if max_trading_time else None
  230. while page <= max_page:
  231. len_item = get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time)
  232. if len_item < 10:
  233. log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
  234. break
  235. page += 1
  236. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  237. def get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time):
  238. """
  239. 获取第三种情况的购买记录 详情见图 "goodsType": "UNBOX",
  240. :param log:
  241. :param live_id:
  242. :param goods_id:
  243. :param sql_pool:
  244. :param page:
  245. :param max_trading_time:
  246. """
  247. log.debug(
  248. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  249. url = "https://api.qiandao.cn/box/unbox/recent-purchase-record"
  250. params = {
  251. # "shelfId": "876400156344348863",
  252. "shelfId": goods_id,
  253. "limit": "10",
  254. # "offset": "0"
  255. "offset": str((page - 1) * 10)
  256. }
  257. resp_json = utils.request_get_data(log, url, params)
  258. # print(resp_json)
  259. if resp_json.get("code") == 0:
  260. res_data = resp_json.get("data", {})
  261. rows = res_data.get("rows", [])
  262. if not rows:
  263. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no rows")
  264. return 0
  265. info_list = []
  266. for row in rows:
  267. productName = row.get("productName")
  268. productCount = row.get("productCount")
  269. trading_time = row.get("paidAt")
  270. trading_time_str = utils.transform_ms(log, trading_time)
  271. if not trading_time_str:
  272. log.debug(f"Invalid trading time: {trading_time_str}")
  273. continue # 跳过无效时间
  274. # 字符串 -> datetime
  275. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  276. if max_trading_time and trading_time <= max_trading_time:
  277. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  278. break
  279. userNickname = row.get("userNickname")
  280. orderId = row.get("orderId")
  281. data_dict = {
  282. "live_id": live_id,
  283. "goods_id": goods_id,
  284. "product_name": productName,
  285. "product_count": productCount,
  286. "trading_time": trading_time_str,
  287. "user_name": userNickname,
  288. "order_id": orderId
  289. }
  290. # print(data_dict)
  291. info_list.append(data_dict)
  292. if info_list:
  293. try:
  294. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  295. except Exception as e:
  296. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  297. return len(rows)
  298. else:
  299. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  300. return 0
  301. # ----------------------------------------------------------------------------------------------------------------------
  302. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  303. def get_order_records_fourth(log, live_id, goods_id, sql_pool):
  304. """
  305. 获取第四种情况的购买记录 详情见图 "goodsType": "AUCTION", auction_id = goodsId
  306. :param log:
  307. :param live_id:
  308. :param goods_id:
  309. :param sql_pool:
  310. """
  311. log.debug(
  312. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  313. url = "https://api.qiandao.cn/auctioneer/bid/list"
  314. params = {
  315. # "auction_id": "883831521360768262",
  316. "auction_id": goods_id,
  317. "max_results": "200"
  318. }
  319. resp_json = utils.request_get_data(log, url, params)
  320. # print(resp_json)
  321. if resp_json.get("code") == 0:
  322. res_data = resp_json.get("data", {})
  323. items = res_data.get("items", [])
  324. if not items:
  325. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no items")
  326. return
  327. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  328. max_trading_time = sql_pool.select_one(
  329. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  330. args=(live_id,))
  331. max_trading_time = max_trading_time[0] if max_trading_time else None
  332. info_list = []
  333. for item in items:
  334. user_id = item.get("user_id")
  335. nickname = item.get("nickname")
  336. price = item.get("price")
  337. trading_time = item.get("bid_time")
  338. trading_time_str = utils.transform_ms(log, trading_time)
  339. if not trading_time_str:
  340. log.debug(f"Invalid trading time: {trading_time_str}")
  341. continue # 跳过无效时间
  342. # 字符串 -> datetime
  343. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  344. if max_trading_time and trading_time <= max_trading_time:
  345. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  346. break
  347. hammered = item.get("hammered") # 是否最终成交
  348. data_dict = {
  349. "live_id": live_id,
  350. "goods_id": goods_id,
  351. "user_id": user_id,
  352. "user_name": nickname,
  353. "price": price,
  354. "trading_time": trading_time_str,
  355. "hammered": hammered
  356. }
  357. # print(data_dict)
  358. info_list.append(data_dict)
  359. if info_list:
  360. try:
  361. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  362. except Exception as e:
  363. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  364. else:
  365. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  366. def test():
  367. url = "https://api.qiandao.cn/auctioneer/bid/list"
  368. params = {
  369. "auction_id": "883831521360768262",
  370. "max_results": "200"
  371. }
  372. resp_json = utils.request_get_data(logger, url, params)
  373. print(resp_json)
  374. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  375. def order_main(log):
  376. """
  377. 主函数 详情页
  378. :param log: logger对象
  379. """
  380. log.info(
  381. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  382. # 配置 MySQL 连接池
  383. sql_pool = MySQLConnectionPool(log=log)
  384. if not sql_pool.check_pool_health():
  385. log.error("数据库连接池异常")
  386. raise RuntimeError("数据库连接池异常")
  387. try:
  388. # LIVE_DRAW MALL UNBOX AUCTION
  389. # 查询第一种情况
  390. sql_first_list = sql_pool.select_all(
  391. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'LIVE_DRAW'")
  392. # sql_first_list = [item[0] for item in sql_first_list]
  393. # print(sql_first_list)
  394. for live_id, goods_id in sql_first_list:
  395. try:
  396. get_order_first_group(log, live_id, goods_id, sql_pool)
  397. except Exception as e:
  398. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  399. # 查询第二种情况
  400. sql_second_list = sql_pool.select_all(
  401. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'MALL'")
  402. # sql_second_list = [item[0] for item in sql_second_list]
  403. for live_id, goods_id in sql_second_list:
  404. try:
  405. get_order_records_second(log, live_id, goods_id, sql_pool)
  406. except Exception as e:
  407. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  408. # 查询第三种情况
  409. sql_third_list = sql_pool.select_all(
  410. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'UNBOX'")
  411. # sql_third_list = [item[0] for item in sql_third_list]
  412. for live_id, goods_id in sql_third_list:
  413. try:
  414. get_third_list(log, live_id, goods_id, sql_pool)
  415. except Exception as e:
  416. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  417. # 查询第四种情况
  418. sql_fourth_list = sql_pool.select_all(
  419. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'AUCTION'")
  420. # sql_fourth_list = [item[0] for item in sql_fourth_list]
  421. for live_id, goods_id in sql_fourth_list:
  422. try:
  423. get_order_records_fourth(log, live_id, goods_id, sql_pool)
  424. except Exception as e:
  425. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  426. except Exception as e:
  427. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
# Script entry point: start the crawl with the module-level logger.
if __name__ == '__main__':
    order_main(log=logger)