live_spider.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/7/8 16:51
  5. import threading
  6. import time
  7. from datetime import datetime
  8. import utils
  9. import inspect
  10. import schedule
  11. from loguru import logger
  12. from mysql_pool import MySQLConnectionPool
  13. from tenacity import retry, stop_after_attempt, wait_fixed
  14. logger.remove()
  15. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  16. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  17. level="DEBUG", retention="7 day")
  18. def after_log(retry_state):
  19. """
  20. retry 回调
  21. :param retry_state: RetryCallState 对象
  22. """
  23. # 检查 args 是否存在且不为空
  24. if retry_state.args and len(retry_state.args) > 0:
  25. log = retry_state.args[0] # 获取传入的 logger
  26. else:
  27. log = logger # 使用全局 logger
  28. if retry_state.outcome.failed:
  29. log.warning(
  30. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  31. else:
  32. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  33. def get_live_list(log, sql_pool, sql_live_id_list):
  34. """
  35. 获取 live 数据列表
  36. :param log:
  37. :param sql_pool:
  38. :param sql_live_id_list:
  39. :return:
  40. """
  41. log.debug(f"{inspect.currentframe().f_code.co_name} crawl start ..........")
  42. page = 1
  43. max_page = 50
  44. total_count = 0
  45. while page <= max_page:
  46. len_item, totalCount = get_live_one_page(log, page, sql_pool, sql_live_id_list)
  47. if len_item < 20:
  48. log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
  49. break
  50. total_count += len_item
  51. if total_count >= int(totalCount):
  52. log.debug(f"total_count: {total_count} totalCount: {totalCount}")
  53. break
  54. page += 1
  55. # time.sleep(random.uniform(0.1, 1))
  56. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  57. def get_live_one_page(log, page, sql_pool, sql_live_id_list):
  58. """
  59. 获取 live 单页数据
  60. :param log:
  61. :param page:
  62. :param sql_pool:
  63. :param sql_live_id_list:
  64. :return:
  65. """
  66. log.debug(f"{inspect.currentframe().f_code.co_name} for page:{page}..........")
  67. url = "https://api.qiandao.com/live/list"
  68. data = {
  69. "limit": 20,
  70. # "offset": 0,
  71. "offset": (page - 1) * 20,
  72. "status": [
  73. "PREPARE",
  74. "UNDERWAY"
  75. ],
  76. "type": ""
  77. }
  78. resp_json = utils.request_post_data(logger, url, data)
  79. # print(resp_json)
  80. if not resp_json:
  81. log.error("get_luckybag_one_page error")
  82. raise RuntimeError("get_luckybag_one_page error")
  83. rows = resp_json.get("data", {}).get("rows", [])
  84. total_count = resp_json.get("data", {}).get("count", 0)
  85. try:
  86. parse_live_data(log, rows, sql_pool, sql_live_id_list)
  87. except Exception as e:
  88. log.error(f"parse_live_data error: {e}")
  89. return len(rows), total_count
  90. def parse_live_data(log, rows, sql_pool, sql_live_id_list):
  91. """
  92. 解析 live 数据
  93. :param log:
  94. :param rows:
  95. :param sql_pool:
  96. :param sql_live_id_list:
  97. :return:
  98. """
  99. live_list = []
  100. for row in rows:
  101. live_id = row.get("id")
  102. if live_id in sql_live_id_list:
  103. log.info(f"live_id:{live_id} is exist, skip .......")
  104. continue
  105. ownerId = row.get("ownerId")
  106. title = row.get("title")
  107. images = row.get("images", {}).get("coverImages", [])[0]
  108. startTime = row.get("startTime")
  109. startTime = utils.transform_ms(log, startTime)
  110. endTime = row.get("endTime")
  111. if endTime != '0':
  112. endTime = utils.transform_ms(log, endTime)
  113. status = row.get("status")
  114. pullUrl = row.get("pullUrl")
  115. createdAt = row.get("createdAt")
  116. updatedAt = row.get("updatedAt")
  117. user_id = row.get("user", {}).get("id")
  118. officialUserId = row.get("user", {}).get("officialUserId")
  119. nickname = row.get("user", {}).get("nickname")
  120. live_type = row.get("type")
  121. live_dict = {
  122. "live_id": live_id,
  123. "owner_id": ownerId,
  124. "title": title,
  125. "images": images,
  126. "start_time": startTime,
  127. "end_time": endTime,
  128. "status": status,
  129. "pull_url": pullUrl,
  130. "created_at": createdAt,
  131. "updated_at": updatedAt,
  132. "user_id": user_id,
  133. "official_user_id": officialUserId,
  134. "nickname": nickname,
  135. "live_type": live_type
  136. }
  137. # print(live_dict)
  138. live_list.append(live_dict)
  139. sql_live_id_list.append(live_id)
  140. # print(live_list)
  141. if live_list:
  142. try:
  143. sql_pool.insert_many(table="qiandao_live_list_record", data_list=live_list)
  144. except Exception as e:
  145. log.error(f"insert_many error: {e}")
  146. # def get_live_detail(log, live_id):
  147. # url = "https://api.qiandao.cn/live/detail"
  148. # params = {
  149. # "liveId": "883752924533007599"
  150. # }
  151. # resp_json = utils.request_get_data(log, url, params)
  152. # print(resp_json)
  153. # if resp_json.get("code") == 0:
  154. # res_data = resp_json.get("data", {})
  155. # ownerId = res_data.get("ownerId")
  156. # title = res_data.get("title")
  157. # else:
  158. # log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  159. def get_live_product_list(log, live_id, sql_pool, sql_product_list):
  160. """
  161. 获取 live 商品列表
  162. :param log:
  163. :param live_id:
  164. :param sql_pool:
  165. :param sql_product_list:
  166. :return:
  167. """
  168. page = 1
  169. max_page = 10
  170. total_count = 0
  171. while page <= max_page:
  172. len_item, totalCount = get_product_single_page(log, live_id, page, sql_pool, sql_product_list)
  173. if len_item < 50:
  174. log.debug(
  175. f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount}, Less than 50, break ---------------")
  176. break
  177. total_count += len_item
  178. if total_count >= int(totalCount):
  179. log.debug(f"total_count: {total_count} totalCount: {totalCount}")
  180. break
  181. page += 1
  182. # time.sleep(random.uniform(0.1, 1))
  183. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  184. def get_product_single_page(log, live_id, page, sql_pool, sql_product_list):
  185. """
  186. 获取 live 商品列表单页数据
  187. :param log:
  188. :param live_id:
  189. :param page:
  190. :param sql_pool:
  191. :param sql_product_list:
  192. :return:
  193. """
  194. log.debug(f"{inspect.currentframe().f_code.co_name} for live_id:{live_id}, .... page:{page}..........")
  195. url = "https://api.qiandao.cn/live/goods/list"
  196. params = {
  197. # "liveId": "883752924533007599",
  198. "liveId": live_id,
  199. "limit": "50",
  200. # "offset": "0"
  201. "offset": str((page - 1) * 50),
  202. }
  203. resp_json = utils.request_get_data(log, url, params)
  204. # print(resp_json)
  205. if resp_json.get("code") == 0:
  206. res_data = resp_json.get("data", {})
  207. totalCount = res_data.get("count")
  208. rows = res_data.get("rows", [])
  209. if rows:
  210. parse_product_data(log, rows, sql_pool, sql_product_list, live_id)
  211. return len(rows), totalCount
  212. else:
  213. return 0, 0
  214. else:
  215. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  216. return 0, 0
  217. def parse_product_data(log, items, sql_pool, sql_product_list, live_id):
  218. """
  219. 解析 live 商品数据
  220. :param log:
  221. :param items:
  222. :param sql_pool:
  223. :param sql_product_list:
  224. :param live_id:
  225. :return:
  226. """
  227. log.debug(f"{inspect.currentframe().f_code.co_name} parse product start, live_id:{live_id} ..........")
  228. info_list = []
  229. for row in items:
  230. row_id = row.get("id") # 883752962113930805
  231. goodsId = row.get("goodsId") # 883585812724201771
  232. if goodsId in sql_product_list:
  233. log.info(f"live_id:{live_id} goodsId:{goodsId} is exist, skip .......")
  234. continue
  235. goodsType = row.get("goodsType")
  236. goodsName = row.get("goodsName")
  237. goodsImages = row.get("goodsImages", {})
  238. if goodsImages:
  239. goods_image = goodsImages.get("coverImages", [])[0]
  240. else:
  241. goods_image = ""
  242. goods_status = row.get("status")
  243. price = row.get("price")
  244. isSoldOut = row.get("isSoldOut")
  245. data_dict = {
  246. "live_id": live_id,
  247. "row_id": row_id,
  248. "goods_id": goodsId,
  249. "goods_type": goodsType,
  250. "goods_name": goodsName,
  251. "goods_image": goods_image,
  252. "goods_status": goods_status,
  253. "price": price,
  254. "is_sold_out": isSoldOut
  255. }
  256. # print(data_dict)
  257. info_list.append(data_dict)
  258. sql_product_list.append(goodsId)
  259. if info_list:
  260. try:
  261. sql_pool.insert_many(table="qiandao_live_product_record", data_list=info_list)
  262. except Exception as e:
  263. log.warning(f"插入失败: {e}")
  264. # ----------------------------------------------------------------------------------------------------------------------
  265. # ----------------------------------------------------------------------------------------------------------------------
  266. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  267. def get_order_first_group(log, live_id, goods_id, sql_pool):
  268. """
  269. 获取第一种情况的购买记录 查询所需的 groupId
  270. :param log:
  271. :param live_id:
  272. :param goods_id:
  273. :param sql_pool:
  274. """
  275. log.debug(
  276. f"{inspect.currentframe().f_code.co_name} get groupId start, live_id:{live_id}, goods_id:{goods_id} ..........")
  277. url = "https://api.qiandao.cn/box/live-draw/group"
  278. params = {
  279. # "shelfId": "876650715072717042"
  280. "shelfId": goods_id
  281. }
  282. resp_json = utils.request_get_data(log, url, params)
  283. # print(resp_json)
  284. if resp_json.get("code") == 0:
  285. groupId = resp_json.get("data", {}).get("groupId")
  286. if groupId:
  287. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  288. max_trading_time = sql_pool.select_one(
  289. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  290. args=(goods_id,))
  291. max_trading_time = max_trading_time[0] if max_trading_time else None
  292. get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time)
  293. else:
  294. log.warning(f"live_id:{live_id}, 未获取到groupId............")
  295. else:
  296. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  297. def get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time):
  298. """
  299. 获取第一种情况的购买记录 列表 翻页
  300. :param log:
  301. :param groupId:
  302. :param live_id:
  303. :param goods_id:
  304. :param sql_pool:
  305. :param max_trading_time:
  306. :return:
  307. """
  308. page = 1
  309. max_page = 50
  310. # total_count = 0
  311. while page <= max_page:
  312. len_item = get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time)
  313. if len_item < 20:
  314. log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
  315. break
  316. page += 1
  317. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  318. def get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time):
  319. """
  320. 获取第一种情况的购买记录 详情见图 商品点开 直接是购买页面 "goodsType": "LIVE_DRAW",
  321. :param log:
  322. :param groupId:
  323. :param live_id:
  324. :param goods_id:
  325. :param sql_pool:
  326. :param page:
  327. :param max_trading_time:
  328. """
  329. log.debug(f"{inspect.currentframe().f_code.co_name} get order start, groupId:{groupId} ..........")
  330. url = "https://api.qiandao.cn/box/live-draw/draw-records"
  331. params = {
  332. "limit": "20",
  333. # "offset": "0",
  334. "offset": str((page - 1) * 20),
  335. # "groupId": "883762283266720281"
  336. "groupId": groupId
  337. }
  338. resp_json = utils.request_get_data(log, url, params)
  339. # print(resp_json)
  340. if resp_json.get("code") == 0:
  341. res_data = resp_json.get("data", {})
  342. records = res_data.get("records", [])
  343. if not records:
  344. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no records")
  345. return 0
  346. info_list = []
  347. for row in records:
  348. status = row.get("status")
  349. drawResult = row.get("drawResult") # 抽到的号码
  350. userName = row.get("userName")
  351. trading_time = row.get("createdAt", "")
  352. trading_time_str = utils.transform_ms(log, trading_time)
  353. if not trading_time_str:
  354. log.debug(f"Invalid trading time: {trading_time_str}")
  355. continue # 跳过无效时间
  356. # 字符串 -> datetime
  357. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  358. if max_trading_time and trading_time <= max_trading_time:
  359. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  360. break
  361. isGoodProduct = row.get("isGoodProduct") # 是否欧皇号码
  362. data_dict = {
  363. "live_id": live_id,
  364. "goods_id": goods_id,
  365. "group_id": groupId,
  366. "status": status,
  367. "draw_result": drawResult,
  368. "user_name": userName,
  369. # "created_at": createdAt,
  370. "trading_time": trading_time_str,
  371. "is_good_product": isGoodProduct
  372. }
  373. # print(data_dict)
  374. info_list.append(data_dict)
  375. if info_list:
  376. try:
  377. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  378. except Exception as e:
  379. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  380. return len(records)
  381. else:
  382. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  383. return 0
  384. # ----------------------------------------------------------------------------------------------------------------------
  385. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  386. def get_order_records_second(log, live_id, goods_id, sql_pool):
  387. """
  388. 获取第二种情况的购买记录 详情见图 "goodsType": "MALL",
  389. :param log:
  390. :param live_id:
  391. :param goods_id:
  392. :param sql_pool:
  393. """
  394. log.debug(
  395. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  396. url = "https://api.qiandao.cn/mall/dynamic-detail"
  397. params = {
  398. # "shelfId": "873540181645335865",
  399. "shelfId": goods_id,
  400. "deliverPatterns": "0"
  401. }
  402. resp_json = utils.request_get_data(log, url, params)
  403. # print(resp_json)
  404. if resp_json.get("code") == 0:
  405. res_data = resp_json.get("data", {})
  406. rows = res_data.get("recentPurchase", {}).get("rows", [])
  407. if not rows:
  408. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no rows")
  409. return
  410. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  411. max_trading_time = sql_pool.select_one(
  412. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  413. args=(goods_id,))
  414. max_trading_time = max_trading_time[0] if max_trading_time else None
  415. info_list = []
  416. for row in rows:
  417. productName = row.get("productName")
  418. productCount = row.get("productCount")
  419. trading_time = row.get("paidAt")
  420. trading_time_str = utils.transform_ms(log, trading_time)
  421. if not trading_time_str:
  422. log.debug(f"Invalid trading time: {trading_time_str}")
  423. continue # 跳过无效时间
  424. # 字符串 -> datetime
  425. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  426. if max_trading_time and trading_time <= max_trading_time:
  427. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  428. break
  429. userNickname = row.get("userNickname")
  430. data_dict = {
  431. "live_id": live_id,
  432. "goods_id": goods_id,
  433. "product_name": productName,
  434. "product_count": productCount,
  435. "trading_time": trading_time_str,
  436. "user_name": userNickname
  437. }
  438. # print(data_dict)
  439. info_list.append(data_dict)
  440. if info_list:
  441. try:
  442. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  443. except Exception as e:
  444. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  445. else:
  446. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  447. # ----------------------------------------------------------------------------------------------------------------------
  448. def get_third_list(log, live_id, goods_id, sql_pool):
  449. """
  450. 获取第三种情况的购买记录列表 翻页
  451. :param log:
  452. :param live_id:
  453. :param goods_id:
  454. :param sql_pool:
  455. """
  456. page = 1
  457. max_page = 50
  458. # total_count = 0
  459. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  460. max_trading_time = sql_pool.select_one(
  461. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  462. args=(goods_id,))
  463. max_trading_time = max_trading_time[0] if max_trading_time else None
  464. while page <= max_page:
  465. len_item = get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time)
  466. if len_item < 10:
  467. log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
  468. break
  469. page += 1
  470. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  471. def get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time):
  472. """
  473. 获取第三种情况的购买记录 详情见图 "goodsType": "UNBOX",
  474. :param log:
  475. :param live_id:
  476. :param goods_id:
  477. :param sql_pool:
  478. :param page:
  479. :param max_trading_time:
  480. """
  481. log.debug(
  482. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  483. url = "https://api.qiandao.cn/box/unbox/recent-purchase-record"
  484. params = {
  485. # "shelfId": "876400156344348863",
  486. "shelfId": goods_id,
  487. "limit": "10",
  488. # "offset": "0"
  489. "offset": str((page - 1) * 10)
  490. }
  491. resp_json = utils.request_get_data(log, url, params)
  492. # print(resp_json)
  493. if resp_json.get("code") == 0:
  494. res_data = resp_json.get("data", {})
  495. rows = res_data.get("rows", [])
  496. if not rows:
  497. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no rows")
  498. return 0
  499. info_list = []
  500. for row in rows:
  501. productName = row.get("productName")
  502. productCount = row.get("productCount")
  503. trading_time = row.get("paidAt")
  504. trading_time_str = utils.transform_ms(log, trading_time)
  505. if not trading_time_str:
  506. log.debug(f"Invalid trading time: {trading_time_str}")
  507. continue # 跳过无效时间
  508. # 字符串 -> datetime
  509. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  510. if max_trading_time and trading_time <= max_trading_time:
  511. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  512. break
  513. userNickname = row.get("userNickname")
  514. orderId = row.get("orderId")
  515. data_dict = {
  516. "live_id": live_id,
  517. "goods_id": goods_id,
  518. "product_name": productName,
  519. "product_count": productCount,
  520. "trading_time": trading_time_str,
  521. "user_name": userNickname,
  522. "order_id": orderId
  523. }
  524. # print(data_dict)
  525. info_list.append(data_dict)
  526. if info_list:
  527. try:
  528. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  529. except Exception as e:
  530. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  531. return len(rows)
  532. else:
  533. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  534. return 0
  535. # ----------------------------------------------------------------------------------------------------------------------
  536. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  537. def get_order_records_fourth(log, live_id, goods_id, sql_pool):
  538. """
  539. 获取第四种情况的购买记录 详情见图 "goodsType": "AUCTION", auction_id = goodsId
  540. :param log:
  541. :param live_id:
  542. :param goods_id:
  543. :param sql_pool:
  544. """
  545. log.debug(
  546. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  547. url = "https://api.qiandao.cn/auctioneer/bid/list"
  548. params = {
  549. # "auction_id": "883831521360768262",
  550. "auction_id": goods_id,
  551. "max_results": "200"
  552. }
  553. resp_json = utils.request_get_data(log, url, params)
  554. # print(resp_json)
  555. if resp_json.get("code") == 0:
  556. res_data = resp_json.get("data", {})
  557. items = res_data.get("items", [])
  558. if not items:
  559. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no items")
  560. return
  561. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  562. max_trading_time = sql_pool.select_one(
  563. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  564. args=(goods_id,))
  565. max_trading_time = max_trading_time[0] if max_trading_time else None
  566. info_list = []
  567. for item in items:
  568. user_id = item.get("user_id")
  569. nickname = item.get("nickname")
  570. price = item.get("price")
  571. trading_time = item.get("bid_time")
  572. trading_time_str = utils.transform_ms(log, trading_time)
  573. if not trading_time_str:
  574. log.debug(f"Invalid trading time: {trading_time_str}")
  575. continue # 跳过无效时间
  576. # 字符串 -> datetime
  577. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  578. if max_trading_time and trading_time <= max_trading_time:
  579. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  580. break
  581. hammered = item.get("hammered") # 是否最终成交
  582. data_dict = {
  583. "live_id": live_id,
  584. "goods_id": goods_id,
  585. "user_id": user_id,
  586. "user_name": nickname,
  587. "price": price,
  588. "trading_time": trading_time_str,
  589. "hammered": hammered
  590. }
  591. # print(data_dict)
  592. info_list.append(data_dict)
  593. if info_list:
  594. try:
  595. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  596. except Exception as e:
  597. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  598. else:
  599. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  600. def test():
  601. url = "https://api.qiandao.cn/auctioneer/bid/list"
  602. params = {
  603. "auction_id": "883831521360768262",
  604. "max_results": "200"
  605. }
  606. resp_json = utils.request_get_data(logger, url, params)
  607. print(resp_json)
  608. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  609. def list_main(log):
  610. """
  611. 主函数 列表页
  612. :param log: logger对象
  613. """
  614. log.info(
  615. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  616. # 配置 MySQL 连接池
  617. sql_pool = MySQLConnectionPool(log=log)
  618. if not sql_pool.check_pool_health():
  619. log.error("数据库连接池异常")
  620. raise RuntimeError("数据库连接池异常")
  621. try:
  622. sql_live_id_list = sql_pool.select_all("SELECT live_id FROM qiandao_live_list_record")
  623. sql_live_id_list = [item[0] for item in sql_live_id_list]
  624. get_live_list(log, sql_pool, sql_live_id_list)
  625. time.sleep(10)
  626. sql_pid_list = sql_pool.select_all("SELECT goods_id FROM qiandao_live_product_record")
  627. sql_pid_list = [item[0] for item in sql_pid_list]
  628. for live_id in sql_live_id_list:
  629. try:
  630. get_live_product_list(log, live_id, sql_pool, sql_pid_list)
  631. except Exception as e:
  632. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  633. except Exception as e:
  634. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  635. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  636. def order_main(log):
  637. """
  638. 主函数 详情页
  639. :param log: logger对象
  640. """
  641. log.info(
  642. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  643. # 配置 MySQL 连接池
  644. sql_pool = MySQLConnectionPool(log=log)
  645. if not sql_pool.check_pool_health():
  646. log.error("数据库连接池异常")
  647. raise RuntimeError("数据库连接池异常")
  648. try:
  649. # LIVE_DRAW MALL UNBOX AUCTION
  650. # 查询第一种情况
  651. sql_first_list = sql_pool.select_all(
  652. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'LIVE_DRAW'")
  653. # sql_first_list = [item[0] for item in sql_first_list]
  654. for live_id, goods_id in sql_first_list:
  655. try:
  656. get_order_first_group(log, live_id, goods_id, sql_pool)
  657. except Exception as e:
  658. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  659. # 查询第二种情况
  660. sql_second_list = sql_pool.select_all(
  661. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'MALL'")
  662. # sql_second_list = [item[0] for item in sql_second_list]
  663. for live_id, goods_id in sql_second_list:
  664. try:
  665. get_order_records_second(log, live_id, goods_id, sql_pool)
  666. except Exception as e:
  667. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  668. # 查询第三种情况
  669. sql_third_list = sql_pool.select_all(
  670. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'UNBOX'")
  671. # sql_third_list = [item[0] for item in sql_third_list]
  672. for live_id, goods_id in sql_third_list:
  673. try:
  674. get_third_list(log, live_id, goods_id, sql_pool)
  675. except Exception as e:
  676. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  677. # 查询第四种情况
  678. sql_fourth_list = sql_pool.select_all(
  679. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'AUCTION'")
  680. # sql_fourth_list = [item[0] for item in sql_fourth_list]
  681. for live_id, goods_id in sql_fourth_list:
  682. try:
  683. get_order_records_fourth(log, live_id, goods_id, sql_pool)
  684. except Exception as e:
  685. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  686. except Exception as e:
  687. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  688. def run_threaded(job_func, *args, **kwargs):
  689. """
  690. 在新线程中运行给定的函数,并传递参数。
  691. :param job_func: 要运行的目标函数
  692. :param args: 位置参数
  693. :param kwargs: 关键字参数
  694. """
  695. job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs)
  696. job_thread.start()
  697. def schedule_task():
  698. """
  699. 爬虫模块 定时任务 的启动文件
  700. """
  701. # 立即运行一次任务
  702. list_main(log=logger)
  703. # order_main(log=logger)
  704. # 设置定时任务
  705. schedule.every(30).minutes.do(run_threaded, list_main, log=logger)
  706. schedule.every().day.at("01:06").do(run_threaded, order_main, log=logger)
  707. while True:
  708. schedule.run_pending()
  709. time.sleep(1)
  710. if __name__ == '__main__':
  711. # get_live_list(logger, None, [])
  712. # get_live_detail(logger, "883752924533007599")
  713. # get_live_product_list(logger, "883752924533007599", None, [])
  714. # test()
  715. schedule_task()