# qd_mall.py
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/7/2 16:02
  5. import time
  6. from datetime import datetime
  7. import utils
  8. import inspect
  9. import schedule
  10. from loguru import logger
  11. from mysql_pool import MySQLConnectionPool
  12. from tenacity import retry, stop_after_attempt, wait_fixed
# Replace loguru's default stderr sink with a dedicated rotating file sink:
# a new file each midnight, DEBUG level, logs kept for 7 days.
logger.remove()
logger.add("./logs/mall_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")
# Category label ("潮玩" = trendy/designer toys) stamped onto every crawled record.
CRAWL_CATEGORY = '潮玩'
  18. def after_log(retry_state):
  19. """
  20. retry 回调
  21. :param retry_state: RetryCallState 对象
  22. """
  23. # 检查 args 是否存在且不为空
  24. if retry_state.args and len(retry_state.args) > 0:
  25. log = retry_state.args[0] # 获取传入的 logger
  26. else:
  27. log = logger # 使用全局 logger
  28. if retry_state.outcome.failed:
  29. log.warning(
  30. f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
  31. else:
  32. log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
  33. def get_toy_mall_list(log, sql_pool,sql_mall_list):
  34. page = 1
  35. max_page = 500
  36. total_count = 0
  37. while page <= max_page:
  38. len_item, totalCount = get_toy_mall_single_page(log, page, sql_pool,sql_mall_list)
  39. if len_item < 20:
  40. log.debug(
  41. f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
  42. break
  43. total_count += len_item
  44. if total_count >= int(totalCount):
  45. log.debug(f"total_count: {total_count} totalCount: {totalCount}")
  46. break
  47. page += 1
  48. # time.sleep(random.uniform(0.1, 1))
  49. def get_toy_mall_single_page(log, page, sql_pool,sql_mall_list):
  50. url = "https://api.qiandao.cn/shelf-web/list-home-recommender"
  51. params = {
  52. # "offset": 0,
  53. "offset": (page - 1) * 20,
  54. "limit": 20,
  55. "cmd": "b2c_homepage_feed",
  56. "name": "manghe",
  57. "project": "channel",
  58. # "type[]": "BLIND_BOX_MACHINE",
  59. # "type[]": "KUJI",
  60. # "type[]": "NEW_LUCKY_BAG",
  61. # "type[]": "B2C",
  62. "type": "",
  63. "withLive": "true"
  64. }
  65. try:
  66. resp_json = utils.request_get_data(logger, url, params, proxy=False)
  67. # print(resp_json)
  68. if not resp_json:
  69. log.error(f"get_toy_mall_single_page page:{page}")
  70. if resp_json.get("code", -1) == 0:
  71. rows = resp_json.get("data", {}).get("rows", [])
  72. total_count = resp_json.get("data", {}).get("count", 0)
  73. try:
  74. parse_mall_data(log, rows, sql_pool,sql_mall_list)
  75. except Exception as e:
  76. log.error(f"parse_mall_data error: {e}")
  77. return len(rows), total_count
  78. else:
  79. log.error(f"{inspect.currentframe().f_code.co_name} -> page:{page} error: {resp_json.get('message')}")
  80. return 0, 0
  81. except Exception as e:
  82. logger.error(f"Request failed: {e}")
  83. return 0, 0
  84. def parse_mall_data(log, rows, sql_pool,sql_mall_list):
  85. info_list = []
  86. for row in rows:
  87. pid = row.get("id")
  88. if pid in sql_mall_list:
  89. log.info(f"pid:{pid} is exist, skip .......")
  90. continue
  91. p_type = row.get("type")
  92. orgId = row.get("orgId")
  93. title = row.get("name")
  94. isSoldOut = row.get("isSoldOut")
  95. price = row.get("price", {}).get("unitPriceOfCash") if row.get("price") else None
  96. soldAmountText = row.get("soldAmountText") # 已售
  97. nickname = row.get("org", {}).get("nickname", '') if row.get("org") else None
  98. # sell_time = row.get("sellTime")
  99. # sell_time = utils.translate_s(log, sell_time) if sell_time or sell_time != '0' else None
  100. # images = row.get("coverImages", [])
  101. # images = '|'.join(images)
  102. data_dict = {
  103. "pid": pid,
  104. "p_type": p_type,
  105. "title": title,
  106. "price": price,
  107. "sold_amount_text": soldAmountText,
  108. "org_id": orgId,
  109. "nickname": nickname,
  110. # "description": description,
  111. "is_sold_out": isSoldOut,
  112. # "sell_time": sell_time,
  113. # "images": images,
  114. "crawl_category": CRAWL_CATEGORY
  115. }
  116. info_list.append(data_dict)
  117. # sql_mall_list.append(pid)
  118. if info_list:
  119. query = """
  120. INSERT INTO qiandao_mall_list_record
  121. (pid, p_type, title, price, sold_amount_text, org_id, nickname, is_sold_out, crawl_category)
  122. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
  123. ON DUPLICATE KEY UPDATE
  124. p_type = VALUES(p_type),
  125. title = VALUES(title),
  126. price = VALUES(price),
  127. sold_amount_text = VALUES(sold_amount_text),
  128. org_id = VALUES(org_id),
  129. nickname = VALUES(nickname),
  130. is_sold_out = VALUES(is_sold_out),
  131. crawl_category = VALUES(crawl_category)
  132. """
  133. args_list = [(item['pid'], item['p_type'], item['title'], item['price'], item['sold_amount_text'],
  134. item['org_id'], item['nickname'], item['is_sold_out'], item['crawl_category']) for item in
  135. info_list]
  136. try:
  137. sql_pool.insert_many(query=query, args_list=args_list)
  138. except Exception as e:
  139. log.error(f"parse_mall_data -> sql_pool.insert_many error: {e}")
  140. # ----------------------------------------------------------------------------------------------------------------------
  141. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  142. def get_group_id_list_for_pid_one_page(log, token, pid, sql_pool):
  143. """
  144. 获取 每个pid的 group_id列表
  145. :param log:
  146. :param token:
  147. :param pid:
  148. :param sql_pool:
  149. """
  150. log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, pid:{pid} ---------------")
  151. url = "https://api.qiandao.cn/b2c-web/v1/kuji/query/group/next"
  152. params = {
  153. # "shelfId": "776833910955877820"
  154. "shelfId": pid
  155. }
  156. try:
  157. resp_json = utils.request_get_data(log, url, params, token)
  158. # print(resp_json)
  159. if resp_json.get("code") == 0:
  160. parse_group_id_list(log, resp_json, pid, sql_pool)
  161. elif resp_json.get("code", 0) == 2600030:
  162. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  163. sql_pool.update_one(
  164. query="UPDATE qiandao_mall_list_record SET bag_state = 2 WHERE pid = %s",
  165. args=(pid,))
  166. return
  167. else:
  168. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  169. except Exception as e:
  170. log.error(f"Request failed: {e}")
  171. def parse_group_id_list(log, resp_json, pid, sql_pool):
  172. log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
  173. data = resp_json.get("data", {}).get("groupSummary", [])
  174. for item in data:
  175. group_id = item.get("groupId")
  176. try:
  177. # 获取 该组 购买数据的最大时间。
  178. max_trading_time = sql_pool.select_one(
  179. query=f"SELECT MAX(trading_time) FROM qiandao_mall_card_bag_reward_buy_record WHERE group_id = %s AND crawl_category = '{CRAWL_CATEGORY}'",
  180. args=(group_id,))
  181. max_trading_time = max_trading_time[0] if max_trading_time else None
  182. get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time)
  183. except Exception as e:
  184. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  185. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  186. def get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time):
  187. """
  188. 获取 卡包赏 购买数据(支持自动翻页)
  189. :param log: 日志记录器
  190. :param group_id: 组ID
  191. :param pid: pid
  192. :param sql_pool: sql_pool
  193. :param max_trading_time: max_trading_time
  194. """
  195. url = "https://api.qiandao.cn/box/kuji/query/v3/draw-records"
  196. limit = "10" # 每页数量
  197. seq = "0" # 初始seq值
  198. while True:
  199. params = {
  200. "groupId": group_id,
  201. "limit": limit,
  202. "seq": seq
  203. }
  204. resp_json = utils.request_get_data(log, url, params)
  205. if resp_json.get("code") != 0:
  206. log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
  207. break
  208. # 解析并处理当前页数据
  209. result = parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time)
  210. if not result:
  211. log.debug("No more data to process.")
  212. break
  213. has_more, next_seq = result
  214. if not has_more:
  215. log.debug("No more pages to fetch.")
  216. break
  217. seq = next_seq
  218. # time.sleep(random.uniform(0.5, 1))
def parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time):
    """
    Parse one page of card-bag-reward buy records, persist new rows, and
    report paging state. Rows at or before max_trading_time terminate the
    crawl early, making the whole crawl incremental.

    :param log: logger
    :param resp_json: draw-records response payload
    :param pid: shelf/product id
    :param sql_pool: MySQL connection pool
    :param group_id: group id
    :param max_trading_time: newest trading_time already stored (datetime or None)
    :return: (has_more: bool, next_seq: str); (False, None) when crawling should stop
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    data = resp_json.get("data", {})
    records = data.get("records", [])
    if not records:
        return False, None
    info_list = []
    for item in records:
        trading_time = item.get("time")  # transaction time (raw value; transform_ms converts it)
        trading_time_str = utils.transform_ms(log, trading_time)
        if not trading_time_str:
            log.debug(f"Invalid trading time: {trading_time_str}")
            continue  # skip rows with an unparseable timestamp
        # string -> datetime, so it can be compared against max_trading_time
        trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
        if max_trading_time and trading_time <= max_trading_time:
            # Reached data we already stored: stop paging immediately.
            # NOTE(review): rows accumulated in info_list so far are discarded
            # here; this assumes the feed is ordered newest-first — confirm.
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
            return False, None
        productName = item.get("productName")
        userName = item.get("userName")
        userId = item.get("userId")
        level = item.get("level")  # 1 = prize tier A, 2 = tier B, and so on
        levelType = item.get("levelType")
        seq = item.get("seq")  # draw order within the group
        buy_dict = {
            "pid": pid,
            "group_id": group_id,
            "product_name": productName,
            "user_name": userName,
            "user_id": userId,
            "level": level,
            "level_type": levelType,
            "seq": seq,
            "trading_time": trading_time_str,
            "crawl_category": CRAWL_CATEGORY
        }
        info_list.append(buy_dict)
    if info_list:
        try:
            sql_pool.insert_many(table="qiandao_mall_card_bag_reward_buy_record", data_list=info_list)
            # Flag the pid as having bag data once at least one batch lands.
            sql_pool.update_one(
                query="UPDATE qiandao_mall_list_record SET bag_state = 1 WHERE pid = %s",
                args=(pid,))
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} call insert_many error: {e}")
    has_more = data.get("hasMore", False)
    next_seq = records[-1].get("seq") if has_more else None  # cursor = seq of the last row
    return has_more, next_seq
  279. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  280. def qd_mall_main(log=logger):
  281. """
  282. 主函数
  283. :param log: logger对象
  284. """
  285. log.info(
  286. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  287. # 配置 MySQL 连接池
  288. sql_pool = MySQLConnectionPool(log=log)
  289. if not sql_pool.check_pool_health():
  290. log.error("数据库连接池异常")
  291. raise RuntimeError("数据库连接池异常")
  292. try:
  293. # 请求列表页
  294. try:
  295. sql_mall_list = sql_pool.select_all(
  296. f"# SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE crawl_category = '{CRAWL_CATEGORY}'")
  297. sql_mall_list = [item[0] for item in sql_mall_list]
  298. get_toy_mall_list(log, sql_pool, sql_mall_list)
  299. # get_toy_mall_list(log, sql_pool)
  300. except Exception as e:
  301. log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
  302. time.sleep(5)
  303. # 请求详情页
  304. sql_token = sql_pool.select_one("SELECT token FROM qiandao_token LIMIT 1")
  305. token = sql_token[0]
  306. sql_pid_one_list = sql_pool.select_all(
  307. f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'KUJI' AND crawl_category = '{CRAWL_CATEGORY}'")
  308. sql_pid_one_list = [item[0] for item in sql_pid_one_list]
  309. # sql_pid_two_list = sql_pool.select_all(
  310. # f"# SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'GACHA' AND crawl_category = '{CRAWL_CATEGORY}'")
  311. # sql_pid_two_list = [item[0] for item in sql_pid_two_list]
  312. # sql_pid_three_list = sql_pool.select_all(
  313. # f"# SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'LIVE_GROUPON' AND crawl_category = '{CRAWL_CATEGORY}'")
  314. # sql_pid_three_list = [item[0] for item in sql_pid_three_list]
  315. # 测试
  316. if not sql_pid_one_list:
  317. # sql_pid2_list = ["873770249084734603"]
  318. log.debug("No sql_pid_one_list")
  319. return
  320. for pid2 in sql_pid_one_list:
  321. try:
  322. get_group_id_list_for_pid_one_page(log, token, pid2, sql_pool)
  323. except Exception as e:
  324. log.error(f"{inspect.currentframe().f_code.co_name} for sql_pid2_list: {pid2}, error: {e}")
  325. except Exception as e:
  326. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  327. finally:
  328. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
def schedule_task():
    """
    Scheduler entry point for the crawler module.

    Registers qd_mall_main to run daily at 00:01, then blocks forever,
    polling the schedule once per second.
    """
    # Run the task once immediately (currently disabled)
    # qd_mall_main()
    # Register the daily job
    schedule.every().day.at("00:01").do(qd_mall_main)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    schedule_task()