抽卡机补充.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/6/30 19:35
  5. import time
  6. from datetime import datetime
  7. import utils
  8. import random
  9. import inspect
  10. import schedule
  11. from loguru import logger
  12. from mysql_pool import MySQLConnectionPool
  13. from tenacity import retry, stop_after_attempt, wait_fixed
  14. logger.remove()
  15. logger.add("./logs/chouka_add_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
  16. format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
  17. level="DEBUG", retention="7 day")
  18. def after_log(retry_state):
  19. """
  20. retry 回调
  21. :param retry_state: RetryCallState 对象
  22. """
  23. # 检查 args 是否存在且不为空
  24. if retry_state.args and len(retry_state.args) > 0:
  25. log = retry_state.args[0] # 获取传入的 logger
  26. else:
  27. log = logger # 使用全局 logger
  28. if retry_state.outcome.failed:
  29. log.warning(
  30. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  31. else:
  32. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  33. def get_draw_card_list(log, sql_pool):
  34. """
  35. 获取 抽卡机 列表页信息
  36. :param log:
  37. :param sql_pool:
  38. """
  39. page = 1
  40. max_page = 500
  41. total_count = 0
  42. sql_pid_list = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record")
  43. sql_pid_list = [item[0] for item in sql_pid_list]
  44. # sql_pid_list = []
  45. while page <= max_page:
  46. len_item, totalCount = draw_card_one_page(log, page, sql_pool, sql_pid_list)
  47. if len_item < 20:
  48. log.debug(
  49. f"--------------- page {page}, len_item: {len_item}, totalCount: {totalCount}, break !!! ---------------")
  50. break
  51. total_count += len_item
  52. if total_count >= int(totalCount):
  53. log.debug(f"total_count: {total_count} >= totalCount: {totalCount}, break...........")
  54. break
  55. page += 1
  56. time.sleep(random.uniform(0.1, 1))
  57. sql_pid_list.clear()
  58. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  59. def draw_card_one_page(log, page, sql_pool, sql_pid_list):
  60. """
  61. 获取 抽卡机 单页信息
  62. :param log:
  63. :param page:
  64. :param sql_pool:
  65. :param sql_pid_list:
  66. """
  67. log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, page: {page} ---------------")
  68. url = "https://api.qiandao.com/shelf-web/list-home-recommender"
  69. params = {
  70. # "offset": "0",
  71. "offset": str((page - 1) * 20),
  72. "limit": "20",
  73. "cmd": "b2c_homepage_feed",
  74. "name": "cgmarket",
  75. "project": "channel",
  76. "type": "LIVE_GROUPON",
  77. "typeIds": [1443323],
  78. "withLive": "true",
  79. "tagIds": [1701855],
  80. "quickCouponTemplateId": "0"
  81. }
  82. resp_json = utils.request_get_data(log, url, params)
  83. # print(resp_json)
  84. rows = resp_json["data"]["rows"]
  85. parse_draw_card_data(log, rows, sql_pool, sql_pid_list)
  86. len_rows = len(rows)
  87. total_count = resp_json["data"]["count"]
  88. return len_rows, total_count
  89. def parse_draw_card_data(log, resp_json, sql_pool, sql_pid_list):
  90. log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
  91. info_list = []
  92. for item in resp_json:
  93. # print(item)
  94. pid = item.get("id")
  95. if pid in sql_pid_list:
  96. log.debug(f"{inspect.currentframe().f_code.co_name}, pid: {pid} already exists")
  97. continue
  98. p_type = item.get("type")
  99. orgId = item.get("orgId")
  100. title = item.get("name")
  101. # description = item.get("description")
  102. isSoldOut = item.get("isSoldOut")
  103. price = item.get("price", {}).get("unitPriceOfCash")
  104. soldAmountText = item.get("soldAmountText")
  105. nickname = item.get("org", {}).get("nickname", '')
  106. info_dict = {
  107. "pid": pid,
  108. "p_type": p_type,
  109. "title": title,
  110. "price": price,
  111. "sold_amount_text": soldAmountText,
  112. "org_id": orgId,
  113. "nickname": nickname,
  114. # "description": description,
  115. "is_sold_out": isSoldOut
  116. }
  117. # print(info_dict)
  118. sql_pid_list.append(pid)
  119. info_list.append(info_dict)
  120. if info_list:
  121. # sql_pool.insert_many(table="qiandao_mall_draw_card_list_record", data_list=info_list)
  122. query = """
  123. INSERT INTO qiandao_mall_draw_card_list_record
  124. (pid, p_type, title, price, sold_amount_text, org_id, nickname, is_sold_out)
  125. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
  126. ON DUPLICATE KEY UPDATE
  127. p_type = VALUES(p_type),
  128. title = VALUES(title),
  129. price = VALUES(price),
  130. sold_amount_text = VALUES(sold_amount_text),
  131. org_id = VALUES(org_id),
  132. nickname = VALUES(nickname),
  133. is_sold_out = VALUES(is_sold_out)
  134. """
  135. args_list = [(item['pid'], item['p_type'], item['title'], item['price'], item['sold_amount_text'],
  136. item['org_id'], item['nickname'], item['is_sold_out']) for item in info_list]
  137. sql_pool.insert_many(query=query, args_list=args_list)
  138. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  139. def get_draw_card_details(log, sql_pool, pid):
  140. """
  141. 获取 抽卡机 详情页信息
  142. :param log:
  143. :param sql_pool:
  144. :param pid:
  145. :return:
  146. {'pid': '867827029033121899', 'p_type': 'GACHA', 'org_id': '710150753377039896', 'title': '0.01/包 灵焰包第2弹 可合成 累计抽卡领好礼', 'price': 1.4, 'sold_amount_text': '已售2000+', 'nickname': '哇咔咔', 'is_sold_out': False}
  147. """
  148. url = "https://api.qiandao.com/b2c-web/v1/gacha/query/detail"
  149. params = {
  150. "shelfId": "867827029033121899"
  151. }
  152. resp_json = utils.request_get_data(log, url, params)
  153. # print(resp_json)
  154. if resp_json.get("code") == 0:
  155. parse_draw_card_details_data(log, resp_json, pid, sql_pool)
  156. else:
  157. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  158. def parse_draw_card_details_data(log, resp_json, pid, sql_pool):
  159. log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
  160. data = resp_json.get("data")
  161. specification = data.get("packageConfigRamark") # 规格说明
  162. introImages = data.get('images', {}).get("introImages", [])
  163. introImages = '|'.join(introImages) # 多图链接, |分割
  164. categoryId = data.get("categoryId")
  165. vipPrice = data.get("vipPrice")
  166. unitPriceOfCash = data.get("unitPriceOfCash")
  167. detail_dict = {
  168. "specification": specification,
  169. "images": introImages,
  170. "category_id": categoryId,
  171. "unit_price_of_cash": unitPriceOfCash,
  172. "vip_price": vipPrice,
  173. }
  174. # print(detail_dict)
  175. sql_pool.update_one_or_dict(table="qiandao_mall_draw_card_list_record", data=detail_dict, condition={"pid": pid})
  176. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  177. def get_draw_ids(log, pid, sql_pool,max_trading_time):
  178. """
  179. 获取 抽卡记录 id列表 信息
  180. :param log:
  181. :param pid:
  182. :param sql_pool:
  183. :param max_trading_time:
  184. """
  185. log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, pid: {pid} ---------------")
  186. url = "https://api.qiandao.cn/b2c-web/v1/box/gacha/query/digest-draw-records"
  187. params = {
  188. # "shelfId": "855250935293687891"
  189. "shelfId": pid
  190. }
  191. resp_json = utils.request_get_data(log, url, params)
  192. # print(resp_json)
  193. if resp_json.get("code") == 0:
  194. rows = resp_json.get("data", {}).get("rows", [])
  195. draw_id_list = [item.get("id") for item in rows]
  196. # 每隔10个抽奖,获取一次
  197. for i in range(0, len(draw_id_list), 10):
  198. log.debug(
  199. f"{inspect.currentframe().f_code.co_name}, get_draw_ids, i: {i}, len draw_id_list: {len(draw_id_list)}")
  200. try:
  201. get_draw_buy_data(log, pid, sql_pool, draw_id_list[i:i + 10],max_trading_time)
  202. except Exception as e:
  203. log.error(f"{inspect.currentframe().f_code.co_name} call draw_id_list error: {e}")
  204. else:
  205. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  206. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  207. def get_draw_buy_data(log, pid, sql_pool, ids: list,max_trading_time):
  208. """
  209. 获取 抽卡记录 信息
  210. :param log:
  211. :param pid:
  212. :param sql_pool:
  213. :param ids:
  214. :param max_trading_time:
  215. """
  216. url = "https://api.qiandao.cn/b2c-web/v1/box/gacha/query/ids-draw-records"
  217. # data = {
  218. # "ids": [
  219. # "878724165295676667",
  220. # "878724165295676666",
  221. # "878724165295676665",
  222. # "878724165295676664",
  223. # "878724165295676663",
  224. # "878723922630058286",
  225. # "878723922630058285",
  226. # "878723922630058284",
  227. # "878723922630058283",
  228. # "878723922630058282"
  229. # ],
  230. # "shelfId": "855250935293687891"
  231. # }
  232. data = {
  233. "ids": ids,
  234. # "shelfId": "855250935293687891"
  235. "shelfId": pid
  236. }
  237. resp_json = utils.request_post_data(log, url, data)
  238. # print(resp_json)
  239. if resp_json.get("code") == 0:
  240. try:
  241. parse_draw_buy_data(log, resp_json, pid, sql_pool,max_trading_time)
  242. except Exception as e:
  243. log.error(f"{inspect.currentframe().f_code.co_name} call parse_draw_buy_data error: {e}")
  244. else:
  245. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  246. def is_draw_id_exists(log, sql_pool, draw_id):
  247. """
  248. 判断 draw_id 是否存在于表中
  249. :param log: 日志记录器
  250. :param sql_pool: 数据库连接池
  251. :param draw_id: 要检查的 draw_id
  252. :return: True 如果存在,否则 False
  253. """
  254. try:
  255. query = """
  256. SELECT EXISTS (
  257. SELECT 1
  258. FROM qiandao_mall_draw_card_buy_record
  259. WHERE draw_id = %s
  260. ) AS found
  261. """
  262. result = sql_pool.select_one(query=query, args=(draw_id,))
  263. return bool(result[0]) if result else False
  264. except Exception as e:
  265. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  266. return False
  267. def parse_draw_buy_data(log, resp_json, pid, sql_pool,max_trading_time):
  268. log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
  269. rows = resp_json.get('data', {}).get('rows', [])
  270. info_list = []
  271. for item in rows:
  272. draw_id = item.get("id")
  273. # # 判断 draw_id 是否在 qiandao_mall_draw_card_buy_record 表中
  274. # if is_draw_id_exists(log, sql_pool, draw_id):
  275. # log.debug(f"draw_id: {draw_id} 已存在,跳过插入")
  276. # continue
  277. productInfo = item.get("productInfo", {})
  278. spu_id = productInfo.get("id", {})
  279. spuName = productInfo.get("spuName")
  280. spuImage = productInfo.get("spuImage")
  281. rarity = item.get("rarity")
  282. userInfo = item.get("userInfo", {})
  283. buyer_id = userInfo.get("id")
  284. buyer_name = userInfo.get("name")
  285. draw_time = item.get("time")
  286. trading_time_str = utils.transform_ms(log, draw_time)
  287. if not trading_time_str:
  288. log.debug(f"Invalid trading time: {trading_time_str}")
  289. continue # 跳过无效时间
  290. # 字符串 -> datetime
  291. trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
  292. if max_trading_time and trading_time <= max_trading_time:
  293. log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
  294. break
  295. draw_dict = {
  296. "pid": pid,
  297. "draw_id": draw_id,
  298. "spu_id": spu_id,
  299. "spu_name": spuName,
  300. "spu_image": spuImage,
  301. "rarity": rarity,
  302. "buyer_id": buyer_id,
  303. "buyer_name": buyer_name,
  304. "draw_time": trading_time_str
  305. }
  306. # print(draw_dict)
  307. info_list.append(draw_dict)
  308. if info_list:
  309. try:
  310. sql_pool.insert_many(table="qiandao_mall_draw_card_buy_record", data_list=info_list)
  311. except Exception as e:
  312. log.error(f"{inspect.currentframe().f_code.co_name} call insert_many error: {e}")
  313. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  314. def qd_draw_card_main(log):
  315. """
  316. 主函数
  317. :param log: logger对象
  318. """
  319. log.info(
  320. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  321. # 配置 MySQL 连接池
  322. sql_pool = MySQLConnectionPool(log=log)
  323. if not sql_pool.check_pool_health():
  324. log.error("数据库连接池异常")
  325. raise RuntimeError("数据库连接池异常")
  326. try:
  327. # 请求抽卡记录
  328. # sql_pid2_list = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record WHERE is_sold_out = 1")
  329. sql_pid2_list = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record")
  330. sql_pid2_list = [item[0] for item in sql_pid2_list]
  331. # 测试
  332. if not sql_pid2_list:
  333. # sql_pid2_list = ["855250935293687891"]
  334. log.debug("No sql_pid2_list")
  335. return
  336. for pid2 in sql_pid2_list:
  337. try:
  338. # 获取 该pid 购买数据的最大时间。
  339. max_trading_time = sql_pool.select_one(
  340. query="SELECT MAX(draw_time) FROM qiandao_mall_draw_card_buy_record WHERE pid = %s",
  341. args=(pid2,))
  342. max_trading_time = max_trading_time[0] if max_trading_time else None
  343. get_draw_ids(log, pid2, sql_pool,max_trading_time)
  344. except Exception as e:
  345. log.error(f"{inspect.currentframe().f_code.co_name} for sql_pid2_list: {pid2}, error: {e}")
  346. except Exception as e:
  347. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  348. finally:
  349. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
# Script entry point: run the crawl once, logging through the module-level loguru logger.
if __name__ == '__main__':
    qd_draw_card_main(log=logger)