| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/6/17 15:56
- import datetime
- import inspect
- import time
- import schedule
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from DrissionPage import ChromiumPage, ChromiumOptions
- from tenacity import retry, stop_after_attempt, wait_fixed
# Drop the handlers loguru has registered so far, then log to a dated file
# under ./logs instead. Rotation/retention values are passed straight to
# loguru ("00:00" and "7 day"); see loguru's `logger.add` documentation for
# their exact semantics.
logger.remove()
logger.add(
    "./logs/shopee_{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    rotation="00:00",
    retention="7 day",
    encoding='utf-8',
)
def after_log(retry_state):
    """
    Tenacity ``after`` callback that reports the outcome of one attempt.

    The logger is resolved in this order:

    1. the first positional argument of the retried call,
    2. the ``log`` keyword argument of the retried call,
    3. the module-level ``logger``.

    A candidate is only accepted when it exposes callable ``warning`` and
    ``info`` attributes, so a retried function whose first argument is not a
    logger (a URL, ``self``, ...) no longer makes the callback itself fail.

    :param retry_state: tenacity ``RetryCallState`` of the current call
    """
    positional = tuple(retry_state.args or ())
    keyword = getattr(retry_state, "kwargs", None) or {}
    candidates = [*positional[:1], keyword.get("log")]

    log = next(
        (
            candidate
            for candidate in candidates
            if callable(getattr(candidate, "warning", None))
            and callable(getattr(candidate, "info", None))
        ),
        None,
    )
    if log is None:
        log = logger  # fall back to the global logger

    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_response(log, page_url):
    """
    Open ``page_url`` in Chromium and capture the shop item-list API response.

    The browser is attached on local port 9130 with a fixed user-data
    directory. Requests whose URL contains ``/api/v4/shop/rcmd_items`` are
    listened for; the first POST packet whose request body has
    ``sort_type == 2`` wins and its response body is returned.

    :param log: logger used for progress and error messages
    :param page_url: shop page URL to open
    :return: body of the matching response, as exposed by DrissionPage's
        ``response.body`` (presumably a parsed dict for JSON responses;
        verify against the DrissionPage listener documentation)
    :raises RuntimeError: when none of the 10 listened packets matches. The
        ``@retry`` decorator turns this into up to 5 attempts; how the final
        failure surfaces to the caller is defined by tenacity.
    """
    options = ChromiumOptions().set_paths(local_port=9130, user_data_path=r'D:\Drissionpage_temp\local_port_9130')
    options.set_argument("--disable-gpu")
    options.set_argument("-accept-lang=en-US")
    options.no_imgs(True)
    page = ChromiumPage(options)

    # The listener must be running before navigation, otherwise the request
    # fired during page load is missed. The page is therefore loaded once,
    # after `listen.start`, instead of once before and once after it.
    page.listen.start('/api/v4/shop/rcmd_items')
    try:
        page.get(page_url)

        for _ in range(10):  # inspect at most 10 packets
            try:
                res = page.listen.wait(timeout=10)
                # A falsy result means nothing arrived within the timeout.
                if not res or res.method != 'POST':
                    continue

                post_data = res.request.postData
                if not post_data:
                    log.debug("请求体为空, 重新请求..........")
                    continue

                # A body that is not a dict cannot carry the expected
                # `sort_type` field, so it is treated as a parameter mismatch.
                if isinstance(post_data, dict) and post_data.get('sort_type') == 2:
                    log.debug("请求参数正确, 获取响应..........")
                    return res.response.body

                log.debug("请求参数错误, 重新请求..........")
            except Exception as e:
                # Best effort: one bad packet must not abort the whole wait
                # loop. Report through the injected logger, not the global.
                log.error(f"等待请求超时或发生错误: {e}")
    finally:
        # Always release the listener; the browser itself is left running.
        page.listen.stop()

    # Fail loudly so the retry decorator can try again instead of handing
    # `None` to the caller.
    raise RuntimeError(f"get_response error: no matching rcmd_items response for {page_url}")
def _parse_item_card(item_card):
    """Flatten one raw item card from the API into a flat record dict."""
    # `or {}` / `or []` also covers keys that are present with a null value.
    asset = item_card.get('item_card_displayed_asset') or {}
    sold = item_card.get('item_card_display_sold_count') or {}
    price_info = item_card.get('item_card_display_price') or {}

    # Image ids joined with "|"; per the original note they are meant to be
    # appended to https://down-my.img.susercontent.com/file/
    images = "|".join(asset.get('images') or [])

    ctime = item_card.get('ctime')
    ctime_format = (
        datetime.datetime.fromtimestamp(ctime).strftime('%Y-%m-%d %H:%M:%S')
        if ctime else None
    )

    # The raw price is divided by 100000 and rendered with two decimals.
    # `is not None` keeps a price of 0 formatted like every other price.
    price = price_info.get('price')
    if price is not None:
        price = f"{float(price) / 100000:.2f}"

    rating_text = (asset.get('rating') or {}).get('rating_text')
    if rating_text:
        rating_text = rating_text.replace(' 商店评价', '')

    return {
        "item_id": item_card.get('itemid'),
        "shop_id": item_card.get('shopid'),
        "cat_id": item_card.get('catid'),
        "title": asset.get('name'),
        "images": images,
        "liked_count": item_card.get('liked_count'),
        "status": item_card.get('status'),
        "ctime": ctime_format,
        "item_status": item_card.get('item_status'),
        "price": price,
        "historical_sold_count": sold.get('historical_sold_count'),
        "monthly_sold_count": sold.get('monthly_sold_count'),
        "rating_text": rating_text,
    }


def parse_data(log, resp_json, sql_pool):
    """
    Parse the shop item-list response into flat item records.

    The detail-page link of an item is built from its title, shop id and item
    id, e.g. ``https://my.xiapibuy.com/<title-with-dashes>-i.<shop_id>.<item_id>``.

    :param log: logger used for error messages
    :param resp_json: decoded API response (dict); anything else is rejected
    :param sql_pool: database pool; currently unused, records are only printed
    :return: list of record dicts, one per item card; empty when the response
        is missing, malformed, or reports an error
    """
    if not isinstance(resp_json, dict):
        log.error(f"接口响应为空或格式错误: {type(resp_json).__name__}")
        return []

    res_error = resp_json.get('error')
    if res_error != 0:
        log.error(f"接口返回错误: {res_error}, error_msg:{resp_json.get('error_msg')}")
        return []

    data = resp_json.get('data') or {}
    item_cards = (data.get('centralize_item_card') or {}).get('item_cards') or []

    records = []
    for item_card in item_cards:
        info_dict = _parse_item_card(item_card)
        print(info_dict)
        records.append(info_dict)
    return records
def shopee_main(log, page=1):
    """
    Fetch one listing page of the shop and parse its items.

    :param log: logger passed through to the fetch and parse steps
    :param page: value of the ``page`` query parameter; defaults to 1, the
        value that was previously hard-coded
    """
    page_url = f'https://my.xiapibuy.com/popmartofficial.my?page={page}&sortBy=ctime&tab=0'
    resp_dict = get_response(log, page_url)
    print(resp_dict)

    # Parsing needs a decoded dict; skip anything else (no matching response,
    # or a body that was not decoded) instead of crashing on it.
    if not isinstance(resp_dict, dict):
        log.error(f"未获取到有效的接口响应, 跳过解析: {page_url}")
        return

    parse_data(log, resp_dict, None)
# Script entry point: run a single crawl using the module-level logger.
if __name__ == "__main__":
    shopee_main(logger)
|