| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/8/4 16:08
- import random
- import time
- import requests
- import inspect
- import schedule
- from loguru import logger
- from parsel import Selector
- from tenacity import retry, stop_after_attempt, wait_fixed
- from mysql_pool import MySQLConnectionPool
- # logger.remove()
- # logger.add("./logs/id_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
- # format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
- # level="DEBUG", retention="7 day")
def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the outcome of the attempt that just ran.

    The logger is taken from the wrapped call itself so that the retry
    messages end up in the same sink as the function's own output:

    1. the first positional argument, if the call had any;
    2. otherwise the ``log`` keyword argument, if one was passed;
    3. otherwise the module-level ``logger``.

    :param retry_state: RetryCallState object describing the attempt
    """
    if retry_state.args:
        # Functions in this module take the logger as their first parameter.
        log = retry_state.args[0]
    else:
        # The call may have passed the logger by keyword (``log=...``).
        call_kwargs = getattr(retry_state, "kwargs", None) or {}
        if call_kwargs.get("log") is not None:
            log = call_kwargs["log"]
        else:
            log = logger  # fall back to the global logger

    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
def _join_thumb_urls(images):
    """Join the ``thumb_o_file`` URLs of an image list with ``|``, skipping missing entries."""
    urls = []
    for image in images or []:
        url = image.get("thumb_o_file") if isinstance(image, dict) else None
        if url:
            urls.append(url)
    return "|".join(urls)


def _html_to_text(log, product_id, detail):
    """Strip HTML tags from ``detail``; return "" when it is empty or cannot be parsed."""
    if not detail:
        return ""
    try:
        # Make sure the parser receives text, not raw bytes.
        if isinstance(detail, bytes):
            detail = detail.decode('utf-8')
        text_nodes = Selector(text=detail).xpath('//text()').getall()
        return ''.join(text_nodes).strip()
    except UnicodeDecodeError as e:
        log.error(f"商品 {product_id} 的detail字段编码错误: {e}")
    except Exception as e:
        log.error(f"商品 {product_id} 的detail字段解析失败: {e}")
    return ""


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_product_detail(log, product_id, sql_pool):
    """
    Fetch one product's detail record and insert it into ``urbox_product_record``.

    POSTs the product id to the ``breaking/detail`` endpoint, flattens the
    returned JSON into a single row and inserts it. If the response carries
    an ``error`` entry, the message is logged and nothing is inserted.

    Any exception (network error, timeout, HTTP error status, insert failure)
    propagates to the ``retry`` decorator, which makes up to 5 attempts with
    a 1 second pause between them.

    :param log: logger object
    :param product_id: id of the product to query
    :param sql_pool: pool object providing ``insert_one_or_dict``
    :return: None
    """
    log.debug(f'--------------- {inspect.currentframe().f_code.co_name}, product_id {product_id} start ---------------')
    headers = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8",
        "Connection": "keep-alive",
        "Content-Type": "application/json",
        "Referer": "https://www.urboxwin.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    }
    url = "https://jp.urboxwin.com/webapi/breaking/detail"
    payload = {"id": f"{product_id}"}

    # A timeout keeps a stalled connection from blocking the crawler forever.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    log.debug(response.status_code)
    # Check the status before parsing: an error page is not guaranteed to be JSON.
    response.raise_for_status()
    resp_json = response.json()
    log.debug(resp_json)

    error = resp_json.get("error")
    if error:
        message = error.get("message", "") if isinstance(error, dict) else error
        log.debug(f"product_id:{product_id}, message:{message}")
        return

    # ``or {}`` / ``or []`` also cover keys that are present but null.
    json_data = resp_json.get("data") or {}
    tag_names = [
        tag["name"]
        for tag in json_data.get("tags") or []
        if isinstance(tag, dict) and tag.get("name")
    ]
    cover_image = json_data.get("cover_image") or {}

    data_dict = {
        "goods_id": product_id,
        "sale_status": json_data.get("sale_status"),
        "card_number": json_data.get("number"),
        "tags_name": " ".join(tag_names),
        "title": json_data.get("title"),
        "score": json_data.get("score"),
        "available_at": json_data.get("available_at"),
        "end_at": json_data.get("end_at"),
        "buy_count": json_data.get("buy_count"),
        "total_stock": json_data.get("total_stock"),  # total stock
        "available_stock": json_data.get("available_stock"),  # stock still available
        "detail_str": _html_to_text(log, product_id, json_data.get("detail")),
        "cover_image": cover_image.get('thumb_o_file'),  # list-page image
        "top_images": _join_thumb_urls(json_data.get("top_images")),  # gallery images
        "detail_images": _join_thumb_urls(json_data.get("detail_images")),  # detail-section images
        "current_at": json_data.get("current_at"),
        "live_time": json_data.get("live_time"),
        "stock_percentage": json_data.get("stock_percentage"),  # stock percentage
        "live_link": json_data.get("live_link"),
        "buy_mode": json_data.get("buy_mode"),  # purchase mode
    }
    sql_pool.insert_one_or_dict(table="urbox_product_record", data=data_dict)
def get_player_list(log, goods_id, sql_pool):
    """
    Fetch every page of player records for one product, then mark it as done.

    Pages are requested one after another through ``get_player_single_page``
    until a page returns fewer than 20 items (the page size requested by
    ``get_player_single_page``) or page 500 has been processed. Only then is
    the product's ``state`` set to 1 in ``urbox_product_record``.

    If a page request raises, paging stops and the state is left unchanged,
    so the product is not flagged as finished when its data is incomplete.

    :param log: logger object
    :param goods_id: id of the product whose players are collected
    :param sql_pool: pool object providing ``update_one_or_dict``
    :return: None
    """
    page = 1
    failed = False
    while page <= 500:
        try:
            log.debug(
                f'--------------- {inspect.currentframe().f_code.co_name}, page {page}, goods_id {goods_id} start ---------------')
            len_items = get_player_single_page(log, goods_id, sql_pool, page)
        except Exception as e:
            log.error(
                f"{inspect.currentframe().f_code.co_name} Request get_player_single_page for page:{page}, {e}")
            failed = True
            break

        # A short page means this was the last one.
        if len_items < 20:
            log.debug(f'--------------- page {page} has {len_items} items, break ---------------')
            break

        page += 1
        # Pause between pages to avoid querying too frequently.
        time.sleep(random.uniform(0.5, 1))

    if failed:
        log.warning(f"goods_id {goods_id} player list incomplete, state not updated")
        return

    # Mark the product as processed.
    sql_pool.update_one_or_dict(table="urbox_product_record", data={"state": 1}, condition={"goods_id": goods_id})
def get_player_single_page(log, goods_id, sql_pool, page):
    """
    Fetch one page of player records for a product and store them.

    POSTs to the ``breaking/report`` endpoint asking for 20 records per page,
    converts each returned item into a row and bulk-inserts the rows into
    ``urbox_player_record`` (with ``ignore=True``). An insert failure is
    logged but not raised, so the item count is still returned.

    :param log: logger object
    :param goods_id: id of the product being queried
    :param sql_pool: pool object providing ``insert_many``
    :param page: page number to request
    :return: number of items the API returned for this page (0 when empty)
    :raises requests.RequestException: on network failure, timeout or an
        HTTP error status
    """
    headers = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8",
        # NOTE(review): hard-coded bearer token; consider loading it from
        # configuration or the environment instead of keeping it in source.
        "Authorization": "Bearer 4001|rZwBadHCeDlJTRK52IFVpvcuay2hQjYFDLMO72xo",
        "Connection": "keep-alive",
        "Content-Type": "application/json",
        "Referer": "https://www.urboxwin.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36",
    }
    url = "https://jp.urboxwin.com/webapi/breaking/report"
    payload = {
        "page": page,
        "per_page": 20,
        "id": f"{goods_id}",
    }

    # A timeout keeps a stalled connection from blocking the crawler forever.
    response = requests.post(url, headers=headers, json=payload, timeout=30)
    response.raise_for_status()
    resp_json = response.json()

    # ``or {}`` / ``or []`` also cover keys that are present but null.
    json_list = (resp_json.get("data") or {}).get("data") or []
    if not json_list:
        log.debug("没有数据")
        return 0

    info_list = []
    for item in json_list:
        user = item.get("user") or {}
        prize_image = item.get("prize_image") or {}  # prize image
        info_list.append({
            "goods_id": goods_id,
            "data_id": item.get("id"),
            "user_name": user.get("name"),
            "title": item.get("title"),
            "is_win_prize": item.get("is_win_prize"),  # whether a prize was won
            "win_prize_text": item.get("win_prize_text"),  # prize description
            "draw_finish_at": item.get("draw_finish_at"),  # draw time
            "prize_image": prize_image.get('thumb_o_file') if prize_image else "",
        })

    try:
        sql_pool.insert_many(table="urbox_player_record", data_list=info_list, ignore=True)
    except Exception as e:
        log.error(f"商品 {goods_id} 的player_record数据插入失败: {e}")
    return len(json_list)
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def urbox_id_main(log):
    """
    Main crawl routine: collect missing product details, then player lists.

    1. Builds the list of candidate goods ids: every id from 1 up to the
       largest stored ``goods_id`` plus a look-ahead of 49 that is not yet in
       ``urbox_product_record``. An empty table is treated as max id 0, so
       the first run probes ids 1-49.
    2. Calls ``get_product_detail`` for each candidate.
    3. Calls ``get_player_list`` for each product whose ``state`` is 0.

    Errors for a single product are logged and do not stop the run; any other
    error inside the crawl is logged and swallowed. Only a failed pool health
    check raises, which makes the ``retry`` decorator try again (up to 100
    attempts, 3600 seconds apart).

    :param log: logger object
    :raises RuntimeError: when the MySQL connection pool is unhealthy
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务.................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        # ---- product details ----
        log.debug('Request start for product detail.........')

        # Largest goods id already stored; MAX() is NULL on an empty table.
        sql_max = "SELECT MAX(goods_id) AS max_goods_id FROM urbox_product_record"
        max_row = sql_pool.select_one(sql_max)
        max_goods_id = (max_row[0] if max_row else None) or 0
        log.debug(f'max_goods_id: {max_goods_id}')

        # Ids already in the table; a set keeps the membership test cheap.
        stored_rows = sql_pool.select_all("select goods_id from urbox_product_record") or []
        stored_ids = {row[0] for row in stored_rows}
        goods_id_list = [i for i in range(1, max_goods_id + 50) if i not in stored_ids]
        log.debug(f'goods_id_list: {goods_id_list}')

        for goods_id in goods_id_list:
            try:
                get_product_detail(log, goods_id, sql_pool)
            except Exception as e:
                log.error(
                    f"{inspect.currentframe().f_code.co_name} Request get_product_detail for goods_id:{goods_id} error: {e}")
            # Pause between requests to avoid querying too frequently.
            time.sleep(random.uniform(0.5, 1))
        log.success('Request product detail end.................................................')

        # ---- player lists, only for products still in state 0 ----
        log.debug('Request start for player list.........')
        pending_rows = sql_pool.select_all(
            query="SELECT goods_id FROM urbox_product_record WHERE state = 0") or []
        for goods_id in [row[0] for row in pending_rows]:
            try:
                get_player_list(log, goods_id, sql_pool)
            except Exception as e:
                log.error(
                    f"{inspect.currentframe().f_code.co_name} Request get_player_list for goods_id:{goods_id} error: {e}")
        log.success('Request player list end.................................................')
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
def schedule_task():
    """
    Start the crawler in scheduled mode.

    Runs ``urbox_id_main`` once right away, registers it to run every day at
    00:01, and then polls the scheduler once per second. This function does
    not return.
    """
    crawl_kwargs = {"log": logger}

    # Run one crawl immediately instead of waiting for the first daily slot.
    urbox_id_main(**crawl_kwargs)

    # Register the recurring daily run.
    daily_job = schedule.every().day.at("00:01")
    daily_job.do(urbox_id_main, **crawl_kwargs)

    # Poll for due jobs.
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Script entry point: perform a single crawl pass with the global logger.
    # Call schedule_task() here instead to keep the process alive and repeat
    # the crawl every day.
    urbox_id_main(log=logger)
|