# leka_new_daily_spider.py
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/24 14:02
  5. import json
  6. import random
  7. import time
  8. from mysql_pool import MySQLConnectionPool
  9. from settings import *
  10. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  11. def get_all_sold_one_page(log, page: int, last_id, lastSalePrice):
  12. """
  13. 获取指定页面的已售数据
  14. :param log: logger对象
  15. :param page: 页码
  16. :param last_id: last_id
  17. :param lastSalePrice: lastSalePrice
  18. :return: 该页的数据, totalPage, total
  19. """
  20. log.info(f"Starting < get_all_sold_one_page > to fetch page {page}")
  21. url = "https://api.luckycards.com.cn/api/front/c/product/productShowList"
  22. data = {
  23. "lastId": last_id,
  24. "lastSalePrice": lastSalePrice,
  25. "limit": 20,
  26. "openMode": "",
  27. "page": page,
  28. "saleStatus": "2",
  29. "sort": "0"
  30. }
  31. try:
  32. # data = json.dumps(data, separators=(',', ':'))
  33. response = make_request(log, 'POST', url, data=data)
  34. # print(response)
  35. if response:
  36. items = response["data"]["list"]
  37. total_page = response["data"]["totalPage"]
  38. total = response["data"]["total"]
  39. log.info(f"Successfully fetched page {page}: {len(items)} items")
  40. return items, total_page, total
  41. else:
  42. return [], 0, 0
  43. except requests.exceptions.RequestException as e:
  44. log.error(f"Error fetching page {page}: {e}")
  45. raise e
  46. except ValueError as e:
  47. log.error(f"Error parsing JSON for page {page}: {e}")
  48. raise e
  49. def get_shop_detail(log, shop_id):
  50. """
  51. 获取店铺详情信息
  52. :param log:
  53. :param shop_id:
  54. :return:
  55. """
  56. log.info(f"Start fetching shop {shop_id}")
  57. url = f"https://api.luckycards.com.cn/api/front/c/merchant/{shop_id}"
  58. try:
  59. response = make_request(log, 'GET', url)
  60. if response:
  61. item = response["data"]
  62. fans_num = item.get("fansNum")
  63. group_num = item.get("salesQuantity")
  64. create_time = item.get("createTime")
  65. log.info(f"Successfully fetched shop {shop_id}")
  66. return fans_num, group_num, create_time
  67. else:
  68. return None, None, None
  69. except Exception as e:
  70. log.error(f"Error fetching shop {shop_id}: {e}")
  71. return None, None, None
  72. def parse_shop_items(log, items, sql_pool, sql_shop_list):
  73. if not items:
  74. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
  75. return
  76. for item in items:
  77. shop_id = item.get("merchantCode")
  78. # fans_num, group_num, create_time = get_shop_detail(log, shop_id)
  79. # 查询商家id在不在数据库中 如果在数据库中则更新数据 不在数据库中则插入数据
  80. # sql_exists_flag = """SELECT EXISTS (SELECT 1 FROM leka_shop_record WHERE shop_id = %s) AS exists_flag"""
  81. # exists_flag = sql_pool.select_one(sql_exists_flag, (shop_id,))
  82. # exists_flag = exists_flag[0]
  83. # if exists_flag == 1:
  84. if shop_id in sql_shop_list:
  85. log.debug(
  86. f"----------------- The shop_id {shop_id} is already in the database, Not need save -----------------")
  87. # sql_pool.update_one(
  88. # "UPDATE leka_shop_record SET fans_num = %s, group_num = %s, create_time = %s WHERE shop_id = %s",
  89. # (fans_num, group_num, create_time, shop_id))
  90. else:
  91. fans_num, group_num, create_time = get_shop_detail(log, shop_id)
  92. shop_name = item.get("merchantName")
  93. shop_info_dict = {
  94. "shop_id": shop_id,
  95. "shop_name": shop_name,
  96. "fans_num": fans_num,
  97. "group_num": group_num,
  98. "create_time": create_time
  99. }
  100. sql_pool.insert_one_or_dict("leka_shop_record", shop_info_dict)
  101. sql_shop_list.append(shop_id)
  102. def get_product(log, items, sql_pool, last_product_id, sql_product_id_list):
  103. if not items:
  104. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
  105. return
  106. stop_page = True
  107. for item in items:
  108. product_id = item.get("code")
  109. if not product_id:
  110. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
  111. continue
  112. # 查询商家id在不在数据库中
  113. # sql_exists_flag = """SELECT EXISTS (SELECT 1 FROM leka_product_record WHERE product_id = %s) AS exists_flag"""
  114. # exists_flag = sql_pool.select_one(sql_exists_flag, (product_id,))
  115. # exists_flag = exists_flag[0]
  116. # if exists_flag == 1:
  117. if product_id in sql_product_id_list:
  118. log.debug(
  119. f"----------------- The product_id {product_id} is already in the database, Not need save -----------------")
  120. else:
  121. sql_pool.insert_one_or_dict("leka_product_record", {"product_id": product_id})
  122. sql_product_id_list.append(product_id)
  123. # try:
  124. # get_product_details(log, product_id, sql_pool)
  125. #
  126. # # get_player_list(log, product_id, sql_pool)
  127. # except Exception as e:
  128. # log.error(f"Error fetching product {product_id}: {e}")
  129. # continue
  130. # 判断是否是昨天的最后一条id, 如果是 则停止翻页
  131. if product_id == last_product_id:
  132. log.info(
  133. f"----------------- The product_id {product_id} is the last product_id:{last_product_id}, stop fetching -----------------")
  134. stop_page = False
  135. return stop_page
  136. def get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list):
  137. """
  138. 获取 全部类别的已售数据
  139. :param sql_pool: MySQL连接池对象
  140. :param log: logger对象
  141. :param last_product_id: last_product_id
  142. :param sql_shop_list: sql_shop_list
  143. :param sql_product_id_list: sql_product_id_list
  144. """
  145. page = 1
  146. # page = 246
  147. max_page = 200
  148. last_id = 0
  149. lastSalePrice = ''
  150. while page <= max_page:
  151. # while True:
  152. items, total_page, total = get_all_sold_one_page(log, page, last_id, lastSalePrice)
  153. if not items:
  154. break
  155. # 处理 items 数据
  156. parse_shop_items(log, items, sql_pool, sql_shop_list)
  157. stop_page = get_product(log, items, sql_pool, last_product_id, sql_product_id_list)
  158. if not stop_page:
  159. break
  160. # 更新lastId为最后一条的userId
  161. last_id = items[-1].get("id")
  162. lastSalePrice = items[-1].get("unitPriceStr")
  163. if not last_id:
  164. log.error("API response missing userId in last item, cannot paginate")
  165. break
  166. if not lastSalePrice:
  167. log.error("API response missing lastSalePrice in last item, cannot paginate")
  168. break
  169. page += 1
  170. # time.sleep(random.uniform(1, 3))
  171. log.info(f"Finished fetching all data. Total pages: {total_page}, total items: {total}")
  172. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  173. def leka_main(log):
  174. """
  175. 主函数
  176. :param log: logger对象
  177. """
  178. log.info(
  179. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  180. # 配置 MySQL 连接池
  181. sql_pool = MySQLConnectionPool(log=log)
  182. if not sql_pool.check_pool_health():
  183. log.error("数据库连接池异常")
  184. raise RuntimeError("数据库连接池异常")
  185. try:
  186. token = sql_pool.select_one("SELECT token FROM leka_token")
  187. token = token[0]
  188. try:
  189. # 获取最后一条pid的数据
  190. last_product_id = sql_pool.select_one(
  191. "SELECT product_id FROM leka_product_record ORDER BY finish_time DESC LIMIT 1")
  192. last_product_id = last_product_id[0]
  193. # 获取shop_list
  194. sql_shop_list = sql_pool.select_all("SELECT shop_id FROM leka_shop_record")
  195. sql_shop_list = [item[0] for item in sql_shop_list]
  196. # 获取 product_id_list
  197. sql_product_id_list = sql_pool.select_all("SELECT product_id FROM leka_product_record")
  198. sql_product_id_list = [item[0] for item in sql_product_id_list]
  199. get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list)
  200. sql_shop_list.clear()
  201. sql_product_id_list.clear()
  202. except Exception as e:
  203. log.error(f"Error fetching last_product_id: {e}")
  204. time.sleep(5)
  205. # 获取商品详情
  206. try:
  207. get_product_detail_list(log, sql_pool,token)
  208. except Exception as e:
  209. log.error(f"Error fetching product_detail_list: {e}")
  210. time.sleep(5)
  211. # 获取商品玩家
  212. try:
  213. get_players(log, sql_pool,token)
  214. except Exception as e:
  215. log.error(f"Error fetching players: {e}")
  216. time.sleep(5)
  217. #获取拆卡报告
  218. try:
  219. get_reports(log, sql_pool,token)
  220. except Exception as e:
  221. log.error(f"Error fetching reports: {e}")
  222. except Exception as e:
  223. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  224. finally:
  225. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
# Script entry point: run the crawl with `logger`.
# NOTE(review): `logger` is not defined in the visible source; presumably it
# comes from `from settings import *` - confirm.
if __name__ == '__main__':
    leka_main(logger)
    # get_all_sold_one_page(logger, 1, 0, '')