# -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2025/3/24 15:05 import inspect import requests from loguru import logger from bs4 import BeautifulSoup from tenacity import retry, stop_after_attempt, wait_fixed logger.remove() logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day") HEADERS = { "User-Agent": "Dart/3.5 (dart:io)", "Accept-Encoding": "gzip", "Content-Type": "application/json", "deviceid": "763f77b1-cc16-4369-ac39-a03206ecfb48", "brand": "Redmi", "os": "android", "content-type": "application/json; charset=utf-8", "authori-zation": "", "systemversion": "32", "theme": "dark", "lang": "zh", "verse-ua": "d7b3b338008806f1b20427173b983e29", "version": "1.3.0", "isphysicaldevice": "true", "cid": "02931506", "sktime": "1746343207832", "sk": "fe8a84f5e1ff81813d9a998d72d1cd99" } # headers = { # "User-Agent": "Dart/3.5 (dart:io)", # "Accept-Encoding": "gzip", # "Content-Type": "application/json", # "deviceid": "763f77b1-cc16-4369-ac39-a03206ecfb48", # "brand": "Redmi", # "os": "android", # "content-type": "application/json; charset=utf-8", # "authori-zation": "a-22695f440cc94df28b39f3e804696112", # "systemversion": "32", # "theme": "dark", # "lang": "zh", # "verse-ua": "d7b3b338008806f1b20427173b983e29", # "version": "1.3.0", # "isphysicaldevice": "true", # "sktime": "1746343207832", # "cid": "02931506", # "sk": "fe8a84f5e1ff81813d9a998d72d1cd99" # } def after_log(retry_state): """ retry 回调 :param retry_state: RetryCallState 对象 """ # 检查 args 是否存在且不为空 if retry_state.args and len(retry_state.args) > 0: log = retry_state.args[0] # 获取传入的 logger else: log = logger # 使用全局 logger if retry_state.outcome.failed: log.warning( f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times") else: log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded") @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_proxys(log): """ 获取代理 :return: 代理 """ tunnel = "x371.kdltps.com:15818" kdl_username = "t13753103189895" kdl_password = "o0yefv6z" try: proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel} } return proxies except Exception as e: log.error(f"Error getting proxy: {e}") raise e # def save_shop_list(sql_pool, shop_list): # """ # 保存店铺数据 # :param sql_pool: # :param shop_list: # """ # sql = "INSERT INTO leka_shop_record (shop_id, shop_name, fans_num, group_num, create_time) VALUES (%s, %s, %s, %s, %s)" # sql_pool.insert_all(sql, shop_list) # def save_product_list(sql_pool, product_list): # """ # 保存商品数据 # :param sql_pool: # :param product_list: # """ # sql = "INSERT INTO leka_product_record (product_id, no, create_time, title, img, price_sale, total_price, sale_num, spec_config, sort, state, shop_id, shop_name, category, on_sale_time, end_time, finish_time, video_url) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" # sql_pool.insert_one(sql, product_list) @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def make_request(log, method, url, params=None, data=None, headers=None, proxies=None, timeout=5, token=None): """ 通用请求函数 :param log: logger对象 :param method: 请求方法 ('GET' 或 'POST') :param url: 请求的URL :param params: GET请求的查询参数 :param data: POST请求的数据 :param headers: 请求头 :param proxies: 代理 :param timeout: 请求超时时间 :param token: token :return: 响应的JSON数据 """ if headers is None: headers = HEADERS if 'getHitCardReport' or 'getCardPublicly' in url: if not token: token = "a-22695f440cc94df28b39f3e804696112" headers["authori-zation"] = token if proxies is None: proxies = get_proxys(log) try: with requests.Session() as session: if method.upper() == 'GET': if proxies is None: response = session.get(url, headers=headers, params=params, timeout=timeout) else: response = session.get(url, headers=headers, params=params, proxies=proxies, timeout=timeout) elif method.upper() == 'POST': if proxies is None: response = session.post(url, headers=headers, json=data, timeout=timeout) # print(response.text) else: response = session.post(url, headers=headers, json=data, proxies=proxies, timeout=timeout) else: log.error(f"Unsupported request method: {method}") return None response.raise_for_status() data = response.json() if data["code"] == 200: log.info(f"Successfully fetched {method} request to {url}") return data else: log.warning(f"Warning {inspect.currentframe().f_code.co_name}: {data['message']}") return None except requests.exceptions.RequestException as e: log.error(f"Error making {method} request to {url}: {e}") raise e except ValueError as e: log.error(f"Error parsing JSON for {method} request to {url}: {e}") raise e except Exception as e: log.error(f"Error making {method} request to {url}: {e}") raise e def get_play_back(log, product_id, token): """ 获取 视频回放链接 :param log: logger对象 :param product_id: product_id :param token: token """ log.info(f"Starting to fetch playback for product_id {product_id}") url = "https://api.luckycards.com.cn/api/front/c/product/productDetailDynamics" params = { # "code": "LCS1254174" "code": product_id } try: response = make_request(log, 'GET', url, params=params, token=token) if response: items = response.get("data", {}) normalLiving = items.get("normalLiving", {}) playback = normalLiving.get("playback") return playback else: return None except Exception as e: log.error(f"Error fetching playback {product_id}: {e}") return None def clean_texts(html_text): """ 使用 BeautifulSoup 解析并获取纯文本 :param html_text: 待解析的HTML格式的数据 :return: clean_text -> 解析后的数据 """ if not html_text: return "" soup = BeautifulSoup(html_text, 'html.parser') # clean_text = soup.get_text(separator=' ', strip=True) clean_text = soup.get_text(strip=True) # 替换   为普通空格 clean_text = clean_text.replace(' ', ' ') return clean_text def parse_product_items(log, items, sql_pool, product_id, token): """ 解析 产品信息 :param log: logger对象 :param items: 请求response :param sql_pool: MySQL连接池对象 :param product_id: product_id :param token: token """ if not items: log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found") return no = items.get("id") create_time = items.get("publishTime") title = items.get("productName") img = items.get("productImageIndex") price_sale = items.get("unitPriceStr") total_price = items.get("totalSalePrice") sale_num = items.get("saleCount") # 售出数量 spec_config = items.get("hitCardStandard") # 规格 sort = items.get("series") # 分类 0:全部 1:原盒 2:幸运盒 3:福盒? state = items.get("status") shop_id = items.get("merchantCode") shop_name = items.get("merchantName") category = items.get("brandId") on_sale_time = items.get("onlineTime") end_time = items.get("endTime") finish_time = items.get("finishTime") # content = items.get("purchaseNotes") # if content: # content = content.replace("

", "").replace("

", "") # brief = items.get("brief") product_detail = items.get("productDetail") if product_detail: product_detail = clean_texts(product_detail) # print('product_detail:',product_detail) video_url = get_play_back(log, product_id, token) hit_card_desc = items.get("hitCardDesc") # 赠品介绍 open_mode = items.get("openMode") # 随机球队 open_mode_comment = items.get("openModeComment") # 随机球队 说明 random_mode = items.get("randomMode") # 即买即随 random_mode_comment = items.get("randomModeComment") # 即买即随 说明 info_dict = { "no": no, "create_time": create_time, "title": title, "img": img, "price_sale": price_sale, "total_price": total_price, "sale_num": sale_num, "spec_config": spec_config, "sort": sort, "state": state, "shop_id": shop_id, "shop_name": shop_name, "category": category, "on_sale_time": on_sale_time, "end_time": end_time, "finish_time": finish_time, "product_detail": product_detail, "video_url": video_url, "hit_card_desc": hit_card_desc, "open_mode": open_mode, "open_mode_comment": open_mode_comment, "random_mode": random_mode, "random_mode_comment": random_mode_comment, } # print(info_dict) # sql_pool.insert_one_or_dict(table="leka_product_record", data=info_dict) sql_pool.update_one_or_dict(table="leka_product_record", data=info_dict, condition={"product_id": product_id}) @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_product_details(log, product_id, sql_pool, token): """ 获取 商品详情 单条 信息 :param log: logger对象 :param product_id: product_id :param sql_pool: MySQL连接池对象 :param token: token """ log.debug(f"Getting product details for {product_id}") url = "https://api.luckycards.com.cn/api/front/c/product/productDetail" params = { # "code": "LCS1254079" "code": product_id } try: response = make_request(log, 'GET', url, params=params, token=token) if response: parse_product_items(log, response.get("data"), sql_pool, product_id, token) else: log.error(f"Error getting product details for {product_id}: {response.get('msg')}") except Exception as e: log.error(f"Error getting product details for {product_id}: {e}") def get_product_detail_list(log, sql_pool, token): """ 获取 商品详情 列表 信息 :param log: logger对象 :param sql_pool: MySQL连接池对象 :param token: token """ sql_product_id_list = sql_pool.select_all("SELECT product_id FROM leka_product_record WHERE no IS NULL") sql_product_id_list = [item[0] for item in sql_product_id_list] for product_id in sql_product_id_list: try: get_product_details(log, product_id, sql_pool, token) except Exception as e: log.error(f"Error get_product_detail_list fetching product {product_id}: {e}") continue def parse_player_items(log, items, sql_pool, product_id): """ 解析 卡密公示 信息 :param log: logger对象 :param items: 请求response :param product_id: product_id :param sql_pool: MySQL连接池对象 """ if not items: log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found") return player_list = [] for item in items: # print(item) user_code = item.get("userCode") user_id = item.get("userId") user_name = item.get("nickName") num = item.get("cardCount") # info = (product_id, user_code, num, user_id, user_name) info_dict = { "product_id": product_id, "user_code": user_code, "num": num, "user_id": user_id, "user_name": user_name } # print(info_dict) player_list.append(info_dict) sql_pool.insert_many(table='leka_player_record', data_list=player_list) sql_pool.update_one("update leka_product_record set km_state = 1 where product_id = %s", (product_id,)) @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_player_list(log, product_id, sql_pool, token): """ 抓取 kami公示 信息 :param log: logger对象 :param product_id: product_id :param sql_pool: MySQL连接池对象 :param token: token """ log.debug(f"Getting player list for {product_id}") url = "https://api.luckycards.com.cn/api/front/c/card/getCardPublicly" last_id = 0 # 初始lastId为0 total_players = 0 while True: data = { "keyword": "", "lastUserId": last_id, "productCode": product_id, "publiclyType": 2, # 1:赠品维度 2:玩家维度 } # print(data) try: response = make_request(log, 'POST', url, data=data, token=token) if not response: log.error(f"Error getting player list for {product_id}: Empty response") break items = response.get("data", []) if not items: log.info(f"No more players found for product {product_id}") sql_pool.update_one("update leka_product_record set km_state = 3 where product_id = %s", (product_id,)) break # 处理当前页数据 parse_player_items(log, items, sql_pool, product_id) total_players += len(items) # 如果获取数量超过50条,说明已经获取到所有数据,结束循环 if total_players > 50: log.debug(f"Total players found for product {product_id}: {total_players}") break # 如果获取数量不足20条,说明是最后一页 if len(items) < 20: log.info(f"Last page detected for product {product_id} (got {len(items)} items)") break # 更新lastId为最后一条的userId last_id = items[-1].get("userId") # print(last_id) if not last_id: log.error("API response missing userId in last item, cannot paginate") break # 避免频繁请求 # time.sleep(0.5) except Exception as e: log.error(f"Error getting player list for {product_id} at lastId {last_id}: {e}") break log.info(f"Finished fetching players for product {product_id}, total: {total_players}") def get_players(log, sql_pool, token): """ 抓取 kami公示 信息 :param log: logger对象 :param sql_pool: MySQL连接池对象 :param token: token """ product_list = sql_pool.select_all("SELECT product_id FROM leka_product_record WHERE km_state IN (0, 3)") product_list = [product_id[0] for product_id in product_list] # token = sql_pool.select_one("SELECT token FROM leka_token") # token = token[0] if not product_list: log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found") return else: log.info(f"Start fetching players data. Total products: {len(product_list)}") for product_id in product_list: try: get_player_list(log, product_id, sql_pool, token) except Exception as e: log.error(f"Error fetching product {product_id}: {e}") continue @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_report_one_page(log, sql_pool, productCode, page, last_id, token): """ 获取 拆卡报告 单页的信息 :param log: logger对象 :param sql_pool: MySQL连接池对象 :param productCode: product_id :param page: 页码 :param last_id: last_id :param token: token """ url = "https://api.luckycards.com.cn/api/front/c/card/getHitCardReport" data = { "keyword": "", "page": page, "lastId": last_id, # "productCode": "LCS1254213" "productCode": productCode } log.info(f"Getting report data for: {productCode}, Page: {page}") try: response = make_request(log, 'POST', url, data=data, token=token) # print(response) if response: items = response.get("data", []) if items: info_list = [] for item in items: card_id = item.get("orderNo") card_name = item.get("cardSecret") create_time = item.get("drawTime") imgs = item.get("hitPic") user_id = item.get("userCode") user_name = item.get("nickName") shop_id = item.get("merchantCode") shop_name = item.get("merchantName") card_desc = item.get("hitCardDesc") # info = (card_id, card_name, create_time, imgs, user_id, user_name, shop_id, shop_name, card_desc) info_dict = { "product_id": productCode, "card_id": card_id, "card_name": card_name, "create_time": create_time, "imgs": imgs, "user_id": user_id, "user_name": user_name, "shop_id": shop_id, "shop_name": shop_name, "card_desc": card_desc } # print(info_dict) info_list.append(info_dict) sql_pool.insert_many(table='leka_report_record', data_list=info_list) log.info(f"Successfully saved {len(items)} report items") return items[-1].get("userCode"), len(items) else: log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found") sql_pool.update_one("update leka_product_record set report_state = 3 where product_id = %s", (productCode,)) return 0, 0 else: log.error(f"Error getting report data: {response.get('msg')}") return 0 except Exception as e: log.error(f"Error getting report data: {e}") raise e def get_report_list(log, sql_pool, product_id, token): """ 抓取 拆卡报告 单个product_id 所有页码的 信息 :param log: logger对象 :param sql_pool: MySQL连接池对象 :param product_id: product_id :param token: token """ # log.info(f"Start fetching report data. Product id: {product_id}") page = 1 last_id = 0 # while True: try: last_d, len_item = get_report_one_page(log, sql_pool, product_id, page, last_id, token) # if len_item != 0 and len_item < 20: log.info(f"Finished fetching report data for product {product_id}, total: {len_item}") sql_pool.update_one("update leka_product_record set report_state = 1 where product_id = %s", (product_id,)) # # 如果获取数量不足20条,说明是最后一页 ***暂时没找到第二页的*** # if len_item < 20: # log.info(f"Last page detected for product {product_id} (got {len_item} items)") # break # # # 更新lastId为最后一条的userId # last_id = last_d # if not last_id: # log.error("API response missing userId in last item, cannot paginate") # break # # page += 1 except Exception as e: log.error(f"Error getting report data: {e}") # break def get_reports(log, sql_pool, token): """ 抓取 拆卡报告 信息 :param log: logger对象 :param sql_pool: MySQL连接池对象 :param token: token """ product_list = sql_pool.select_all("SELECT product_id FROM leka_product_record WHERE report_state IN (0, 3)") product_list = [product_id[0] for product_id in product_list] # token = sql_pool.select_one("SELECT token FROM leka_token") # token = token[0] if not product_list: log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found") return else: log.info(f"Start fetching report data. Total products: {len(product_list)}") for product_id in product_list: try: get_report_list(log, sql_pool, product_id, token) except Exception as e: log.error(f"Error fetching product {product_id}: {e}") continue if __name__ == '__main__': pass # pid = 'LCS1254213' # pid = 'LCS1253418' # pid = 'LCS1256332' # from mysql_pool import MySQLConnectionPool # sql_pool_ = MySQLConnectionPool(log=logger) # get_reports(logger, None) # get_player_list(logger, pid, None) # get_product_details(logger, 'LCS1255968', sql_pool_)