# leka_history_spider.py
  1. # -*- coding: utf-8 -*-
  2. # Author : Charley
  3. # Python : 3.10.8
  4. # Date : 2025/3/25 18:47
  5. import time
  6. from mysql_pool import MySQLConnectionPool
  7. from settings import *
  8. @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
  9. def get_history_sold_one_page(log, shop_id, sql_pool, page, token):
  10. """
  11. 获取指定页面的已售数据
  12. :param log: logger对象
  13. :param shop_id: 商家id
  14. :param sql_pool: sql_pool对象
  15. :param page: 当前页码
  16. :param token: token
  17. :return: 所有页码, totalPage
  18. """
  19. url = "https://api.luckycards.com.cn/api/front/c/product/merchantProductShowList"
  20. data = {
  21. # "merchantCode": "81366056",
  22. "merchantCode": shop_id,
  23. "page": page,
  24. "saleStatus": 2
  25. }
  26. response = make_request(log, "POST", url, data=data, token=token)
  27. if not response:
  28. log.warning(f" get_history_sold_one_page for {shop_id}: Empty response")
  29. return 1
  30. resp_data_ = response.get("data", {})
  31. totalPage = resp_data_.get("totalPage", 1)
  32. resp_data_list = resp_data_.get("list", [])
  33. if not resp_data_list:
  34. log.warning(f" get_history_sold_one_page for {shop_id}: Empty response")
  35. else:
  36. all_in_db = True
  37. for resp_data in resp_data_list:
  38. product_id = resp_data.get("code")
  39. if not product_id:
  40. log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
  41. continue
  42. # 查询商品id在不在数据库中
  43. sql_exists_flag = """SELECT EXISTS (SELECT 1 FROM leka_product_record WHERE product_id = %s) AS exists_flag"""
  44. exists_flag = sql_pool.select_one(sql_exists_flag, (product_id,))
  45. exists_flag = exists_flag[0]
  46. if exists_flag == 1:
  47. log.info(
  48. f"----------------- The product_id {product_id} is already in the database, Not need save -----------------")
  49. else:
  50. all_in_db = False
  51. try:
  52. get_product_details(log, product_id, sql_pool, token)
  53. except Exception as e:
  54. log.error(f"Error fetching product {product_id}: {e}")
  55. continue
  56. if page < 5 and all_in_db:
  57. # if page == 1 and all_in_db:
  58. return -1 # 特定标志值,表示第一页数据全在数据库中
  59. return totalPage
  60. def get_history_all_sold(log, sql_pool, shop_id, token):
  61. """
  62. 获取店铺历史 sold 信息
  63. :param log: logger对象
  64. :param sql_pool: sql_pool对象
  65. :param shop_id: 商家id
  66. :param token: token
  67. """
  68. page = 1
  69. while True:
  70. log.info(f"----------------- The shop_id: {shop_id}, page: {page} is start -----------------")
  71. totalPage = get_history_sold_one_page(log, shop_id, sql_pool, page, token)
  72. if totalPage == -1: # 检查特定标志值, 方便断点续爬
  73. log.info(f"----------------- The shop_id: {shop_id}, 第一页数据全在数据库中,跳过后续页 -----------------")
  74. break
  75. if page >= totalPage:
  76. break
  77. page += 1
  78. @retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
  79. def leka_history_main(log):
  80. """
  81. 主函数
  82. :param log: logger对象
  83. """
  84. log.info(
  85. f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
  86. # 配置 MySQL 连接池
  87. sql_pool = MySQLConnectionPool(log=log)
  88. if not sql_pool.check_pool_health():
  89. log.error("数据库连接池异常")
  90. raise RuntimeError("数据库连接池异常")
  91. try:
  92. token = sql_pool.select_one("SELECT token FROM leka_token")
  93. token = token[0]
  94. shop_id_list = sql_pool.select_all("SELECT DISTINCT shop_id FROM leka_shop_record")
  95. shop_id_list = [pid[0] for pid in shop_id_list]
  96. for shop_id in shop_id_list:
  97. try:
  98. get_history_all_sold(log, sql_pool, shop_id, token)
  99. except Exception as e:
  100. log.error(f"Error fetching shop_id {shop_id}: {e}")
  101. continue
  102. time.sleep(60)
  103. # time.sleep(60)
  104. get_players(log, sql_pool, token)
  105. time.sleep(60)
  106. get_reports(log, sql_pool, token)
  107. except Exception as e:
  108. log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
  109. finally:
  110. log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
if __name__ == '__main__':
    # Run the crawler with the shared project logger (presumably exported by
    # `settings` via the star import above — TODO confirm).
    leka_history_main(logger)
    # sql_pool_ = MySQLConnectionPool(log=logger)
    # get_history_sold_one_page(logger, "1896238", sql_pool_, 1)