| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/3/24 14:02
- import json
- import random
- import time
- from mysql_pool import MySQLConnectionPool
- from settings import *
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_all_sold_one_page(log, page: int, last_id, lastSalePrice):
    """
    Fetch one page of the sold-product listing.

    The function is wrapped in ``retry`` configured with
    ``stop_after_attempt(5)`` and ``wait_fixed(1)``, so exceptions re-raised
    here are handed to that decorator.

    :param log: logger object
    :param page: page number to request
    :param last_id: value sent as ``lastId`` in the request payload
    :param lastSalePrice: value sent as ``lastSalePrice`` in the request payload
    :return: tuple ``(items, total_page, total)`` read from the ``data`` object
        of the response, or ``([], 0, 0)`` when ``make_request`` returns a
        falsy value
    """
    log.info(f"Starting < get_all_sold_one_page > to fetch page {page}")
    url = "https://api.luckycards.com.cn/api/front/c/product/productShowList"
    # Keyword form keeps the same key order as the request always used.
    payload = dict(
        lastId=last_id,
        lastSalePrice=lastSalePrice,
        limit=20,
        openMode="",
        page=page,
        saleStatus="2",
        sort="0",
    )
    try:
        response = make_request(log, 'POST', url, data=payload)
        if not response:
            return [], 0, 0
        body = response["data"]
        items = body["list"]
        total_page = body["totalPage"]
        total = body["total"]
        log.info(f"Successfully fetched page {page}: {len(items)} items")
        return items, total_page, total
    except requests.exceptions.RequestException as e:
        # Log, then let the retry decorator decide whether to try again.
        log.error(f"Error fetching page {page}: {e}")
        raise
    except ValueError as e:
        log.error(f"Error parsing JSON for page {page}: {e}")
        raise
def get_shop_detail(log, shop_id):
    """
    Fetch the detail record of one shop.

    This is best-effort: any exception is logged and turned into an
    all-``None`` result instead of propagating.

    :param log: logger object
    :param shop_id: shop identifier interpolated into the request URL
    :return: tuple ``(fans_num, group_num, create_time)`` taken from the
        ``fansNum``, ``salesQuantity`` and ``createTime`` fields of the
        response's ``data`` object; ``(None, None, None)`` when the response
        is falsy or an exception is raised
    """
    nothing = (None, None, None)
    log.info(f"Start fetching shop {shop_id}")
    url = f"https://api.luckycards.com.cn/api/front/c/merchant/{shop_id}"
    try:
        response = make_request(log, 'GET', url)
        if not response:
            return nothing
        detail = response["data"]
        wanted = ("fansNum", "salesQuantity", "createTime")
        result = tuple(detail.get(field) for field in wanted)
        log.info(f"Successfully fetched shop {shop_id}")
        return result
    except Exception as e:
        log.error(f"Error fetching shop {shop_id}: {e}")
        return nothing
def parse_shop_items(log, items, sql_pool, sql_shop_list):
    """
    Record every shop referenced by ``items`` that is not stored yet.

    For each item the shop id is read from ``merchantCode``. Shops already
    listed in ``sql_shop_list`` are skipped; for new ones the shop detail is
    fetched via ``get_shop_detail`` and a row is inserted into
    ``leka_shop_record``.

    :param log: logger object
    :param items: iterable of item dicts from one listing page; a falsy value
        only produces a warning
    :param sql_pool: object providing ``insert_one_or_dict(table, row_dict)``
    :param sql_shop_list: list of already known shop ids; newly inserted ids
        are appended to it so later items and pages skip them
    :return: None
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return

    for item in items:
        shop_id = item.get("merchantCode")
        if not shop_id:
            # Same guard get_product applies to its id: without a shop id the
            # detail URL would be ".../merchant/None" and the inserted row
            # would carry an empty key.
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No shop_id found")
            continue

        if shop_id in sql_shop_list:
            log.debug(
                f"----------------- The shop_id {shop_id} is already in the database, Not need save -----------------")
            continue

        # Only unknown shops cost a detail request.
        fans_num, group_num, create_time = get_shop_detail(log, shop_id)
        shop_info_dict = {
            "shop_id": shop_id,
            "shop_name": item.get("merchantName"),
            "fans_num": fans_num,
            "group_num": group_num,
            "create_time": create_time,
        }
        sql_pool.insert_one_or_dict("leka_shop_record", shop_info_dict)
        sql_shop_list.append(shop_id)
def get_product(log, items, sql_pool, last_product_id, sql_product_id_list):
    """
    Store the unseen product ids of one listing page and report whether
    paging should go on.

    Each item's id is read from ``code``. Ids not present in
    ``sql_product_id_list`` are inserted into ``leka_product_record`` and
    appended to that list. The whole page is always processed, even after
    ``last_product_id`` has been met.

    :param log: logger object
    :param items: iterable of item dicts from one listing page
    :param sql_pool: object providing ``insert_one_or_dict(table, row_dict)``
    :param last_product_id: id that marks where the crawl should stop
    :param sql_product_id_list: list of already known product ids (mutated)
    :return: ``True`` to keep paging, ``False`` once ``last_product_id`` was
        seen on this page, ``None`` when ``items`` is falsy
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return None

    keep_paging = True
    for entry in items:
        product_id = entry.get("code")
        if not product_id:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
            continue

        if product_id not in sql_product_id_list:
            sql_pool.insert_one_or_dict("leka_product_record", {"product_id": product_id})
            sql_product_id_list.append(product_id)
        else:
            log.debug(
                f"----------------- The product_id {product_id} is already in the database, Not need save -----------------")

        # Reaching the stop marker ends paging after this page is finished.
        if product_id == last_product_id:
            log.info(
                f"----------------- The product_id {product_id} is the last product_id:{last_product_id}, stop fetching -----------------")
            keep_paging = False

    return keep_paging
def get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list, max_page: int = 200):
    """
    Walk the sold-product listing page by page and store new shops/products.

    Paging stops when a page comes back empty, when ``get_product`` reports
    that ``last_product_id`` was reached, when the last item of a page lacks
    the ``id`` or ``unitPriceStr`` value needed to request the next page, or
    after ``max_page`` pages.

    :param log: logger object
    :param sql_pool: MySQL connection pool object passed on to the helpers
    :param last_product_id: product id at which the crawl should stop
    :param sql_shop_list: list of already stored shop ids (mutated by helpers)
    :param sql_product_id_list: list of already stored product ids (mutated
        by helpers)
    :param max_page: upper bound on the number of pages fetched in one run;
        defaults to the previously hard-coded 200
    :return: None
    """
    page = 1
    last_id = 0
    lastSalePrice = ''
    # Pre-set so the summary log below is valid even if no page is requested.
    total_page = 0
    total = 0

    while page <= max_page:
        items, total_page, total = get_all_sold_one_page(log, page, last_id, lastSalePrice)
        if not items:
            break

        parse_shop_items(log, items, sql_pool, sql_shop_list)
        keep_paging = get_product(log, items, sql_pool, last_product_id, sql_product_id_list)
        if not keep_paging:
            break

        # The next request is keyed on the last item of the current page.
        last_item = items[-1]
        last_id = last_item.get("id")
        lastSalePrice = last_item.get("unitPriceStr")
        if not last_id:
            log.error("API response missing id in last item, cannot paginate")
            break
        if not lastSalePrice:
            log.error("API response missing unitPriceStr in last item, cannot paginate")
            break

        page += 1

    log.info(f"Finished fetching all data. Total pages: {total_page}, total items: {total}")
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def leka_main(log):
    """
    Entry point of one crawl round.

    Builds the MySQL connection pool, loads the ids already stored in
    ``leka_shop_record`` / ``leka_product_record`` and runs
    ``get_all_sold_data``. Exceptions raised after the pool health check are
    logged and swallowed, so the surrounding ``retry`` decorator only sees
    the ``RuntimeError`` raised for an unhealthy pool.

    :param log: logger object
    :raises RuntimeError: when ``check_pool_health()`` reports a problem
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        # The token is only needed by the detail / player / report stages,
        # which are currently disabled, so a missing row must not abort the
        # listing crawl.
        # NOTE(review): assumes select_one returns a falsy value when no row
        # matches — confirm against MySQLConnectionPool.
        token_row = sql_pool.select_one("SELECT token FROM leka_token")
        token = token_row[0] if token_row else None
        if token is None:
            log.warning("leka_token returned no row; continuing without a token")

        try:
            # Most recently finished product: the crawl stops when it is met.
            # On an empty table (first run) there is no stop marker and the
            # crawl is limited by the page cap in get_all_sold_data instead.
            last_row = sql_pool.select_one(
                "SELECT product_id FROM leka_product_record ORDER BY finish_time DESC LIMIT 1")
            last_product_id = last_row[0] if last_row else None

            # Ids that are already stored, used to skip duplicates.
            shop_rows = sql_pool.select_all("SELECT shop_id FROM leka_shop_record") or []
            sql_shop_list = [row[0] for row in shop_rows]

            product_rows = sql_pool.select_all("SELECT product_id FROM leka_product_record") or []
            sql_product_id_list = [row[0] for row in product_rows]

            get_all_sold_data(log, sql_pool, last_product_id, sql_shop_list, sql_product_id_list)

            sql_shop_list.clear()
            sql_product_id_list.clear()
        except Exception as e:
            log.error(f"Error fetching last_product_id: {e}")

        # The follow-up stages (product details, players, reports) are
        # currently disabled.
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
if __name__ == "__main__":
    # Script entry point: run one crawl round with the `logger` object.
    # NOTE(review): `logger` is not defined in the visible part of this file;
    # presumably it comes from `from settings import *` — confirm.
    leka_main(logger)
    # Debug aid: get_all_sold_one_page(logger, 1, 0, '') fetches only page 1.
|