# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/3/25 18:47
import inspect
import time

from mysql_pool import MySQLConnectionPool
from settings import *

# NOTE(review): retry, stop_after_attempt, wait_fixed, after_log, make_request,
# get_product_details, get_players, get_reports and logger are not imported by
# name anywhere in this file; presumably they arrive through the star import
# above -- confirm against settings.py.

# Value returned by get_history_sold_one_page to tell the caller that every
# product on the page is already stored and paging for this shop can stop.
_ALL_IN_DB_FLAG = -1
# Only pages numbered below this value are allowed to trigger the early stop.
_RESUME_CHECK_PAGES = 5

_SQL_PRODUCT_EXISTS = """SELECT EXISTS (SELECT 1 FROM leka_product_record WHERE product_id = %s) AS exists_flag"""


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_history_sold_one_page(log, shop_id, sql_pool, page, token):
    """
    Fetch one page of a shop's sold products and save the ones not stored yet.

    :param log: logger object
    :param shop_id: merchant id, sent to the API as ``merchantCode``
    :param sql_pool: connection pool object; ``select_one`` is called on it
    :param page: page number to request
    :param token: token forwarded to ``make_request`` and ``get_product_details``
    :return: the ``totalPage`` value reported by the API (1 when the response
        is empty or the field is missing/null), or -1 when ``page`` is below 5
        and every product on the page already exists in ``leka_product_record``
    """
    url = "https://api.luckycards.com.cn/api/front/c/product/merchantProductShowList"
    data = {
        # e.g. "merchantCode": "81366056"
        "merchantCode": shop_id,
        "page": page,
        "saleStatus": 2,
    }
    response = make_request(log, "POST", url, data=data, token=token)
    if not response:
        log.warning(f" get_history_sold_one_page for {shop_id}: Empty response")
        return 1

    # "data", "totalPage" and "list" may be present but null; `or` covers both
    # the missing-key and the explicit-null case.
    payload = response.get("data") or {}
    total_pages = payload.get("totalPage") or 1
    products = payload.get("list") or []
    if not products:
        log.warning(f" get_history_sold_one_page for {shop_id}: Empty response")
        return total_pages

    # NOTE(review): the source layout was collapsed; the early-stop check is
    # kept on the non-empty path only, where all_in_db is always defined.
    all_in_db = True
    for product in products:
        product_id = product.get("code")
        if not product_id:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
            continue

        # Check whether this product id is already in the database.
        row = sql_pool.select_one(_SQL_PRODUCT_EXISTS, (product_id,))
        if row and row[0] == 1:
            log.info(
                f"----------------- The product_id {product_id} is already in the database, Not need save -----------------")
            continue

        all_in_db = False
        try:
            get_product_details(log, product_id, sql_pool, token)
        except Exception as e:
            # Best effort: one failing product must not abort the whole page.
            log.error(f"Error fetching product {product_id}: {e}")

    if page < _RESUME_CHECK_PAGES and all_in_db:
        # Sentinel: everything on this early page is already stored.
        return _ALL_IN_DB_FLAG
    return total_pages


def get_history_all_sold(log, sql_pool, shop_id, token):
    """
    Walk through a shop's sold-history pages, starting at page 1.

    Paging stops when the page function returns the -1 sentinel (page fully
    stored already, which allows resuming an interrupted crawl) or when the
    current page number reaches the reported total.

    :param log: logger object
    :param sql_pool: connection pool object
    :param shop_id: merchant id
    :param token: token
    """
    page = 1
    while True:
        log.info(f"----------------- The shop_id: {shop_id}, page: {page} is start -----------------")
        total_pages = get_history_sold_one_page(log, shop_id, sql_pool, page, token)
        if total_pages == _ALL_IN_DB_FLAG:
            # Sentinel check, so an interrupted crawl can resume cheaply.
            log.info(f"----------------- The shop_id: {shop_id}, 第一页数据全在数据库中,跳过后续页 -----------------")
            break
        if page >= total_pages:
            break
        page += 1


@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def leka_history_main(log):
    """
    Entry point: crawl sold history for every shop, then players and reports.

    Errors raised after the pool health check are logged and not re-raised, so
    only a failure to create the pool or a failed health check propagates out
    of this function to the retry decorator.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token_row = sql_pool.select_one("SELECT token FROM leka_token")
        if not token_row:
            # Fail with a clear message instead of a TypeError on None[0].
            raise RuntimeError("No token found in leka_token")
        token = token_row[0]

        shop_rows = sql_pool.select_all("SELECT DISTINCT shop_id FROM leka_shop_record") or ()
        shop_id_list = [row[0] for row in shop_rows]
        for shop_id in shop_id_list:
            try:
                get_history_all_sold(log, sql_pool, shop_id, token)
            except Exception as e:
                # Best effort: a failing shop must not stop the remaining shops.
                log.error(f"Error fetching shop_id {shop_id}: {e}")

        # NOTE(review): the source layout was collapsed, so it is unclear
        # whether this pause ran once after the shop loop (as written here) or
        # once per shop inside it -- confirm against the original file.
        time.sleep(60)

        get_players(log, sql_pool, token)
        time.sleep(60)

        get_reports(log, sql_pool, token)
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    leka_history_main(logger)
    # Manual single-page debugging; note this call passes no token argument.
    # sql_pool_ = MySQLConnectionPool(log=logger)
    # get_history_sold_one_page(logger, "1896238", sql_pool_, 1)