| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/12/22 10:44
- import time
- from mysql_pool import MySQLConnectionPool
- from settings import *
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_all_sold_one_page(log, page: int, last_id, lastSalePrice):
    """
    Fetch one page of listings from the product show-list endpoint.

    :param log: logger object
    :param page: page number to request
    :param last_id: value sent as ``lastId`` in the request body
    :param lastSalePrice: value sent as ``lastSalePrice`` in the request body
    :return: ``(items, total_page, total)`` taken from the response's
        ``data`` object, or ``([], 0, 0)`` when ``make_request`` returns a
        falsy response
    """
    log.info(f"Starting < get_all_sold_one_page > to fetch page {page}")
    url = "https://api.joycard.xyz/api/front/c/product/productShowList"

    # NOTE(review): saleStatus "2" is presumably the "sold" filter (the
    # function is documented as fetching sold data) -- confirm against the API.
    payload = dict(
        lastId=last_id,
        lastSalePrice=lastSalePrice,
        limit=20,
        openMode="",
        page=page,
        saleStatus="2",
        sort="0",
    )

    try:
        response = make_request(log, 'POST', url, data=payload)
        if not response:
            return [], 0, 0

        body = response["data"]
        items = body["list"]
        total_page = body["totalPage"]
        total = body["total"]
        log.info(f"Successfully fetched page {page}: {len(items)} items")
        return items, total_page, total
    except requests.exceptions.RequestException as e:
        # Log, then let the exception propagate so the retry decorator sees it.
        log.error(f"Error fetching page {page}: {e}")
        raise
    except ValueError as e:
        log.error(f"Error parsing JSON for page {page}: {e}")
        raise
def get_shop_detail(log, shop_id):
    """
    Fetch detail fields for one merchant.

    :param log: logger object
    :param shop_id: merchant identifier interpolated into the request URL
    :return: ``(fans_num, group_num, create_time)`` read from the response's
        ``fansNum``, ``salesQuantity`` and ``createTime`` fields, or
        ``(None, None, None)`` when the response is falsy or any error occurs
    """
    log.info(f"Start fetching shop {shop_id}")
    url = f"https://api.joycard.xyz/api/front/c/merchant/{shop_id}"
    nothing = (None, None, None)

    try:
        response = make_request(log, 'GET', url)
        if not response:
            return nothing

        item = response["data"]
        detail = (
            item.get("fansNum"),
            item.get("salesQuantity"),
            item.get("createTime"),
        )
        log.info(f"Successfully fetched shop {shop_id}")
        return detail
    except Exception as e:
        # Best effort: a failed lookup is logged and reported as "no data"
        # instead of aborting the caller.
        log.error(f"Error fetching shop {shop_id}: {e}")
        return nothing
def parse_shop_items(log, items, sql_pool):
    """
    Build one shop record per listing and insert them into ``yueka_shop_record``.

    Each listing's ``merchantCode`` is resolved through ``get_shop_detail``.
    Listings on the same page often share a merchant, so the detail lookup is
    cached per merchant for the duration of this call instead of repeating
    the same request. Listings without a ``merchantCode`` are skipped with a
    warning rather than being looked up and stored with an empty id.

    :param log: logger object
    :param items: list of listing dicts (``merchantCode`` / ``merchantName``)
    :param sql_pool: object exposing ``insert_many(table=, data_list=, ignore=)``
    :return: None
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return

    detail_cache = {}  # shop_id -> (fans_num, group_num, create_time)
    info_list = []
    for item in items:
        shop_id = item.get("merchantCode")
        if not shop_id:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No shop_id found")
            continue

        if shop_id not in detail_cache:
            detail_cache[shop_id] = get_shop_detail(log, shop_id)
        fans_num, group_num, create_time = detail_cache[shop_id]

        # One row per listing is kept (as before); only the lookups are shared.
        info_list.append(
            {
                "shop_id": shop_id,
                "shop_name": item.get("merchantName"),
                "fans_num": fans_num,
                "group_num": group_num,
                "create_time": create_time,
            }
        )

    # Insert the collected rows in a single batch.
    if info_list:
        sql_pool.insert_many(table="yueka_shop_record", data_list=info_list, ignore=True)
def get_product(log, items, sql_pool, last_product_id, page_size=20):
    """
    Store the product ids of one page and decide whether paging should stop.

    Every item's ``code`` is inserted into ``yueka_product_record`` as
    ``product_id``; items without a ``code`` are skipped with a warning.

    :param log: logger object
    :param items: list of listing dicts for the current page
    :param sql_pool: object exposing ``insert_many(table=, data_list=, ignore=)``
    :param last_product_id: product id that marks where to stop;
        a falsy value disables that check
    :param page_size: number of items a full page contains; a page shorter
        than this is treated as the last page (defaults to 20)
    :return: True when paging should stop (empty page, ``last_product_id``
        seen, or short page), otherwise False
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return True

    should_stop = False
    info_list = []
    for item in items:
        product_id = item.get("code")
        if not product_id:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
            continue
        info_list.append({"product_id": product_id})

        # Stop paging once the marker id is seen; only checked when a marker
        # was supplied. The rest of this page is still recorded.
        if last_product_id and product_id == last_product_id:
            log.info(
                f"----------------- The product_id {product_id} is the last product_id:{last_product_id}, stop fetching -----------------")
            should_stop = True

    # Insert the collected rows in a single batch.
    if info_list:
        sql_pool.insert_many(table="yueka_product_record", data_list=info_list, ignore=True)

    # A page shorter than a full page means the last page has been reached.
    if len(items) < page_size:
        log.info(
            f"----------------- {len(items)} items found, less than {page_size}, stop fetching -----------------")
        should_stop = True

    return should_stop
def get_all_sold_data(log, sql_pool, last_product_id, max_page=500):
    """
    Page through the listing endpoint and store shops and product ids.

    For every page, shop records are written via ``parse_shop_items`` and
    product ids via ``get_product``. Paging stops when a page is empty, when
    ``get_product`` reports a stop condition, when the last item lacks the
    cursor fields needed for the next request, or after ``max_page`` pages.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param last_product_id: product id at which to stop; None means crawl
        from the beginning
    :param max_page: upper bound on the number of pages requested
        (defaults to 500)
    :return: None
    """
    page = 1
    last_id = 0
    last_sale_price = ''
    # Initialised so the summary log is valid even if no page is requested.
    total_page = total = 0

    while page <= max_page:
        items, total_page, total = get_all_sold_one_page(log, page, last_id, last_sale_price)
        if not items:
            break

        # Persist the shops and product ids found on this page.
        parse_shop_items(log, items, sql_pool)

        if get_product(log, items, sql_pool, last_product_id):
            # get_product has already logged the precise reason
            # (marker id reached or short page).
            log.info(
                f"----------------- Stop condition reached on page {page}, stop fetching -----------------")
            break

        # The next request is keyed on the last item of the current page.
        last_item = items[-1]
        last_id = last_item.get("id")
        last_sale_price = last_item.get("unitPriceStr")
        if not last_id:
            log.error("API response missing id in last item, cannot paginate")
            break
        if not last_sale_price:
            log.error("API response missing unitPriceStr in last item, cannot paginate")
            break

        page += 1

    log.info(f"Finished fetching all data. Total pages: {total_page}, total items: {total}")
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def yueka_main(log):
    """
    Entry point: run the listing crawl, then the detail, player and report stages.

    Each stage runs in its own try/except so one failing stage is logged and
    the following stages still run. Any other error inside the crawl is
    logged by the outer handler. Only the pool health check raises out of
    this function, which is what the retry decorator can react to.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool and refuse to start on an unhealthy pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token_row = sql_pool.select_one("SELECT token FROM yueka_token")
        if not token_row:
            # Explicit failure instead of an opaque "'NoneType' object is not
            # subscriptable"; logged by the outer handler below.
            raise ValueError("no token found in table yueka_token")
        token = token_row[0]

        try:
            # Most recently finished product id; None when the table is empty.
            last_product_id_result = sql_pool.select_one(
                "SELECT product_id FROM yueka_product_record ORDER BY finish_time DESC LIMIT 1")
            last_product_id = last_product_id_result[0] if last_product_id_result else None

            get_all_sold_data(log, sql_pool, last_product_id)
        except Exception as e:
            log.error(f"Error fetching last_product_id: {e}")
        # NOTE(review): the pause runs whether or not the stage succeeded --
        # confirm this is the intended pacing between stages.
        time.sleep(5)

        # Product details
        try:
            get_product_detail_list(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching product_detail_list: {e}")
        time.sleep(5)

        # Product players
        try:
            get_players(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching players: {e}")
        time.sleep(5)

        # Card-opening reports
        try:
            get_reports(log, sql_pool, token)
        except Exception as e:
            log.error(f"Error fetching reports: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
if __name__ == "__main__":
    # Script entry point: run the crawler with the `logger` object in scope.
    yueka_main(logger)
|