live_spider.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/7/8 16:51
  5. import threading
  6. import time
  7. from datetime import datetime
  8. import utils
  9. import inspect
  10. import schedule
  11. from loguru import logger
  12. from mysql_pool import MySQLConnectionPool
  13. from tenacity import retry, stop_after_attempt, wait_fixed
  14. logger.remove()
  15. logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  16. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  17. level="DEBUG", retention="7 day")
  18. def after_log(retry_state):
  19. """
  20. retry 回调
  21. :param retry_state: RetryCallState 对象
  22. """
  23. # 检查 args 是否存在且不为空
  24. if retry_state.args and len(retry_state.args) > 0:
  25. log = retry_state.args[0] # 获取传入的 logger
  26. else:
  27. log = logger # 使用全局 logger
  28. if retry_state.outcome.failed:
  29. log.warning(
  30. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  31. else:
  32. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  33. def get_live_list(log, sql_pool, sql_live_id_list):
  34. """
  35. 获取 live 数据列表
  36. :param log:
  37. :param sql_pool:
  38. :param sql_live_id_list:
  39. :return:
  40. """
  41. log.debug(f"{inspect.currentframe().f_code.co_name} crawl start ..........")
  42. page = 1
  43. max_page = 50
  44. total_count = 0
  45. while page <= max_page:
  46. len_item, totalCount = get_live_one_page(log, page, sql_pool, sql_live_id_list)
  47. if len_item < 20:
  48. log.debug(f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
  49. break
  50. total_count += len_item
  51. if total_count >= int(totalCount):
  52. log.debug(f"total_count: {total_count} totalCount: {totalCount}")
  53. break
  54. page += 1
  55. # time.sleep(random.uniform(0.1, 1))
  56. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  57. def get_live_one_page(log, page, sql_pool, sql_live_id_list):
  58. """
  59. 获取 live 单页数据
  60. :param log:
  61. :param page:
  62. :param sql_pool:
  63. :param sql_live_id_list:
  64. :return:
  65. """
  66. log.debug(f"{inspect.currentframe().f_code.co_name} for page:{page}..........")
  67. url = "https://api.qiandao.com/live/list"
  68. data = {
  69. "limit": 20,
  70. # "offset": 0,
  71. "offset": (page - 1) * 20,
  72. "status": [
  73. "PREPARE",
  74. "UNDERWAY"
  75. ],
  76. "type": ""
  77. }
  78. resp_json = utils.request_post_data(logger, url, data)
  79. # print(resp_json)
  80. if not resp_json:
  81. log.error("get_luckybag_one_page error")
  82. raise RuntimeError("get_luckybag_one_page error")
  83. rows = resp_json.get("data", {}).get("rows", [])
  84. total_count = resp_json.get("data", {}).get("count", 0)
  85. try:
  86. parse_live_data(log, rows, sql_pool, sql_live_id_list)
  87. except Exception as e:
  88. log.error(f"parse_live_data error: {e}")
  89. return len(rows), total_count
  90. def parse_live_data(log, rows, sql_pool, sql_live_id_list):
  91. """
  92. 解析 live 数据
  93. :param log:
  94. :param rows:
  95. :param sql_pool:
  96. :param sql_live_id_list:
  97. :return:
  98. """
  99. live_list = []
  100. for row in rows:
  101. live_id = row.get("id")
  102. if live_id in sql_live_id_list:
  103. log.info(f"live_id:{live_id} is exist, skip .......")
  104. continue
  105. ownerId = row.get("ownerId")
  106. title = row.get("title")
  107. images = row.get("images", {}).get("coverImages", [])[0]
  108. startTime = row.get("startTime")
  109. startTime = utils.transform_ms(log, startTime)
  110. endTime = row.get("endTime")
  111. if endTime != '0':
  112. endTime = utils.transform_ms(log, endTime)
  113. status = row.get("status")
  114. pullUrl = row.get("pullUrl")
  115. createdAt = row.get("createdAt")
  116. updatedAt = row.get("updatedAt")
  117. user_id = row.get("user", {}).get("id")
  118. officialUserId = row.get("user", {}).get("officialUserId")
  119. nickname = row.get("user", {}).get("nickname")
  120. live_type = row.get("type")
  121. live_dict = {
  122. "live_id": live_id,
  123. "owner_id": ownerId,
  124. "title": title,
  125. "images": images,
  126. "start_time": startTime,
  127. "end_time": endTime,
  128. "status": status,
  129. "pull_url": pullUrl,
  130. "created_at": createdAt,
  131. "updated_at": updatedAt,
  132. "user_id": user_id,
  133. "official_user_id": officialUserId,
  134. "nickname": nickname,
  135. "live_type": live_type
  136. }
  137. # print(live_dict)
  138. live_list.append(live_dict)
  139. sql_live_id_list.append(live_id)
  140. # print(live_list)
  141. if live_list:
  142. try:
  143. sql_pool.insert_many(table="qiandao_live_list_record", data_list=live_list)
  144. except Exception as e:
  145. log.error(f"insert_many error: {e}")
  146. # def get_live_detail(log, live_id):
  147. # url = "https://api.qiandao.cn/live/detail"
  148. # params = {
  149. # "liveId": "883752924533007599"
  150. # }
  151. # resp_json = utils.request_get_data(log, url, params)
  152. # print(resp_json)
  153. # if resp_json.get("code") == 0:
  154. # res_data = resp_json.get("data", {})
  155. # ownerId = res_data.get("ownerId")
  156. # title = res_data.get("title")
  157. # else:
  158. # log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  159. def get_live_product_list(log, live_id, sql_pool):
  160. """
  161. 获取 live 商品列表
  162. :param log:
  163. :param live_id:
  164. :param sql_pool:
  165. :return:
  166. """
  167. page = 1
  168. max_page = 10
  169. total_count = 0
  170. while page <= max_page:
  171. len_item, totalCount = get_product_single_page(log, live_id, page, sql_pool)
  172. if len_item < 50:
  173. log.debug(
  174. f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount}, Less than 50, break ---------------")
  175. break
  176. total_count += len_item
  177. if total_count >= int(totalCount):
  178. log.debug(f"total_count: {total_count} totalCount: {totalCount}")
  179. break
  180. page += 1
  181. # time.sleep(random.uniform(0.1, 1))
  182. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  183. def get_product_single_page(log, live_id, page, sql_pool):
  184. """
  185. 获取 live 商品列表单页数据
  186. :param log:
  187. :param live_id:
  188. :param page:
  189. :param sql_pool:
  190. :return:
  191. """
  192. log.debug(f"{inspect.currentframe().f_code.co_name} for live_id:{live_id}, .... page:{page}..........")
  193. url = "https://api.qiandao.cn/live/goods/list"
  194. params = {
  195. # "liveId": "883752924533007599",
  196. "liveId": live_id,
  197. "limit": "50",
  198. # "offset": "0"
  199. "offset": str((page - 1) * 50),
  200. }
  201. resp_json = utils.request_get_data(log, url, params,proxy=False)
  202. # print(resp_json)
  203. if resp_json.get("code") == 0:
  204. res_data = resp_json.get("data", {})
  205. totalCount = res_data.get("count")
  206. rows = res_data.get("rows", [])
  207. if rows:
  208. parse_product_data(log, rows, sql_pool, live_id)
  209. return len(rows), totalCount
  210. else:
  211. return 0, 0
  212. else:
  213. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  214. return 0, 0
  215. def parse_product_data(log, items, sql_pool, live_id):
  216. """
  217. 解析 live 商品数据
  218. :param log:
  219. :param items:
  220. :param sql_pool:
  221. :param live_id:
  222. :return:
  223. """
  224. log.debug(f"{inspect.currentframe().f_code.co_name} parse product start, live_id:{live_id} ..........")
  225. info_list = []
  226. for row in items:
  227. row_id = row.get("id") # 883752962113930805
  228. goodsId = row.get("goodsId") # 883585812724201771
  229. # sql 查询goodsId是否在库中
  230. # if goodsId in sql_product_list:
  231. # log.info(f"live_id:{live_id} goodsId:{goodsId} is exist, skip .......")
  232. # continue
  233. goodsType = row.get("goodsType")
  234. goodsName = row.get("goodsName")
  235. goodsImages = row.get("goodsImages", {})
  236. if goodsImages:
  237. goods_image = goodsImages.get("coverImages", [])[0]
  238. else:
  239. goods_image = ""
  240. goods_status = row.get("status")
  241. price = row.get("price")
  242. isSoldOut = row.get("isSoldOut")
  243. # data_dict = {
  244. # "live_id": live_id,
  245. # "row_id": row_id,
  246. # "goods_id": goodsId,
  247. # "goods_type": goodsType,
  248. # "goods_name": goodsName,
  249. # "goods_image": goods_image,
  250. # "goods_status": goods_status,
  251. # "price": price,
  252. # "is_sold_out": isSoldOut
  253. # }
  254. data_ = (live_id, row_id, goodsId, goodsType, goodsName, goods_image, goods_status, price, isSoldOut)
  255. # print(data_dict)
  256. info_list.append(data_)
  257. # sql_product_list.append(goodsId)
  258. if info_list:
  259. try:
  260. sql = "INSERT IGNORE INTO qiandao_live_product_record (live_id, row_id, goods_id, goods_type, goods_name, goods_image, goods_status, price, is_sold_out) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"
  261. sql_pool.insert_all(sql, info_list)
  262. # sql_pool.insert_all(table="qiandao_live_product_record", data_list=info_list)
  263. except Exception as e:
  264. log.warning(f"插入失败: {e}")
  265. # ----------------------------------------------------------------------------------------------------------------------
  266. # ----------------------------------------------------------------------------------------------------------------------
  267. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  268. def get_order_first_group(log, live_id, goods_id, sql_pool):
  269. """
  270. 获取第一种情况的购买记录 查询所需的 groupId
  271. :param log:
  272. :param live_id:
  273. :param goods_id:
  274. :param sql_pool:
  275. """
  276. log.debug(
  277. f"{inspect.currentframe().f_code.co_name} get groupId start, live_id:{live_id}, goods_id:{goods_id} ..........")
  278. url = "https://api.qiandao.cn/box/live-draw/group"
  279. params = {
  280. # "shelfId": "876650715072717042"
  281. "shelfId": goods_id
  282. }
  283. resp_json = utils.request_get_data(log, url, params)
  284. # print(resp_json)
  285. if resp_json.get("code") == 0:
  286. groupId = resp_json.get("data", {}).get("groupId")
  287. if groupId:
  288. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  289. max_trading_time = sql_pool.select_one(
  290. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  291. args=(goods_id,))
  292. max_trading_time = max_trading_time[0] if max_trading_time else None
  293. get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time)
  294. else:
  295. log.warning(f"live_id:{live_id}, 未获取到groupId............")
  296. else:
  297. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  298. def get_first_list(log, groupId, live_id, goods_id, sql_pool, max_trading_time):
  299. """
  300. 获取第一种情况的购买记录 列表 翻页
  301. :param log:
  302. :param groupId:
  303. :param live_id:
  304. :param goods_id:
  305. :param sql_pool:
  306. :param max_trading_time:
  307. :return:
  308. """
  309. page = 1
  310. max_page = 50
  311. # total_count = 0
  312. while page <= max_page:
  313. len_item = get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time)
  314. if len_item < 20:
  315. log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
  316. break
  317. page += 1
  318. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  319. def get_order_records_first(log, groupId, live_id, goods_id, sql_pool, page, max_trading_time):
  320. """
  321. 获取第一种情况的购买记录 详情见图 商品点开 直接是购买页面 "goodsType": "LIVE_DRAW",
  322. :param log:
  323. :param groupId:
  324. :param live_id:
  325. :param goods_id:
  326. :param sql_pool:
  327. :param page:
  328. :param max_trading_time:
  329. """
  330. log.debug(f"{inspect.currentframe().f_code.co_name} get order start, groupId:{groupId} ..........")
  331. url = "https://api.qiandao.cn/box/live-draw/draw-records"
  332. params = {
  333. "limit": "20",
  334. # "offset": "0",
  335. "offset": str((page - 1) * 20),
  336. # "groupId": "883762283266720281"
  337. "groupId": groupId
  338. }
  339. resp_json = utils.request_get_data(log, url, params)
  340. # print(resp_json)
  341. if resp_json.get("code") == 0:
  342. res_data = resp_json.get("data", {})
  343. records = res_data.get("records", [])
  344. if not records:
  345. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no records")
  346. return 0
  347. info_list = []
  348. for row in records:
  349. status = row.get("status")
  350. drawResult = row.get("drawResult") # 抽到的号码
  351. userName = row.get("userName")
  352. trading_time = row.get("createdAt", "")
  353. trading_time_str = utils.transform_ms(log, trading_time)
  354. if not trading_time_str:
  355. log.debug(f"Invalid trading time: {trading_time_str}")
  356. continue # 跳过无效时间
  357. # 字符串 -> datetime
  358. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  359. if max_trading_time and trading_time <= max_trading_time:
  360. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  361. break
  362. isGoodProduct = row.get("isGoodProduct") # 是否欧皇号码
  363. data_dict = {
  364. "live_id": live_id,
  365. "goods_id": goods_id,
  366. "group_id": groupId,
  367. "status": status,
  368. "draw_result": drawResult,
  369. "user_name": userName,
  370. # "created_at": createdAt,
  371. "trading_time": trading_time_str,
  372. "is_good_product": isGoodProduct
  373. }
  374. # print(data_dict)
  375. info_list.append(data_dict)
  376. if info_list:
  377. try:
  378. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  379. except Exception as e:
  380. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  381. return len(records)
  382. else:
  383. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  384. return 0
  385. # ----------------------------------------------------------------------------------------------------------------------
  386. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  387. def get_order_records_second(log, live_id, goods_id, sql_pool):
  388. """
  389. 获取第二种情况的购买记录 详情见图 "goodsType": "MALL",
  390. :param log:
  391. :param live_id:
  392. :param goods_id:
  393. :param sql_pool:
  394. """
  395. log.debug(
  396. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  397. url = "https://api.qiandao.cn/mall/dynamic-detail"
  398. params = {
  399. # "shelfId": "873540181645335865",
  400. "shelfId": goods_id,
  401. "deliverPatterns": "0"
  402. }
  403. resp_json = utils.request_get_data(log, url, params)
  404. # print(resp_json)
  405. if resp_json.get("code") == 0:
  406. res_data = resp_json.get("data", {})
  407. rows = res_data.get("recentPurchase", {}).get("rows", [])
  408. if not rows:
  409. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no rows")
  410. return
  411. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  412. max_trading_time = sql_pool.select_one(
  413. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  414. args=(goods_id,))
  415. max_trading_time = max_trading_time[0] if max_trading_time else None
  416. info_list = []
  417. for row in rows:
  418. productName = row.get("productName")
  419. productCount = row.get("productCount")
  420. trading_time = row.get("paidAt")
  421. trading_time_str = utils.transform_ms(log, trading_time)
  422. if not trading_time_str:
  423. log.debug(f"Invalid trading time: {trading_time_str}")
  424. continue # 跳过无效时间
  425. # 字符串 -> datetime
  426. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  427. if max_trading_time and trading_time <= max_trading_time:
  428. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  429. break
  430. userNickname = row.get("userNickname")
  431. data_dict = {
  432. "live_id": live_id,
  433. "goods_id": goods_id,
  434. "product_name": productName,
  435. "product_count": productCount,
  436. "trading_time": trading_time_str,
  437. "user_name": userNickname
  438. }
  439. # print(data_dict)
  440. info_list.append(data_dict)
  441. if info_list:
  442. try:
  443. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  444. except Exception as e:
  445. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  446. else:
  447. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  448. # ----------------------------------------------------------------------------------------------------------------------
  449. def get_third_list(log, live_id, goods_id, sql_pool):
  450. """
  451. 获取第三种情况的购买记录列表 翻页
  452. :param log:
  453. :param live_id:
  454. :param goods_id:
  455. :param sql_pool:
  456. """
  457. page = 1
  458. max_page = 50
  459. # total_count = 0
  460. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  461. max_trading_time = sql_pool.select_one(
  462. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  463. args=(goods_id,))
  464. max_trading_time = max_trading_time[0] if max_trading_time else None
  465. while page <= max_page:
  466. len_item = get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time)
  467. if len_item < 10:
  468. log.debug(f"--------------- page {page}, len_item: {len_item} ---------------")
  469. break
  470. page += 1
  471. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  472. def get_order_records_third(log, live_id, goods_id, sql_pool, page, max_trading_time):
  473. """
  474. 获取第三种情况的购买记录 详情见图 "goodsType": "UNBOX",
  475. :param log:
  476. :param live_id:
  477. :param goods_id:
  478. :param sql_pool:
  479. :param page:
  480. :param max_trading_time:
  481. """
  482. log.debug(
  483. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  484. url = "https://api.qiandao.cn/box/unbox/recent-purchase-record"
  485. params = {
  486. # "shelfId": "876400156344348863",
  487. "shelfId": goods_id,
  488. "limit": "10",
  489. # "offset": "0"
  490. "offset": str((page - 1) * 10)
  491. }
  492. resp_json = utils.request_get_data(log, url, params)
  493. # print(resp_json)
  494. if resp_json.get("code") == 0:
  495. res_data = resp_json.get("data", {})
  496. rows = res_data.get("rows", [])
  497. if not rows:
  498. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no rows")
  499. return 0
  500. info_list = []
  501. for row in rows:
  502. productName = row.get("productName")
  503. productCount = row.get("productCount")
  504. trading_time = row.get("paidAt")
  505. trading_time_str = utils.transform_ms(log, trading_time)
  506. if not trading_time_str:
  507. log.debug(f"Invalid trading time: {trading_time_str}")
  508. continue # 跳过无效时间
  509. # 字符串 -> datetime
  510. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  511. if max_trading_time and trading_time <= max_trading_time:
  512. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  513. break
  514. userNickname = row.get("userNickname")
  515. orderId = row.get("orderId")
  516. data_dict = {
  517. "live_id": live_id,
  518. "goods_id": goods_id,
  519. "product_name": productName,
  520. "product_count": productCount,
  521. "trading_time": trading_time_str,
  522. "user_name": userNickname,
  523. "order_id": orderId
  524. }
  525. # print(data_dict)
  526. info_list.append(data_dict)
  527. if info_list:
  528. try:
  529. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  530. except Exception as e:
  531. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  532. return len(rows)
  533. else:
  534. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  535. return 0
  536. # ----------------------------------------------------------------------------------------------------------------------
  537. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  538. def get_order_records_fourth(log, live_id, goods_id, sql_pool):
  539. """
  540. 获取第四种情况的购买记录 详情见图 "goodsType": "AUCTION", auction_id = goodsId
  541. :param log:
  542. :param live_id:
  543. :param goods_id:
  544. :param sql_pool:
  545. """
  546. log.debug(
  547. f"{inspect.currentframe().f_code.co_name} get order start, live_id:{live_id}, goods_id:{goods_id} ..........")
  548. url = "https://api.qiandao.cn/auctioneer/bid/list"
  549. params = {
  550. # "auction_id": "883831521360768262",
  551. "auction_id": goods_id,
  552. "max_results": "200"
  553. }
  554. resp_json = utils.request_get_data(log, url, params)
  555. # print(resp_json)
  556. if resp_json.get("code") == 0:
  557. res_data = resp_json.get("data", {})
  558. items = res_data.get("items", [])
  559. if not items:
  560. log.debug(f"{inspect.currentframe().f_code.co_name}, live_id:{live_id} no items")
  561. return
  562. # 需要先查询 groupId 最大的 trading_time, 防止数据重复
  563. max_trading_time = sql_pool.select_one(
  564. query="SELECT MAX(trading_time) FROM qiandao_live_order_record WHERE goods_id = %s",
  565. args=(goods_id,))
  566. max_trading_time = max_trading_time[0] if max_trading_time else None
  567. info_list = []
  568. for item in items:
  569. user_id = item.get("user_id")
  570. nickname = item.get("nickname")
  571. price = item.get("price")
  572. trading_time = item.get("bid_time")
  573. trading_time_str = utils.transform_ms(log, trading_time)
  574. if not trading_time_str:
  575. log.debug(f"Invalid trading time: {trading_time_str}")
  576. continue # 跳过无效时间
  577. # 字符串 -> datetime
  578. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  579. if max_trading_time and trading_time <= max_trading_time:
  580. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  581. break
  582. hammered = item.get("hammered") # 是否最终成交
  583. data_dict = {
  584. "live_id": live_id,
  585. "goods_id": goods_id,
  586. "user_id": user_id,
  587. "user_name": nickname,
  588. "price": price,
  589. "trading_time": trading_time_str,
  590. "hammered": hammered
  591. }
  592. # print(data_dict)
  593. info_list.append(data_dict)
  594. if info_list:
  595. try:
  596. sql_pool.insert_many(table="qiandao_live_order_record", data_list=info_list)
  597. except Exception as e:
  598. log.error(f" {inspect.currentframe().f_code.co_name} 插入失败:", e)
  599. else:
  600. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  601. def test():
  602. url = "https://api.qiandao.cn/auctioneer/bid/list"
  603. params = {
  604. "auction_id": "883831521360768262",
  605. "max_results": "200"
  606. }
  607. resp_json = utils.request_get_data(logger, url, params)
  608. print(resp_json)
  609. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  610. def list_main(log):
  611. """
  612. 主函数 列表页
  613. :param log: logger对象
  614. """
  615. log.info(
  616. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  617. # 配置 MySQL 连接池
  618. sql_pool = MySQLConnectionPool(log=log)
  619. if not sql_pool.check_pool_health():
  620. log.error("数据库连接池异常")
  621. raise RuntimeError("数据库连接池异常")
  622. try:
  623. sql_live_id_list = sql_pool.select_all("SELECT live_id FROM qiandao_live_list_record")
  624. sql_live_id_list = [item[0] for item in sql_live_id_list]
  625. get_live_list(log, sql_pool, sql_live_id_list)
  626. time.sleep(10)
  627. # sql_pid_list = sql_pool.select_all("SELECT DISTINCT goods_id FROM qiandao_live_product_record")
  628. # sql_pid_list = [item[0] for item in sql_pid_list]
  629. for live_id in sql_live_id_list:
  630. try:
  631. get_live_product_list(log, live_id, sql_pool)
  632. except Exception as e:
  633. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  634. # sql_pid_list.clear()
  635. except Exception as e:
  636. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  637. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  638. def order_main(log):
  639. """
  640. 主函数 详情页
  641. :param log: logger对象
  642. """
  643. log.info(
  644. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  645. # 配置 MySQL 连接池
  646. sql_pool = MySQLConnectionPool(log=log)
  647. if not sql_pool.check_pool_health():
  648. log.error("数据库连接池异常")
  649. raise RuntimeError("数据库连接池异常")
  650. try:
  651. # LIVE_DRAW MALL UNBOX AUCTION
  652. # 查询第一种情况
  653. sql_first_list = sql_pool.select_all(
  654. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'LIVE_DRAW'")
  655. # sql_first_list = [item[0] for item in sql_first_list]
  656. for live_id, goods_id in sql_first_list:
  657. try:
  658. get_order_first_group(log, live_id, goods_id, sql_pool)
  659. except Exception as e:
  660. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  661. # 查询第二种情况
  662. sql_second_list = sql_pool.select_all(
  663. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'MALL'")
  664. # sql_second_list = [item[0] for item in sql_second_list]
  665. for live_id, goods_id in sql_second_list:
  666. try:
  667. get_order_records_second(log, live_id, goods_id, sql_pool)
  668. except Exception as e:
  669. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  670. # 查询第三种情况
  671. sql_third_list = sql_pool.select_all(
  672. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'UNBOX'")
  673. # sql_third_list = [item[0] for item in sql_third_list]
  674. for live_id, goods_id in sql_third_list:
  675. try:
  676. get_third_list(log, live_id, goods_id, sql_pool)
  677. except Exception as e:
  678. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  679. # 查询第四种情况
  680. sql_fourth_list = sql_pool.select_all(
  681. "SELECT live_id, goods_id FROM qiandao_live_product_record WHERE goods_type = 'AUCTION'")
  682. # sql_fourth_list = [item[0] for item in sql_fourth_list]
  683. for live_id, goods_id in sql_fourth_list:
  684. try:
  685. get_order_records_fourth(log, live_id, goods_id, sql_pool)
  686. except Exception as e:
  687. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  688. except Exception as e:
  689. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  690. def run_threaded(job_func, *args, **kwargs):
  691. """
  692. 在新线程中运行给定的函数,并传递参数。
  693. :param job_func: 要运行的目标函数
  694. :param args: 位置参数
  695. :param kwargs: 关键字参数
  696. """
  697. job_thread = threading.Thread(target=job_func, args=args, kwargs=kwargs)
  698. job_thread.start()
  699. def schedule_task():
  700. """
  701. 爬虫模块 定时任务 的启动文件
  702. """
  703. # 立即运行一次任务
  704. list_main(log=logger)
  705. # order_main(log=logger)
  706. # 设置定时任务
  707. schedule.every(30).minutes.do(run_threaded, list_main, log=logger)
  708. schedule.every().day.at("01:06").do(run_threaded, order_main, log=logger)
  709. while True:
  710. schedule.run_pending()
  711. time.sleep(1)
  712. if __name__ == '__main__':
  713. # get_live_list(logger, None, [])
  714. # get_live_detail(logger, "883752924533007599")
  715. # get_live_product_list(logger, "883752924533007599", None, [])
  716. # test()
  717. schedule_task()