| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/6/30 19:35
- import time
- from datetime import datetime
- import utils
- import random
- import inspect
- import schedule
- from loguru import logger
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
# Drop every previously registered loguru sink, then log to a dated file only.
logger.remove()
logger.add(
    "./logs/chouka_add_{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    rotation="00:00",      # start a new file at midnight
    retention="7 day",     # keep one week of log files
    encoding='utf-8',
)
def after_log(retry_state):
    """
    Tenacity ``after`` callback that logs the outcome of one attempt.

    The logger is resolved from the wrapped call itself, in this order:
    its first positional argument, its ``log`` keyword argument, and
    finally the module-level ``logger``. Supporting the keyword form
    matters because the entry point is invoked as
    ``qd_draw_card_main(log=logger)``.

    :param retry_state: tenacity ``RetryCallState`` of the current attempt
    """
    call_args = getattr(retry_state, "args", None)
    call_kwargs = getattr(retry_state, "kwargs", None) or {}

    if call_args:
        log = call_args[0]  # logger passed positionally
    elif call_kwargs.get("log") is not None:
        log = call_kwargs["log"]  # logger passed as keyword
    else:
        log = logger  # fall back to the global logger

    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")
def get_draw_card_list(log, sql_pool):
    """
    Walk the draw-card machine listing page by page.

    Already-stored pids are loaded once from
    ``qiandao_mall_draw_card_list_record`` and handed to every page call.
    Paging stops at the first page with fewer than 20 rows, once the number
    of rows seen reaches the count reported by the API, or after 500 pages.

    :param log: logger object
    :param sql_pool: database pool exposing ``select_all``
    """
    max_page = 500
    total_count = 0
    stored_rows = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record")
    sql_pid_list = [row[0] for row in stored_rows]

    for page in range(1, max_page + 1):
        len_item, totalCount = draw_card_one_page(log, page, sql_pool, sql_pid_list)

        # A short page means there is nothing left to fetch.
        if len_item < 20:
            log.debug(
                f"--------------- page {page}, len_item: {len_item}, totalCount: {totalCount}, break !!! ---------------")
            break

        total_count += len_item
        if total_count >= int(totalCount):
            log.debug(f"total_count: {total_count} >= totalCount: {totalCount}, break...........")
            break

        # Small random pause between consecutive page requests.
        time.sleep(random.uniform(0.1, 1))

    sql_pid_list.clear()
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def draw_card_one_page(log, page, sql_pool, sql_pid_list):
    """
    Fetch one listing page of draw-card machines and hand its rows to the parser.

    Wrapped by tenacity: up to 5 attempts, 1 second apart.

    :param log: logger object
    :param page: 1-based page number, converted to an offset of 20 rows per page
    :param sql_pool: database pool forwarded to ``parse_draw_card_data``
    :param sql_pid_list: pids already stored, forwarded to ``parse_draw_card_data``
    :return: tuple ``(rows on this page, "count" value reported by the API)``
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name}, page: {page} ---------------")

    page_size = 20
    url = "https://api.qiandao.com/shelf-web/list-home-recommender"
    params = {
        "offset": str((page - 1) * page_size),
        "limit": str(page_size),
        "cmd": "b2c_homepage_feed",
        "name": "cgmarket",
        "project": "channel",
        "type": "LIVE_GROUPON",
        "typeIds": [1443323],
        "withLive": "true",
        "tagIds": [1701855],
        "quickCouponTemplateId": "0"
    }

    payload = utils.request_get_data(log, url, params)
    data = payload["data"]
    rows = data["rows"]
    parse_draw_card_data(log, rows, sql_pool, sql_pid_list)
    return len(rows), data["count"]
def parse_draw_card_data(log, resp_json, sql_pool, sql_pid_list):
    """
    Convert listing rows into records and upsert the unseen ones.

    Rows whose id is already in ``sql_pid_list`` are skipped. Every newly
    accepted id is appended to ``sql_pid_list`` so that later pages skip it
    too. Nothing is written when no new row is found.

    :param log: logger object
    :param resp_json: iterable of row dicts taken from the listing response
    :param sql_pool: database pool exposing ``insert_many(query=..., args_list=...)``
    :param sql_pid_list: list of known pids; extended in place
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"--------------- {fn_name} ---------------")

    # Local set for O(1) membership tests; the caller's list is kept in sync.
    known_pids = set(sql_pid_list)
    args_list = []
    for item in resp_json:
        pid = item.get("id")
        if pid in known_pids:
            log.debug(f"{fn_name}, pid: {pid} already exists")
            continue

        # "price" / "org" can be present with a null value, in which case
        # dict.get's default is not used, so normalise to an empty dict.
        price_info = item.get("price") or {}
        org_info = item.get("org") or {}

        known_pids.add(pid)
        sql_pid_list.append(pid)
        # Tuple order matches the column list of the INSERT below.
        args_list.append((
            pid,
            item.get("type"),
            item.get("name"),
            price_info.get("unitPriceOfCash"),
            item.get("soldAmountText"),
            item.get("orgId"),
            org_info.get("nickname", ''),
            item.get("isSoldOut"),
        ))

    if not args_list:
        return

    query = """
        INSERT INTO qiandao_mall_draw_card_list_record
            (pid, p_type, title, price, sold_amount_text, org_id, nickname, is_sold_out)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            p_type = VALUES(p_type),
            title = VALUES(title),
            price = VALUES(price),
            sold_amount_text = VALUES(sold_amount_text),
            org_id = VALUES(org_id),
            nickname = VALUES(nickname),
            is_sold_out = VALUES(is_sold_out)
    """
    sql_pool.insert_many(query=query, args_list=args_list)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_draw_card_details(log, sql_pool, pid):
    """
    Fetch the detail page of one draw-card machine and store its fields.

    Wrapped by tenacity: up to 5 attempts, 1 second apart. On a response
    whose ``code`` is not 0 the API message is logged and nothing is stored.

    :param log: logger object
    :param sql_pool: database pool forwarded to ``parse_draw_card_details_data``
    :param pid: shelf id of the machine, e.g. '867827029033121899'
    :return: None
    """
    url = "https://api.qiandao.com/b2c-web/v1/gacha/query/detail"
    # Query the requested machine; a fixed sample id here would store the
    # same product's details under every pid.
    params = {
        "shelfId": pid
    }
    resp_json = utils.request_get_data(log, url, params)

    if resp_json.get("code") == 0:
        parse_draw_card_details_data(log, resp_json, pid, sql_pool)
    else:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {resp_json.get('message')}")
def parse_draw_card_details_data(log, resp_json, pid, sql_pool):
    """
    Extract detail fields from a detail response and update the list record.

    The intro image URLs are stored as a single string joined with ``|``;
    a missing or null ``images`` / ``introImages`` is stored as an empty
    string.

    :param log: logger object
    :param resp_json: decoded detail response; its ``data`` entry must be a dict
    :param pid: shelf id used as the update condition
    :param sql_pool: database pool exposing ``update_one_or_dict``
    """
    log.debug(f"--------------- {inspect.currentframe().f_code.co_name} ---------------")
    data = resp_json.get("data")

    # Either level may be present but null, so dict.get defaults are not enough.
    images = data.get('images') or {}
    intro_images = images.get("introImages") or []

    detail_dict = {
        "specification": data.get("packageConfigRamark"),  # key spelled as the API sends it
        "images": '|'.join(intro_images),
        "category_id": data.get("categoryId"),
        "unit_price_of_cash": data.get("unitPriceOfCash"),
        "vip_price": data.get("vipPrice"),
    }
    sql_pool.update_one_or_dict(table="qiandao_mall_draw_card_list_record", data=detail_dict, condition={"pid": pid})
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_draw_ids(log, pid, sql_pool,max_trading_time):
    """
    Fetch the draw-record id digest of one machine and load the records in batches.

    The ids are passed to ``get_draw_buy_data`` ten at a time. A failure in
    one batch is logged and does not stop the remaining batches.
    Wrapped by tenacity: up to 5 attempts, 1 second apart.

    :param log: logger object
    :param pid: shelf id of the machine
    :param sql_pool: database pool forwarded to ``get_draw_buy_data``
    :param max_trading_time: forwarded unchanged to ``get_draw_buy_data``
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"--------------- {fn_name}, pid: {pid} ---------------")

    url = "https://api.qiandao.cn/b2c-web/v1/box/gacha/query/digest-draw-records"
    resp_json = utils.request_get_data(log, url, {"shelfId": pid})

    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return

    rows = resp_json.get("data", {}).get("rows", [])
    draw_id_list = [row.get("id") for row in rows]
    id_count = len(draw_id_list)

    # Request the full records in slices of ten ids.
    for start in range(0, id_count, 10):
        log.debug(
            f"{fn_name}, get_draw_ids, i: {start}, len draw_id_list: {id_count}")
        batch = draw_id_list[start:start + 10]
        try:
            get_draw_buy_data(log, pid, sql_pool, batch, max_trading_time)
        except Exception as e:
            log.error(f"{fn_name} call draw_id_list error: {e}")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_draw_buy_data(log, pid, sql_pool, ids: list,max_trading_time):
    """
    Fetch the full draw records for a batch of record ids and store them.

    The request body has the shape
    ``{"ids": ["878724165295676667", ...], "shelfId": "855250935293687891"}``.
    Parsing errors are logged here and not re-raised.
    Wrapped by tenacity: up to 5 attempts, 1 second apart.

    :param log: logger object
    :param pid: shelf id of the machine
    :param sql_pool: database pool forwarded to ``parse_draw_buy_data``
    :param ids: draw-record ids to look up
    :param max_trading_time: forwarded unchanged to ``parse_draw_buy_data``
    """
    fn_name = inspect.currentframe().f_code.co_name
    url = "https://api.qiandao.cn/b2c-web/v1/box/gacha/query/ids-draw-records"
    data = {
        "ids": ids,
        "shelfId": pid
    }
    resp_json = utils.request_post_data(log, url, data)

    if resp_json.get("code") != 0:
        log.error(f"{fn_name} error: {resp_json.get('message')}")
        return

    try:
        parse_draw_buy_data(log, resp_json, pid, sql_pool,max_trading_time)
    except Exception as e:
        log.error(f"{fn_name} call parse_draw_buy_data error: {e}")
def is_draw_id_exists(log, sql_pool, draw_id):
    """
    Tell whether ``draw_id`` is already stored in ``qiandao_mall_draw_card_buy_record``.

    Any error raised while querying or reading the result is logged and
    reported as "not found".

    :param log: logger object
    :param sql_pool: database pool exposing ``select_one(query=..., args=...)``
    :param draw_id: draw-record id to look up
    :return: True when a matching row exists, otherwise False
    """
    query = """
        SELECT EXISTS (
            SELECT 1
            FROM qiandao_mall_draw_card_buy_record
            WHERE draw_id = %s
        ) AS found
    """
    exists = False
    try:
        row = sql_pool.select_one(query=query, args=(draw_id,))
        if row:
            exists = bool(row[0])
    except Exception as e:
        log.error(f"{inspect.currentframe().f_code.co_name} error: {e}")
    return exists
def parse_draw_buy_data(log, resp_json, pid, sql_pool,max_trading_time):
    """
    Convert draw records from an ids-draw-records response and insert the new ones.

    Rows without a usable time are skipped. Processing stops at the first
    row whose time is not newer than ``max_trading_time``.
    Insert errors are logged and not re-raised.

    :param log: logger object
    :param resp_json: decoded API response; rows are read from ``data.rows``
    :param pid: shelf id stored with every record
    :param sql_pool: database pool exposing ``insert_many(table=..., data_list=...)``
    :param max_trading_time: newest draw time already stored for this pid
        (compared against a ``datetime``), or a falsy value to accept every row
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.debug(f"--------------- {fn_name} ---------------")

    # "data" / "rows" may be present but null; treat both as "no rows".
    rows = (resp_json.get('data') or {}).get('rows') or []
    info_list = []
    for item in rows:
        # Nested objects can be null as well, so normalise before reading.
        product_info = item.get("productInfo") or {}
        user_info = item.get("userInfo") or {}

        # NOTE(review): transform_ms presumably returns a
        # "%Y-%m-%d %H:%M:%S" string, or a falsy value on failure; confirm in utils.
        trading_time_str = utils.transform_ms(log, item.get("time"))
        if not trading_time_str:
            log.debug(f"Invalid trading time: {trading_time_str}")
            continue  # skip rows without a valid time

        trading_time = datetime.strptime(trading_time_str, "%Y-%m-%d %H:%M:%S")
        if max_trading_time and trading_time <= max_trading_time:
            log.debug(f"trading_time: {trading_time_str} <= max_trading_time: {max_trading_time}, 跳过旧数据")
            # NOTE(review): stopping here assumes rows arrive newest first; confirm.
            break

        info_list.append({
            "pid": pid,
            "draw_id": item.get("id"),
            # A missing product id is stored as NULL, not as an empty dict.
            "spu_id": product_info.get("id"),
            "spu_name": product_info.get("spuName"),
            "spu_image": product_info.get("spuImage"),
            "rarity": item.get("rarity"),
            "buyer_id": user_info.get("id"),
            "buyer_name": user_info.get("name"),
            "draw_time": trading_time_str
        })

    if not info_list:
        return

    try:
        sql_pool.insert_many(table="qiandao_mall_draw_card_buy_record", data_list=info_list)
    except Exception as e:
        log.error(f"{fn_name} call insert_many error: {e}")
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def qd_draw_card_main(log):
    """
    Entry point of one collection round: pull new draw records for every stored machine.

    For each pid in ``qiandao_mall_draw_card_list_record`` the newest stored
    ``draw_time`` is looked up and passed to ``get_draw_ids``.
    Errors for a single pid are logged and the loop continues.

    Wrapped by tenacity: up to 100 attempts, 3600 seconds apart. Only the
    pool health check raises out of this function; every later error is
    caught and logged inside it.

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails
    """
    fn_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {fn_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool and refuse to run on an unhealthy pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        pid_rows = sql_pool.select_all("SELECT pid FROM qiandao_mall_draw_card_list_record")
        sql_pid2_list = [row[0] for row in pid_rows]
        if not sql_pid2_list:
            log.debug("No sql_pid2_list")
            return

        for pid2 in sql_pid2_list:
            try:
                # Newest draw time already stored for this pid.
                newest_row = sql_pool.select_one(
                    query="SELECT MAX(draw_time) FROM qiandao_mall_draw_card_buy_record WHERE pid = %s",
                    args=(pid2,))
                max_trading_time = newest_row[0] if newest_row else None
                get_draw_ids(log, pid2, sql_pool,max_trading_time)
            except Exception as e:
                log.error(f"{fn_name} for sql_pid2_list: {pid2}, error: {e}")
    except Exception as e:
        log.error(f'{fn_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {fn_name} 运行结束,等待下一轮的采集任务............')
if __name__ == "__main__":
    # Run one collection round with the module-level loguru logger.
    qd_draw_card_main(log=logger)
|