leka_new_daily_spider.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/24 14:02
  5. import random
  6. import time
  7. from mysql_pool import MySQLConnectionPool
  8. from settings import *
  9. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  10. def get_all_sold_one_page(log, page: int, last_id, lastSalePrice):
  11. """
  12. 获取指定页面的已售数据
  13. :param log: logger对象
  14. :param page: 页码
  15. :param last_id: last_id
  16. :param lastSalePrice: lastSalePrice
  17. :return: 该页的数据, totalPage, total
  18. """
  19. log.info(f"Starting < get_all_sold_one_page > to fetch page {page}")
  20. url = "https://api.luckycards.com.cn/api/front/c/product/productShowList"
  21. data = {
  22. "lastId": last_id,
  23. "lastSalePrice": lastSalePrice,
  24. "limit": 20,
  25. "openMode": "",
  26. "page": page,
  27. "saleStatus": "2",
  28. "sort": "0"
  29. }
  30. try:
  31. response = make_request(log, 'POST', url, data=data)
  32. # print(response)
  33. if response:
  34. items = response["data"]["list"]
  35. total_page = response["data"]["totalPage"]
  36. total = response["data"]["total"]
  37. log.info(f"Successfully fetched page {page}: {len(items)} items")
  38. return items, total_page, total
  39. else:
  40. return [], 0, 0
  41. except requests.exceptions.RequestException as e:
  42. log.error(f"Error fetching page {page}: {e}")
  43. raise e
  44. except ValueError as e:
  45. log.error(f"Error parsing JSON for page {page}: {e}")
  46. raise e
  47. def get_shop_detail(log, shop_id):
  48. """
  49. 获取店铺详情信息
  50. :param log:
  51. :param shop_id:
  52. :return:
  53. """
  54. log.info(f"Start fetching shop {shop_id}")
  55. url = f"https://api.luckycards.com.cn/api/front/c/merchant/{shop_id}"
  56. try:
  57. response = make_request(log, 'GET', url)
  58. if response:
  59. item = response["data"]
  60. fans_num = item.get("fansNum")
  61. group_num = item.get("salesQuantity")
  62. create_time = item.get("createTime")
  63. log.info(f"Successfully fetched shop {shop_id}")
  64. return fans_num, group_num, create_time
  65. else:
  66. return None, None, None
  67. except Exception as e:
  68. log.error(f"Error fetching shop {shop_id}: {e}")
  69. return None, None, None
  70. def parse_shop_items(log, items, sql_pool, sql_shop_list):
  71. if not items:
  72. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
  73. return
  74. for item in items:
  75. shop_id = item.get("merchantCode")
  76. # fans_num, group_num, create_time = get_shop_detail(log, shop_id)
  77. # 查询商家id在不在数据库中 如果在数据库中则更新数据 不在数据库中则插入数据
  78. # sql_exists_flag = """SELECT EXISTS (SELECT 1 FROM leka_shop_record WHERE shop_id = %s) AS exists_flag"""
  79. # exists_flag = sql_pool.select_one(sql_exists_flag, (shop_id,))
  80. # exists_flag = exists_flag[0]
  81. # if exists_flag == 1:
  82. if shop_id in sql_shop_list:
  83. log.debug(
  84. f"----------------- The shop_id {shop_id} is already in the database, Not need save -----------------")
  85. # sql_pool.update_one(
  86. # "UPDATE leka_shop_record SET fans_num = %s, group_num = %s, create_time = %s WHERE shop_id = %s",
  87. # (fans_num, group_num, create_time, shop_id))
  88. else:
  89. fans_num, group_num, create_time = get_shop_detail(log, shop_id)
  90. shop_name = item.get("merchantName")
  91. shop_info_dict = {
  92. "shop_id": shop_id,
  93. "shop_name": shop_name,
  94. "fans_num": fans_num,
  95. "group_num": group_num,
  96. "create_time": create_time
  97. }
  98. sql_pool.insert_one_or_dict("leka_shop_record", shop_info_dict)
  99. sql_shop_list.append(shop_id)
  100. def get_product(log, items, sql_pool, last_product_id, sql_product_id_list):
  101. if not items:
  102. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
  103. return
  104. stop_page = True
  105. for item in items:
  106. product_id = item.get("code")
  107. if not product_id:
  108. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
  109. continue
  110. # 查询商家id在不在数据库中
  111. # sql_exists_flag = """SELECT EXISTS (SELECT 1 FROM leka_product_record WHERE product_id = %s) AS exists_flag"""
  112. # exists_flag = sql_pool.select_one(sql_exists_flag, (product_id,))
  113. # exists_flag = exists_flag[0]
  114. # if exists_flag == 1:
  115. if product_id in sql_product_id_list:
  116. log.debug(
  117. f"----------------- The product_id {product_id} is already in the database, Not need save -----------------")
  118. else:
  119. sql_pool.insert_one_or_dict("leka_product_record", {"product_id": product_id})
  120. sql_product_id_list.append(product_id)
  121. # try:
  122. # get_product_details(log, product_id, sql_pool)
  123. #
  124. # # get_player_list(log, product_id, sql_pool)
  125. # except Exception as e:
  126. # log.error(f"Error fetching product {product_id}: {e}")
  127. # continue
  128. # 判断是否是昨天的最后一条id, 如果是 则停止翻页
  129. if product_id == last_product_id:
  130. log.info(
  131. f"----------------- The product_id {product_id} is the last product_id:{last_product_id}, stop fetching -----------------")
  132. stop_page = False
  133. return stop_page
  134. def get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list):
  135. """
  136. 获取 全部类别的已售数据
  137. :param sql_pool: MySQL连接池对象
  138. :param log: logger对象
  139. :param last_product_id: last_product_id
  140. :param sql_shop_list: sql_shop_list
  141. :param sql_product_id_list: sql_product_id_list
  142. """
  143. page = 1
  144. # page = 246
  145. max_page = 200
  146. last_id = 0
  147. lastSalePrice = ''
  148. while page <= max_page:
  149. # while True:
  150. items, total_page, total = get_all_sold_one_page(log, page, last_id, lastSalePrice)
  151. if not items:
  152. break
  153. # 处理 items 数据
  154. parse_shop_items(log, items, sql_pool, sql_shop_list)
  155. stop_page = get_product(log, items, sql_pool, last_product_id, sql_product_id_list)
  156. if not stop_page:
  157. break
  158. # 更新lastId为最后一条的userId
  159. last_id = items[-1].get("id")
  160. lastSalePrice = items[-1].get("unitPriceStr")
  161. if not last_id:
  162. log.error("API response missing userId in last item, cannot paginate")
  163. break
  164. if not lastSalePrice:
  165. log.error("API response missing lastSalePrice in last item, cannot paginate")
  166. break
  167. page += 1
  168. # time.sleep(random.uniform(1, 3))
  169. log.info(f"Finished fetching all data. Total pages: {total_page}, total items: {total}")
  170. @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
  171. def leka_main(log):
  172. """
  173. 主函数
  174. :param log: logger对象
  175. """
  176. log.info(
  177. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  178. # 配置 MySQL 连接池
  179. sql_pool = MySQLConnectionPool(log=log)
  180. if not sql_pool.check_pool_health():
  181. log.error("数据库连接池异常")
  182. raise RuntimeError("数据库连接池异常")
  183. try:
  184. token = sql_pool.select_one("SELECT token FROM leka_token")
  185. token = token[0]
  186. try:
  187. # 获取最后一条pid的数据
  188. last_product_id = sql_pool.select_one(
  189. "SELECT product_id FROM leka_product_record ORDER BY finish_time DESC LIMIT 1")
  190. last_product_id = last_product_id[0]
  191. # 获取shop_list
  192. sql_shop_list = sql_pool.select_all("SELECT shop_id FROM leka_shop_record")
  193. sql_shop_list = [item[0] for item in sql_shop_list]
  194. # 获取 product_id_list
  195. sql_product_id_list = sql_pool.select_all("SELECT product_id FROM leka_product_record")
  196. sql_product_id_list = [item[0] for item in sql_product_id_list]
  197. get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list)
  198. sql_shop_list.clear()
  199. sql_product_id_list.clear()
  200. except Exception as e:
  201. log.error(f"Error fetching last_product_id: {e}")
  202. time.sleep(5)
  203. # 获取商品详情
  204. try:
  205. get_product_detail_list(log, sql_pool,token)
  206. except Exception as e:
  207. log.error(f"Error fetching product_detail_list: {e}")
  208. time.sleep(5)
  209. # 获取商品玩家
  210. try:
  211. get_players(log, sql_pool,token)
  212. except Exception as e:
  213. log.error(f"Error fetching players: {e}")
  214. time.sleep(5)
  215. #获取拆卡报告
  216. try:
  217. get_reports(log, sql_pool,token)
  218. except Exception as e:
  219. log.error(f"Error fetching reports: {e}")
  220. except Exception as e:
  221. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  222. finally:
  223. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
  224. if __name__ == '__main__':
  225. leka_main(logger)
  226. # get_all_sold_one_page(logger, 1, 0, '')