# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/3/24 14:02
import random
import time

from mysql_pool import MySQLConnectionPool
from settings import *


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_all_sold_one_page(log, page: int, last_id, lastSalePrice):
    """Fetch one page of the sold-product listing.

    :param log: logger object
    :param page: page number to request
    :param last_id: ``id`` of the last item of the previous page (paging cursor)
    :param lastSalePrice: ``unitPriceStr`` of the last item of the previous page
    :return: ``(items, total_page, total)``; ``([], 0, 0)`` when the request
        helper returns a falsy response
    :raises requests.exceptions.RequestException: logged and re-raised so the
        ``retry`` decorator can attempt the call again
    :raises ValueError: logged and re-raised for the same reason
    """
    log.info(f"Starting < get_all_sold_one_page > to fetch page {page}")
    url = "https://api.luckycards.com.cn/api/front/c/product/productShowList"
    data = {
        "lastId": last_id,
        "lastSalePrice": lastSalePrice,
        "limit": 20,
        "openMode": "",
        "page": page,
        "saleStatus": "2",
        "sort": "0"
    }

    try:
        response = make_request(log, 'POST', url, data=data)
        if not response:
            return [], 0, 0

        payload = response["data"]
        items = payload["list"]
        total_page = payload["totalPage"]
        total = payload["total"]
        log.info(f"Successfully fetched page {page}: {len(items)} items")
        return items, total_page, total
    except requests.exceptions.RequestException as e:
        log.error(f"Error fetching page {page}: {e}")
        raise
    except ValueError as e:
        log.error(f"Error parsing JSON for page {page}: {e}")
        raise


def get_shop_detail(log, shop_id):
    """Fetch the detail record of a single shop (merchant).

    Best effort: any exception is logged and swallowed.

    :param log: logger object
    :param shop_id: merchant code used in the request URL
    :return: ``(fans_num, group_num, create_time)``, or ``(None, None, None)``
        when the response is falsy or any error occurs
    """
    log.info(f"Start fetching shop {shop_id}")
    url = f"https://api.luckycards.com.cn/api/front/c/merchant/{shop_id}"
    try:
        response = make_request(log, 'GET', url)
        if not response:
            return None, None, None

        item = response["data"]
        fans_num = item.get("fansNum")
        group_num = item.get("salesQuantity")
        create_time = item.get("createTime")
        log.info(f"Successfully fetched shop {shop_id}")
        return fans_num, group_num, create_time
    except Exception as e:
        log.error(f"Error fetching shop {shop_id}: {e}")
        return None, None, None


def parse_shop_items(log, items, sql_pool, sql_shop_list):
    """Insert the shops referenced by ``items`` that are not yet recorded.

    :param log: logger object
    :param items: listing items; each item's ``merchantCode`` is the shop id
    :param sql_pool: MySQL connection pool object
    :param sql_shop_list: list of already-recorded shop ids; newly inserted
        ids are appended to it in place
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return

    # Build the lookup set once per call instead of scanning the list per item.
    known_shop_ids = set(sql_shop_list)

    for item in items:
        shop_id = item.get("merchantCode")
        if not shop_id:
            # Same guard get_product applies to a missing product code:
            # without an id there is nothing meaningful to fetch or store.
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No shop_id found")
            continue

        if shop_id in known_shop_ids:
            log.debug(
                f"----------------- The shop_id {shop_id} is already in the database, Not need save -----------------")
            continue

        # Only unknown shops cost an extra detail request.
        fans_num, group_num, create_time = get_shop_detail(log, shop_id)
        shop_info_dict = {
            "shop_id": shop_id,
            "shop_name": item.get("merchantName"),
            "fans_num": fans_num,
            "group_num": group_num,
            "create_time": create_time
        }
        sql_pool.insert_one_or_dict("leka_shop_record", shop_info_dict)
        sql_shop_list.append(shop_id)
        known_shop_ids.add(shop_id)


def get_product(log, items, sql_pool, last_product_id, sql_product_id_list):
    """Record unseen product ids and report whether paging should continue.

    :param log: logger object
    :param items: listing items; each item's ``code`` is the product id
    :param sql_pool: MySQL connection pool object
    :param last_product_id: product id at which the previous run stopped
    :param sql_product_id_list: list of already-recorded product ids; newly
        inserted ids are appended to it in place
    :return: ``True`` to keep paging, ``False`` once ``last_product_id`` was
        seen on this page, ``None`` when ``items`` is empty
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return None

    # Build the lookup set once per call instead of scanning the list per item.
    known_product_ids = set(sql_product_id_list)
    keep_paging = True

    for item in items:
        product_id = item.get("code")
        if not product_id:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
            continue

        if product_id in known_product_ids:
            log.debug(
                f"----------------- The product_id {product_id} is already in the database, Not need save -----------------")
        else:
            sql_pool.insert_one_or_dict("leka_product_record", {"product_id": product_id})
            sql_product_id_list.append(product_id)
            known_product_ids.add(product_id)

        # Reaching the previous run's last id ends paging, but the rest of
        # this page is still processed (no break here).
        if product_id == last_product_id:
            log.info(
                f"----------------- The product_id {product_id} is the last product_id:{last_product_id}, stop fetching -----------------")
            keep_paging = False

    return keep_paging


def get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list):
    """Page through the sold-product listing and record shops and products.

    Paging stops on an empty page, when ``get_product`` reports that
    ``last_product_id`` was reached, when the paging cursor cannot be built
    from the last item, or after ``max_page`` pages.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param last_product_id: product id at which the previous run stopped
    :param sql_shop_list: list of already-recorded shop ids (mutated in place)
    :param sql_product_id_list: list of already-recorded product ids (mutated in place)
    """
    page = 1
    max_page = 200
    last_id = 0
    lastSalePrice = ''
    # Defined up front so the summary log below can never hit an unbound name.
    total_page = 0
    total = 0

    while page <= max_page:
        items, total_page, total = get_all_sold_one_page(log, page, last_id, lastSalePrice)
        if not items:
            break

        parse_shop_items(log, items, sql_pool, sql_shop_list)
        keep_paging = get_product(log, items, sql_pool, last_product_id, sql_product_id_list)
        if not keep_paging:
            break

        # The next request is keyed on the last item of the current page.
        last_item = items[-1]
        last_id = last_item.get("id")
        lastSalePrice = last_item.get("unitPriceStr")
        if not last_id:
            log.error("API response missing id in last item, cannot paginate")
            break
        if not lastSalePrice:
            log.error("API response missing unitPriceStr in last item, cannot paginate")
            break

        page += 1

    log.info(f"Finished fetching all data. Total pages: {total_page}, total items: {total}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def leka_main(log):
    """Run one full crawl: listing, product details, players, reports.

    Each stage is isolated in its own ``try`` so a failure in one stage is
    logged and the following stages still run. Only the pool health check
    raises out of this function (which is what the ``retry`` decorator sees).

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token_row = sql_pool.select_one("SELECT token FROM leka_token")
        if not token_row:
            # Previously surfaced as an opaque "not subscriptable" TypeError.
            log.error("No token found in leka_token, skip this round")
            return
        token = token_row[0]

        # Stage 1: sold-product listing (shops + product ids).
        try:
            # Most recently finished product: the point where paging stops.
            last_row = sql_pool.select_one(
                "SELECT product_id FROM leka_product_record ORDER BY finish_time DESC LIMIT 1")
            # An empty table yields no row; fall back to None so the first
            # run can still crawl (bounded by the page limit) instead of
            # failing on indexing.
            last_product_id = last_row[0] if last_row else None

            sql_shop_list = sql_pool.select_all("SELECT shop_id FROM leka_shop_record")
            sql_shop_list = [row[0] for row in sql_shop_list]

            sql_product_id_list = sql_pool.select_all("SELECT product_id FROM leka_product_record")
            sql_product_id_list = [row[0] for row in sql_product_id_list]

            get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list)

            sql_shop_list.clear()
            sql_product_id_list.clear()
        except Exception as e:
            log.error(f"Error fetching last_product_id: {e}")

        time.sleep(5)

        # Stage 2: product details.
        try:
            get_product_detail_list(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching product_detail_list: {e}")

        time.sleep(5)

        # Stage 3: players of each product.
        try:
            get_players(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching players: {e}")

        time.sleep(5)

        # Stage 4: card-opening reports.
        try:
            get_reports(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching reports: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    leka_main(logger)