# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/12/22 10:44
import inspect
import time

from mysql_pool import MySQLConnectionPool
# NOTE(review): this star import presumably supplies `requests`, `retry`,
# `stop_after_attempt`, `wait_fixed`, `after_log`, `make_request`, `logger`,
# `get_product_detail_list`, `get_players` and `get_reports` — confirm.
from settings import *


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_all_sold_one_page(log, page: int, last_id, lastSalePrice):
    """
    Fetch one page of sold ("saleStatus 2") products from the listing API.

    :param log: logger object
    :param page: page number to fetch
    :param last_id: id of the last item on the previous page (0 for the first page)
    :param lastSalePrice: price cursor from the previous page ('' for the first page)
    :return: (items, totalPage, total) — ([], 0, 0) when the response is empty
    :raises requests.exceptions.RequestException: on network failure (retried by decorator)
    :raises ValueError: on JSON parse failure (retried by decorator)
    """
    log.info(f"Starting < get_all_sold_one_page > to fetch page {page}")
    url = "https://api.joycard.xyz/api/front/c/product/productShowList"
    data = {
        "lastId": last_id,
        "lastSalePrice": lastSalePrice,
        "limit": 20,
        "openMode": "",
        "page": page,
        "saleStatus": "2",
        "sort": "0"
    }
    try:
        response = make_request(log, 'POST', url, data=data)
        if response:
            items = response["data"]["list"]
            total_page = response["data"]["totalPage"]
            total = response["data"]["total"]
            log.info(f"Successfully fetched page {page}: {len(items)} items")
            return items, total_page, total
        else:
            return [], 0, 0
    except requests.exceptions.RequestException as e:
        log.error(f"Error fetching page {page}: {e}")
        raise e
    except ValueError as e:
        log.error(f"Error parsing JSON for page {page}: {e}")
        raise e


def get_shop_detail(log, shop_id):
    """
    Fetch detail information for a single shop (merchant).

    Best-effort: any failure is logged and reported as (None, None, None)
    rather than raised, so one bad shop does not abort the crawl.

    :param log: logger object
    :param shop_id: merchant code of the shop
    :return: (fans_num, group_num, create_time), each possibly None
    """
    log.info(f"Start fetching shop {shop_id}")
    url = f"https://api.joycard.xyz/api/front/c/merchant/{shop_id}"
    try:
        response = make_request(log, 'GET', url)
        if response:
            item = response["data"]
            fans_num = item.get("fansNum")
            group_num = item.get("salesQuantity")
            create_time = item.get("createTime")
            log.info(f"Successfully fetched shop {shop_id}")
            return fans_num, group_num, create_time
        else:
            return None, None, None
    except Exception as e:
        log.error(f"Error fetching shop {shop_id}: {e}")
        return None, None, None


def parse_shop_items(log, items, sql_pool):
    """
    Extract shop records from a page of product items and bulk-insert them.

    For every item, the merchant detail endpoint is queried for extra fields;
    the combined rows are inserted (with duplicates ignored) into
    ``yueka_shop_record``.

    :param log: logger object
    :param items: list of product dicts from get_all_sold_one_page
    :param sql_pool: MySQL connection pool
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return
    info_list = []
    for item in items:
        shop_id = item.get("merchantCode")
        fans_num, group_num, create_time = get_shop_detail(log, shop_id)
        shop_name = item.get("merchantName")
        shop_info_dict = {
            "shop_id": shop_id,
            "shop_name": shop_name,
            "fans_num": fans_num,
            "group_num": group_num,
            "create_time": create_time
        }
        info_list.append(shop_info_dict)
    # Bulk insert; ignore=True skips rows that violate a unique key.
    if info_list:
        sql_pool.insert_many(table="yueka_shop_record", data_list=info_list, ignore=True)


def get_product(log, items, sql_pool, last_product_id):
    """
    Record product ids from a page and decide whether pagination should stop.

    Stops when either the previously recorded last product id is seen again
    (we have caught up with the prior run) or the page is short (< 20 items,
    i.e. the final page).

    :param log: logger object
    :param items: list of product dicts from get_all_sold_one_page
    :param sql_pool: MySQL connection pool
    :param last_product_id: last product id from the previous run, or None to
        crawl from scratch
    :return: True when the caller should stop paginating
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return True
    should_stop = False
    info_list = []
    for item in items:
        product_id = item.get("code")
        if not product_id:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
            continue
        info_list.append(
            {
                "product_id": product_id,
            }
        )
        # Stop paging once we reach yesterday's last id; only checked when
        # last_product_id is not None (i.e. a previous run exists).
        if last_product_id and product_id == last_product_id:
            log.info(
                f"----------------- The product_id {product_id} is the last product_id:{last_product_id}, stop fetching -----------------")
            should_stop = True
    # Bulk insert; ignore=True skips rows that violate a unique key.
    if info_list:
        sql_pool.insert_many(table="yueka_product_record", data_list=info_list, ignore=True)
    # Fewer than 20 items means this was the final page.
    if len(items) < 20:
        log.info(
            f"----------------- {len(items)} items found, less than 20, stop fetching -----------------")
        should_stop = True
    return should_stop


def get_all_sold_data(log, sql_pool, last_product_id):
    """
    Crawl all pages of sold data, recording shops and product ids.

    Pagination is cursor-based: each request carries the ``id`` and
    ``unitPriceStr`` of the previous page's last item.

    :param sql_pool: MySQL connection pool
    :param log: logger object
    :param last_product_id: last product id from the previous run; None means
        crawl from scratch
    """
    page = 1
    max_page = 500  # hard safety cap on pagination
    last_id = 0
    lastSalePrice = ''
    total_page = 0
    total = 0
    while page <= max_page:
        items, total_page, total = get_all_sold_one_page(log, page, last_id, lastSalePrice)
        if not items:
            break
        parse_shop_items(log, items, sql_pool)
        stop_page = get_product(log, items, sql_pool, last_product_id)
        if stop_page:
            log.info(
                f"----------------- The product_id {last_product_id} is the last product_id, stop fetching -----------------")
            break
        # Advance the cursors to the last item of this page. Use `is None`
        # (not truthiness): id 0 and a falsy price string are legitimate
        # cursor values — the very first request itself sends lastId=0.
        last_id = items[-1].get("id")
        lastSalePrice = items[-1].get("unitPriceStr")
        if last_id is None:
            log.error("API response missing id in last item, cannot paginate")
            break
        if lastSalePrice is None:
            log.error("API response missing lastSalePrice in last item, cannot paginate")
            break
        page += 1
    log.info(f"Finished fetching all data. Total pages: {total_page}, total items: {total}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def yueka_main(log):
    """
    Entry point: run the full yueka crawl pipeline.

    Steps: check DB pool health, load the API token, resume the sold-data
    crawl from the last recorded product id, then fetch product details,
    players and reports. Each step is isolated so one failure does not stop
    the others; an unhealthy pool raises and is retried hourly by the
    decorator.

    :param log: logger object
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        # Raised outside the try below so the @retry decorator sees it.
        raise RuntimeError("数据库连接池异常")
    try:
        token_row = sql_pool.select_one("SELECT token FROM yueka_token")
        if not token_row:
            # Previously this surfaced as an opaque TypeError on token[0].
            raise RuntimeError("yueka_token table returned no row")
        token = token_row[0]
        try:
            # Resume point: last recorded product id, or None when the table
            # is empty (crawl from scratch).
            last_product_id_result = sql_pool.select_one(
                "SELECT product_id FROM yueka_product_record ORDER BY finish_time DESC LIMIT 1")
            last_product_id = last_product_id_result[0] if last_product_id_result else None
            get_all_sold_data(log, sql_pool, last_product_id)
        except Exception as e:
            log.error(f"Error fetching last_product_id: {e}")
        time.sleep(5)
        try:
            get_product_detail_list(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching product_detail_list: {e}")
        time.sleep(5)
        try:
            get_players(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching players: {e}")
        time.sleep(5)
        try:
            get_reports(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching reports: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    yueka_main(logger)