| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/8/12 16:59
- import random
- import time
- import inspect
- import requests
- import schedule
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
- # logger.remove()
- # logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="7 day")
- category = "潮玩"  # category label (roughly "trendy toys"); not referenced by any function in the visible code
- max_page = 50  # upper bound on the listing pages fetched per run; read by get_mall_sold_list
def after_log(retry_state):
    """
    Tenacity ``after`` callback: report the outcome of the attempt that just ran.

    The logger is taken from the wrapped call itself so the retry report
    goes to the same sink as the function's own output. Lookup order:

    1. the ``log`` keyword argument of the call (this is how the scheduler
       invokes the decorated function in this file);
    2. the first positional argument of the call;
    3. the module-level ``logger``.

    :param retry_state: tenacity ``RetryCallState`` of the wrapped call
    """
    call_kwargs = getattr(retry_state, "kwargs", None) or {}
    log = call_kwargs.get("log")
    if log is None and retry_state.args:
        log = retry_state.args[0]
    if log is None:
        # Nothing usable was passed to the call; fall back to the global logger.
        log = logger

    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
def get_single_page(log, page_no, page_size=10, timeout=30):
    """
    Fetch one page of the toy listing from the hoopi API.

    :param log: logger used for diagnostics
    :param page_no: page number sent to the API as ``pageNo``
    :param page_size: number of items requested per page (``pageSize``)
    :param timeout: seconds to wait for the server before giving up
    :return: the ``result`` member of the response body when the API reports
        success, otherwise ``None``
    :raises requests.HTTPError: when the server answers with a 4xx/5xx status
    """
    # NOTE(review): an access token used to be kept here as a commented-out
    # header. If the endpoint ever needs "x-access-token", load it from
    # configuration instead of committing it to the source.
    headers = {
        "User-Agent": "okhttp/4.10.0",
        "Accept-Encoding": "gzip",
        # The original dict set this header twice (differing only in case);
        # header names are case-insensitive, so only the later value was sent.
        "Content-Type": "application/json; charset=UTF-8",
        "country": "1",
        "lang": "zh",
        "platform": "Android",
    }
    url = "https://cp.hoopi.xyz/hoopiserver/hoopi/api/toy/listToys"
    payload = {
        "sortType": "2",
        "containType": ["0"],
        "pageNo": page_no,
        "pageSize": page_size,
    }

    # Without a timeout a stalled connection would block the crawl forever.
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    log.debug(f"listToys page {page_no} response: {response.text[:500]}")
    response.raise_for_status()

    # raise_for_status() only rejects 4xx/5xx; any other non-200 is treated
    # as "no data" rather than parsed.
    if response.status_code != 200:
        log.warning(f" {inspect.currentframe().f_code.co_name} Request failed with status code: {response.status_code}")
        return None

    result = response.json()
    if result.get("success"):
        return result.get("result")

    log.warning(f"result_message: {result.get('message')}")
    return None
def parse_list_items(log, items, sql_pool):
    """
    Store the ids of one page of listing items in ``hoopi_mall_record``.

    Entries that carry no ``id`` are skipped so that no ``None`` ends up in
    the insert batch.

    :param log: logger used for diagnostics
    :param items: item dicts from the listing API; may be empty or ``None``
    :param sql_pool: object exposing
        ``insert_many(table=..., data_list=..., ignore=...)``
    """
    func_name = inspect.currentframe().f_code.co_name
    log.info(f"{func_name} Start parsing items")

    if not items:
        log.warning(f" {func_name} No items found")
        return

    candidate_ids = (item.get('id') for item in items)
    item_ids = [item_id for item_id in candidate_ids if item_id is not None]
    if not item_ids:
        log.warning(f" {func_name} No item ids found")
        return

    # NOTE(review): data_list is a plain list of ids, as in the original code.
    # Confirm that this is the row format insert_many expects.
    sql_pool.insert_many(table="hoopi_mall_record", data_list=item_ids, ignore=True)
def get_mall_sold_list(log, sql_pool):
    """
    Walk the listing pages and hand every page of items to parse_list_items.

    Paging stops at the first of these conditions:

    * ``max_page`` pages have been requested;
    * a page could not be fetched or contains no items;
    * the API's ``pages`` value has been reached;
    * a page is shorter than the page size (last page);
    * the number of items seen reaches the API's ``total`` value.

    A failure while storing one page is logged and does not stop the crawl.

    :param log: logger used for diagnostics
    :param sql_pool: database pool passed through to parse_list_items
    """
    # Page size that get_single_page requests by default; a shorter page
    # means the listing is exhausted.
    page_size = 10
    page = 1
    total_items = 0

    while page <= max_page:
        result = get_single_page(log, page)
        if result is None:
            break

        items = result.get("list", [])
        if not items:
            # Messages are built with f-strings: the loguru logger used by
            # this file does not interpolate %-style arguments.
            log.warning(f"No items found on page {page}")
            break

        try:
            parse_list_items(log, items, sql_pool)
        except Exception as e:
            log.error(f"Error parsing items on page {page}: {e}")
        total_items += len(items)

        pages = result.get("pages")
        total = result.get("total")

        # Stop condition 1: the reported page count has been reached.
        if pages is not None and page >= pages:
            log.debug(f"已爬取 {page} 页,共 {pages} 页")
            break

        # Stop condition 2: a short page is the last page.
        if len(items) < page_size:
            log.debug(f"已获取数据量小于{page_size},停止爬取......................")
            break

        # Stop condition 3: everything the API reports has been seen.
        if total is not None and total_items >= total:
            log.debug("已获取数据量已满足要求,停止爬取......................")
            break

        page += 1
def parse_detail(log, item, sql_pool, item_id):
    """
    Map a goods-detail payload onto snake_case record keys.

    Keys missing from ``item`` yield ``None`` values. ``sql_pool`` and
    ``item_id`` are accepted but not used yet (see the TODO below).

    :param log: logger used for diagnostics
    :param item: ``result`` dict of the goods-detail response
    :param sql_pool: database pool (currently unused)
    :param item_id: id of the goods record (currently unused)
    :return: the mapped record, or ``None`` when ``item`` could not be read
    """
    log.debug("开始解析详情页数据........................")

    # (record key, key in the API payload)
    field_map = (
        ('title', 'name'),
        ('shop_id', 'shopId'),
        ('shop_name', 'shopName'),
        ('shop_app_user_id', 'shopAppUserId'),
        ('info_imgs', 'infoImgs'),                    # detail images, comma separated
        ('card_type_name', 'cardTypeName'),           # card type
        ('explain_introduce', 'explainIntroduce'),    # description
        ('price', 'price'),
        ('freight_price', 'freightPrice'),            # shipping fee
        ('currency', 'currency'),
        ('sold_count', 'soldCount'),
        ('sell_off_count', 'sellOffCount'),
        ('status', 'status'),                         # 2: sold out
        ('finish_time', 'finishTime'),
        ('condition_type_name', 'conditionTypeName'),  # grading / condition
        ('country_name', 'countryName'),
        ('shop_sold_count', 'shopSoldCount'),         # items sold by the shop
    )

    try:
        data_dict = {column: item.get(source_key) for column, source_key in field_map}
    except Exception as e:
        # Exceptions are not subscriptable; slice their text instead so the
        # handler itself cannot fail and hide the real error.
        log.error(f'解析详情页数据error, {str(e)[:500]}')
        return None

    log.debug(f"{data_dict}")
    # TODO: persist data_dict into hoopi_mall_record for the row matching
    # item_id once the update call on sql_pool is enabled.
    return data_dict
def get_detail(log, item_id, sql_pool, timeout=30):
    """
    Fetch the detail record of one goods item and pass it to parse_detail.

    :param log: logger used for diagnostics
    :param item_id: id of the goods item; becomes the last URL path segment
    :param sql_pool: database pool passed through to parse_detail
    :param timeout: seconds to wait for the server before giving up
    :raises requests.HTTPError: when the server answers with a 4xx/5xx status
    """
    # NOTE(review): an access token used to be kept here as a commented-out
    # header. If the endpoint ever needs "x-access-token", load it from
    # configuration instead of committing it to the source.
    headers = {
        "User-Agent": "okhttp/4.10.0",
        "Accept-Encoding": "gzip",
        "country": "1",
        "lang": "zh",
        "platform": "Android",
        "content-length": "0",
    }
    # The URL was previously pinned to a single goods id, so every call
    # returned the same record regardless of item_id.
    url = f"https://cp.hoopi.xyz/hoopiserver/hoopi/api/goods/getGoodsInfo/{item_id}"

    # Without a timeout a stalled connection would block the crawl forever.
    response = requests.post(url, headers=headers, timeout=timeout)
    log.debug(f"getGoodsInfo {item_id} response: {response.text[:500]}")
    response.raise_for_status()

    data = response.json()
    # "result" may be present but null; normalise that to an empty dict.
    result = data.get("result") or {}
    parse_detail(log, result, sql_pool, item_id)
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def hoopi_mall_main(log):
    """
    Entry point of one crawl run.

    Creates the MySQL connection pool and checks its health. An unhealthy
    pool raises ``RuntimeError``, which the ``retry`` decorator answers by
    calling the function again: up to 100 attempts, 3600 seconds apart, with
    ``after_log`` as the ``after`` callback.

    :param log: logger object
    :raises RuntimeError: when the connection pool fails its health check
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool and refuse to continue if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    pool_ok = sql_pool.check_pool_health()
    if not pool_ok:
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        try:
            # NOTE(review): placeholder. No crawl step is invoked here yet,
            # so a run currently only checks the pool and logs. The handler
            # below names get_shop_data_list, which does not appear in the
            # visible part of this file.
            pass
        except Exception as e:
            log.error(f"Request get_shop_data_list error: {e}")
    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')
        # A completion e-mail via EmailSender().send(...) used to be sketched
        # here; it is disabled.
def schedule_task():
    """
    Start the crawler's scheduled job and keep the scheduler running.

    Registers ``hoopi_mall_main`` to run every day at 01:06 with the
    module-level ``logger``, then polls for due jobs once per second.
    The loop has no exit, so this function does not return.
    """
    # An immediate first run (hoopi_mall_main(log=logger)) is disabled.
    daily_job = schedule.every().day.at("01:06")
    daily_job.do(hoopi_mall_main, log=logger)

    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Manual run: fetch the detail of one known goods id without a database
    # pool. The listing crawl (get_mall_sold_list(logger, None)) is disabled.
    get_detail(log=logger, item_id="1954822331293704194", sql_pool=None)
|