# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/7/2 16:02
import time
from datetime import datetime
import utils  # project-local helpers: request_get_data, transform_ms, ...
import inspect
import schedule
from loguru import logger
from mysql_pool import MySQLConnectionPool  # project-local MySQL pool wrapper
from tenacity import retry, stop_after_attempt, wait_fixed

# Replace loguru's default stderr sink with a daily-rotated file sink.
# NOTE(review): loguru duration strings are usually plural ("7 days") —
# confirm "7 day" is parsed as intended for retention.
logger.remove()
logger.add("./logs/mall_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

# Category label stamped on every DB row this crawler writes.
CRAWL_CATEGORY = '潮玩'
def after_log(retry_state):
    """Tenacity ``after`` callback: log the outcome of every attempt.

    :param retry_state: tenacity.RetryCallState describing the attempt just made.
    """
    # The decorated functions all take a logger as their first positional
    # argument; fall back to the module-level logger when none was passed.
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
def get_toy_mall_list(log, sql_pool, sql_mall_list):
    """Walk the mall list pages until a short page or the reported total is hit.

    :param log: logger
    :param sql_pool: MySQL connection pool
    :param sql_mall_list: pids already stored, used downstream to skip duplicates
    """
    fetched = 0
    # Hard cap of 500 pages as a safety net against a runaway pagination loop.
    for page in range(1, 501):
        len_item, totalCount = get_toy_mall_single_page(log, page, sql_pool, sql_mall_list)
        if len_item < 20:
            # A short page means we reached the last page of the feed.
            log.debug(
                f"--------------- page {page}, len_item: {len_item} totalCount: {totalCount} ---------------")
            break
        fetched += len_item
        if fetched >= int(totalCount):
            log.debug(f"total_count: {fetched} totalCount: {totalCount}")
            break
def get_toy_mall_single_page(log, page, sql_pool, sql_mall_list):
    """Fetch and store one page (20 items) of the mall recommendation feed.

    :param log: logger
    :param page: 1-based page number
    :param sql_pool: MySQL connection pool
    :param sql_mall_list: pids already stored (skipped during parsing)
    :return: (number of rows in this page, total count reported by the API);
             (0, 0) on any failure
    """
    url = "https://api.qiandao.cn/shelf-web/list-home-recommender"
    params = {
        "offset": (page - 1) * 20,
        "limit": 20,
        "cmd": "b2c_homepage_feed",
        "name": "manghe",
        # Known values for "type": BLIND_BOX_MACHINE / KUJI / NEW_LUCKY_BAG / B2C;
        # empty string requests the unfiltered feed.
        "project": "channel",
        "type": "",
        "withLive": "true"
    }
    try:
        # Fix: pass the caller's logger instead of the module-level one.
        resp_json = utils.request_get_data(log, url, params, proxy=False)
        if not resp_json:
            # Fix: previously fell through after logging and crashed on
            # resp_json.get(...) with AttributeError when the request failed.
            log.error(f"get_toy_mall_single_page page:{page}")
            return 0, 0
        if resp_json.get("code", -1) == 0:
            data = resp_json.get("data", {})
            rows = data.get("rows", [])
            total_count = data.get("count", 0)
            try:
                parse_mall_data(log, rows, sql_pool, sql_mall_list)
            except Exception as e:
                log.error(f"parse_mall_data error: {e}")
            return len(rows), total_count
        log.error(f"{inspect.currentframe().f_code.co_name} -> page:{page} error: {resp_json.get('message')}")
        return 0, 0
    except Exception as e:
        # Fix: use the caller's logger (was the module-level `logger`).
        log.error(f"Request failed: {e}")
        return 0, 0
def parse_mall_data(log, rows, sql_pool, sql_mall_list):
    """Map API rows to DB records and upsert them in a single batch.

    :param log: logger
    :param rows: list of product dicts from the recommender API
    :param sql_pool: MySQL connection pool
    :param sql_mall_list: pids already present in the DB; those rows are skipped
    """
    records = []
    for row in rows:
        pid = row.get("id")
        if pid in sql_mall_list:
            log.info(f"pid:{pid} is exist, skip .......")
            continue
        price_info = row.get("price")
        org_info = row.get("org")
        records.append({
            "pid": pid,
            "p_type": row.get("type"),
            "title": row.get("name"),
            "price": price_info.get("unitPriceOfCash") if price_info else '',
            "sold_amount_text": row.get("soldAmountText"),  # units already sold
            "org_id": row.get("orgId"),
            "nickname": org_info.get("nickname", '') if org_info else '',
            "is_sold_out": row.get("isSoldOut"),
            "crawl_category": CRAWL_CATEGORY
        })
    if not records:
        return
    # Upsert so re-crawled products refresh their mutable fields in place.
    query = """
        INSERT INTO qiandao_mall_list_record
        (pid, p_type, title, price, sold_amount_text, org_id, nickname, is_sold_out, crawl_category)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
        p_type = VALUES(p_type),
        title = VALUES(title),
        price = VALUES(price),
        sold_amount_text = VALUES(sold_amount_text),
        org_id = VALUES(org_id),
        nickname = VALUES(nickname),
        is_sold_out = VALUES(is_sold_out),
        crawl_category = VALUES(crawl_category)
        """
    field_order = ("pid", "p_type", "title", "price", "sold_amount_text",
                   "org_id", "nickname", "is_sold_out", "crawl_category")
    args_list = [tuple(rec[key] for key in field_order) for rec in records]
    try:
        sql_pool.insert_many(query=query, args_list=args_list)
    except Exception as e:
        log.error(f"parse_mall_data -> sql_pool.insert_many error: {e}")
# ----------------------------------------------------------------------------------------------------------------------
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_group_id_list_for_pid_one_page(log, token, pid, sql_pool):
    """Fetch the kuji group-id list for one shelf (pid) and dispatch parsing.

    NOTE(review): every exception is caught and logged below, so the @retry
    decorator can only fire if the except handler itself raises — confirm
    that is the intended retry behavior.

    :param log: logger
    :param token: auth token passed through to the HTTP helper
    :param pid: shelf id
    :param sql_pool: MySQL connection pool
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, pid:{pid} ---------------")
    url = "https://api.qiandao.cn/b2c-web/v1/kuji/query/group/next"
    params = {"shelfId": pid}
    try:
        resp_json = utils.request_get_data(log, url, params, token)
        code = resp_json.get("code")
        if code == 0:
            parse_group_id_list(log, resp_json, pid, sql_pool)
        elif code == 2600030:
            # API error 2600030: flag the shelf with bag_state = 2
            # (presumably "no longer available" — confirm against API docs).
            log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
            sql_pool.update_one(
                query="UPDATE qiandao_mall_list_record SET bag_state = 2 WHERE pid = %s",
                args=(pid,))
        else:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
    except Exception as e:
        log.error(f"Request failed: {e}")
def parse_group_id_list(log, resp_json, pid, sql_pool):
    """For every group under this shelf, incrementally fetch buyer records.

    :param log: logger
    :param resp_json: response body of the group/next endpoint
    :param pid: shelf id the groups belong to
    :param sql_pool: MySQL connection pool
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    group_summary = resp_json.get("data", {}).get("groupSummary", [])
    for item in group_summary:
        group_id = item.get("groupId")
        try:
            # Newest trading_time already stored for this group; downstream
            # fetching stops once it reaches records at or before this time.
            # Fix: crawl_category is now a bound parameter instead of being
            # f-string-interpolated into the otherwise parameterized query.
            row = sql_pool.select_one(
                query="SELECT MAX(trading_time) FROM qiandao_mall_card_bag_reward_buy_record "
                      "WHERE group_id = %s AND crawl_category = %s",
                args=(group_id, CRAWL_CATEGORY))
            max_trading_time = row[0] if row else None
            get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_card_bag_reward_buyer_data(log, group_id, pid, sql_pool, max_trading_time):
    """Page through a group's draw-record feed and store the buyer rows.

    Pagination is seq-based: each request returns up to ``limit`` records and
    the ``seq`` of the last record becomes the cursor for the next page.

    :param log: logger
    :param group_id: kuji group id
    :param pid: shelf id the group belongs to
    :param sql_pool: MySQL connection pool
    :param max_trading_time: newest trading_time already stored (or None)
    """
    url = "https://api.qiandao.cn/box/kuji/query/v3/draw-records"
    page_size = "10"  # records per page
    cursor = "0"      # initial seq value
    while True:
        params = {"groupId": group_id, "limit": page_size, "seq": cursor}
        resp_json = utils.request_get_data(log, url, params)
        # NOTE(review): a None response raises AttributeError here, which is
        # what makes @retry re-run the whole function — confirm intended.
        if resp_json.get("code") != 0:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
            break
        # Parse and persist the current page.
        result = parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time)
        if not result:
            log.debug("No more data to process.")
            break
        has_more, next_seq = result
        if not has_more:
            log.debug("No more pages to fetch.")
            break
        cursor = next_seq
def parse_card_bag_reward_buyer_data(log, resp_json, pid, sql_pool, group_id, max_trading_time):
    """Parse one page of draw records, store the new ones, and report paging state.

    Records appear to be ordered newest-first, so the first record at or before
    ``max_trading_time`` means everything from there on is already stored.

    :param log: logger
    :param resp_json: response body of the draw-records endpoint
    :param pid: shelf id
    :param sql_pool: MySQL connection pool
    :param group_id: kuji group id
    :param max_trading_time: newest trading_time already stored (or None)
    :return: (has_more: bool, next_seq: str | None); (False, None) when done
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    data = resp_json.get("data", {})
    records = data.get("records", [])
    if not records:
        return False, None
    info_list = []
    reached_old_data = False
    for item in records:
        trading_time = item.get("time")  # trade timestamp (ms)
        trading_time_str = utils.transform_ms(log, trading_time)
        if not trading_time_str:
            log.debug(f"Invalid trading time: {trading_time_str}")
            continue  # skip records with an unusable timestamp
        trading_dt = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
        if max_trading_time and trading_dt <= max_trading_time:
            # Fix: stop collecting here but still flush the newer records
            # gathered above — the previous early `return` silently discarded
            # them when an old record appeared mid-page.
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
            reached_old_data = True
            break
        info_list.append({
            "pid": pid,
            "group_id": group_id,
            "product_name": item.get("productName"),
            "user_name": item.get("userName"),
            "user_id": item.get("userId"),
            "level": item.get("level"),          # 1 means prize A, and so on
            "level_type": item.get("levelType"),
            "seq": item.get("seq"),              # draw order within the group
            "trading_time": trading_time_str,
            "crawl_category": CRAWL_CATEGORY
        })
    if info_list:
        try:
            sql_pool.insert_many(table="qiandao_mall_card_bag_reward_buy_record", data_list=info_list)
            sql_pool.update_one(
                query="UPDATE qiandao_mall_list_record SET bag_state = 1 WHERE pid = %s",
                args=(pid,))
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} call insert_many error: {e}")
    if reached_old_data:
        return False, None
    has_more = data.get("hasMore", False)
    next_seq = records[-1].get("seq") if has_more else None  # cursor = last record's seq
    return has_more, next_seq
def qd_mall_main(log):
    """Main crawl task: refresh the mall list, then pull kuji buyer records.

    :param log: logger object
    :raises RuntimeError: when the MySQL connection pool is unhealthy
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool and fail fast if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        # ---- list pages ----
        try:
            # Fix: the query literal used to start with '#', which is a MySQL
            # comment marker — the statement was empty, the error was swallowed
            # below, and the dedup list was always empty.
            sql_mall_list = sql_pool.select_all(
                f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE crawl_category = '{CRAWL_CATEGORY}'")
            sql_mall_list = [item[0] for item in sql_mall_list]
            get_toy_mall_list(log, sql_pool, sql_mall_list)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
        time.sleep(5)
        # ---- detail pages (KUJI shelves only) ----
        sql_token = sql_pool.select_one("SELECT token FROM qiandao_token LIMIT 1")
        token = sql_token[0]
        sql_pid_one_list = sql_pool.select_all(
            f"SELECT DISTINCT pid FROM qiandao_mall_list_record WHERE p_type = 'KUJI' AND crawl_category = '{CRAWL_CATEGORY}'")
        sql_pid_one_list = [item[0] for item in sql_pid_one_list]
        if not sql_pid_one_list:
            log.debug("No sql_pid_one_list")
            return
        for pid2 in sql_pid_one_list:
            try:
                get_group_id_list_for_pid_one_page(log, token, pid2, sql_pool)
            except Exception as e:
                log.error(f"{inspect.currentframe().f_code.co_name} for sql_pid2_list: {pid2}, error: {e}")
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
def schedule_task():
    """Entry point for the crawler's scheduling loop.

    Runs the crawl once immediately, then re-runs it every day at 00:01.
    Blocks forever, ticking the scheduler once per second.
    """
    qd_mall_main(log=logger)  # run once at startup
    schedule.every().day.at("00:01").do(qd_mall_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == '__main__':
    # Script entry point: start the blocking scheduler loop.
    schedule_task()
|