| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/3/24 15:05
- import inspect
- import requests
- from loguru import logger
- from bs4 import BeautifulSoup
- from tenacity import retry, stop_after_attempt, wait_fixed
# Drop the handlers registered so far, then send everything to a dated log file.
logger.remove()
logger.add(
    "./logs/{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    encoding='utf-8',
    rotation="00:00",  # start a new file at midnight
    retention="7 day",  # old log files are cleaned up after seven days
)
# Default request headers used by make_request when the caller supplies none.
HEADERS = {
    # --- transport ---
    "User-Agent": "Dart/3.5 (dart:io)",
    "Accept-Encoding": "gzip",
    "Content-Type": "application/json",
    # --- device description ---
    "deviceid": "763f77b1-cc16-4369-ac39-a03206ecfb48",
    "brand": "Redmi",
    "os": "android",
    # NOTE(review): this key differs from "Content-Type" above only by case;
    # HTTP header names are case-insensitive, so the two entries overlap.
    "content-type": "application/json; charset=utf-8",
    # Left empty here; make_request fills in the token before sending.
    "authori-zation": "",
    "systemversion": "32",
    "theme": "dark",
    "lang": "zh",
    "verse-ua": "d7b3b338008806f1b20427173b983e29",
    "version": "2.0.0",
    "isphysicaldevice": "true",
    "cid": "02931506",
    # --- fixed values captured with the rest of the header set ---
    "sktime": "1746343207832",
    "sk": "fe8a84f5e1ff81813d9a998d72d1cd99",
}
- # headers = {
- # "User-Agent": "Dart/3.5 (dart:io)",
- # "Accept-Encoding": "gzip",
- # "Content-Type": "application/json",
- # "deviceid": "763f77b1-cc16-4369-ac39-a03206ecfb48",
- # "brand": "Redmi",
- # "os": "android",
- # "content-type": "application/json; charset=utf-8",
- # "authori-zation": "a-22695f440cc94df28b39f3e804696112",
- # "systemversion": "32",
- # "theme": "dark",
- # "lang": "zh",
- # "verse-ua": "d7b3b338008806f1b20427173b983e29",
- # "version": "1.3.0",
- # "isphysicaldevice": "true",
- # "sktime": "1746343207832",
- # "cid": "02931506",
- # "sk": "fe8a84f5e1ff81813d9a998d72d1cd99"
- # }
def after_log(retry_state):
    """
    Retry callback: log the outcome of the attempt that just finished.

    :param retry_state: RetryCallState object for the wrapped call
    """
    # Use the first positional argument of the wrapped call as the logger when
    # there is one; otherwise fall back to the module-level logger.
    call_args = retry_state.args
    log = call_args[0] if call_args else logger

    if not retry_state.outcome.failed:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")
        return

    log.warning(
        f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the proxy mapping for the tunnel proxy.

    :param log: logger object
    :return: dict with "http" and "https" entries that both point at the same
             authenticated tunnel URL
    """
    # NOTE(review): the tunnel address and credentials are hard-coded here.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        # Both schemes go through the same HTTP tunnel endpoint.
        tunnel_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
        return {"http": tunnel_url, "https": tunnel_url}
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e
- # def save_shop_list(sql_pool, shop_list):
- # """
- # 保存店铺数据
- # :param sql_pool:
- # :param shop_list:
- # """
- # sql = "INSERT INTO leka_shop_record (shop_id, shop_name, fans_num, group_num, create_time) VALUES (%s, %s, %s, %s, %s)"
- # sql_pool.insert_all(sql, shop_list)
- # def save_product_list(sql_pool, product_list):
- # """
- # 保存商品数据
- # :param sql_pool:
- # :param product_list:
- # """
- # sql = "INSERT INTO leka_product_record (product_id, no, create_time, title, img, price_sale, total_price, sale_num, spec_config, sort, state, shop_id, shop_name, category, on_sale_time, end_time, finish_time, video_url) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
- # sql_pool.insert_one(sql, product_list)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def make_request(log, method, url, params=None, data=None, headers=None, proxies=None, timeout=5, token=None):
    """
    Generic request helper.

    :param log: logger object
    :param method: request method ('GET' or 'POST', case-insensitive)
    :param url: request URL
    :param params: query parameters, sent for GET requests
    :param data: payload, sent as the JSON body for POST requests
    :param headers: request headers; defaults to a copy of HEADERS
    :param proxies: proxies mapping; defaults to the result of get_proxys
    :param timeout: request timeout in seconds
    :param token: value for the "authori-zation" header; a built-in default is
                  used when it is empty
    :return: the decoded JSON payload when its "code" is 200; None when the
             method is unsupported or the payload carries another code
    :raises Exception: request, HTTP-status and JSON-decoding errors are logged
                       and re-raised so the retry decorator can try again
    """
    # Work on a copy: assigning the token below must not modify the shared
    # HEADERS constant or a dict owned by the caller.
    headers = dict(HEADERS if headers is None else headers)

    # The previous guard, `'getHitCardReport' or 'getCardPublicly' in url`, was
    # always true, so the token has always been attached to every request and
    # callers for other endpoints pass a token too. That behaviour is kept and
    # made explicit here.
    headers["authori-zation"] = token or "a-22695f440cc94df28b39f3e804696112"

    if proxies is None:
        proxies = get_proxys(log)

    try:
        verb = method.upper()
        if verb == 'GET':
            request_kwargs = {"params": params}
        elif verb == 'POST':
            request_kwargs = {"json": data}
        else:
            log.error(f"Unsupported request method: {method}")
            return None

        with requests.Session() as session:
            response = session.request(
                verb, url, headers=headers, proxies=proxies, timeout=timeout, **request_kwargs
            )
            response.raise_for_status()
            payload = response.json()

        if payload.get("code") == 200:
            log.info(f"Successfully fetched {method} request to {url}")
            return payload

        # A non-200 business code is reported but not retried.
        log.warning(f"Warning make_request: {payload.get('message')}")
        return None
    except requests.exceptions.RequestException as e:
        log.error(f"Error making {method} request to {url}: {e}")
        raise
    except ValueError as e:
        log.error(f"Error parsing JSON for {method} request to {url}: {e}")
        raise
    except Exception as e:
        log.error(f"Error making {method} request to {url}: {e}")
        raise
def get_play_back(log, product_id, token):
    """
    Fetch the video playback link of a product.

    :param log: logger object
    :param product_id: product_id, sent as the "code" query parameter
    :param token: token
    :return: the "playback" value found under data -> normalLiving, or None
             when the request yields nothing or any error occurs
    """
    log.info(f"Starting to fetch playback for product_id {product_id}")
    url = "https://api.luckycards.com.cn/api/front/c/product/productDetailDynamics"
    try:
        response = make_request(log, 'GET', url, params={"code": product_id}, token=token)
        if not response:
            return None
        # Walk data -> normalLiving -> playback, tolerating missing keys.
        return response.get("data", {}).get("normalLiving", {}).get("playback")
    except Exception as e:
        log.error(f"Error fetching playback {product_id}: {e}")
        return None
def clean_texts(html_text):
    """
    Parse HTML with BeautifulSoup and return its plain text.

    :param html_text: HTML-formatted data to parse
    :return: the extracted text with non-breaking spaces turned into ordinary
             spaces; an empty string when html_text is empty or None
    """
    if not html_text:
        return ""

    soup = BeautifulSoup(html_text, 'html.parser')
    clean_text = soup.get_text(strip=True)
    # Replace non-breaking spaces with ordinary spaces. The character is written
    # as an escape so it cannot be confused with (or silently converted to) a
    # regular space in the source file.
    return clean_text.replace('\xa0', ' ')
def parse_product_items(log, items, sql_pool, product_id, token):
    """
    Parse one product payload and write it to the leka_product_record row
    identified by product_id.

    :param log: logger object
    :param items: the "data" part of the product detail response
    :param sql_pool: MySQL connection pool object
    :param product_id: product_id
    :param token: token, forwarded to the playback lookup
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return

    # (column name, payload key) pairs, in the order the row is assembled.
    leading_fields = (
        ("no", "id"),
        ("create_time", "publishTime"),
        ("title", "productName"),
        ("img", "productImageIndex"),
        ("price_sale", "unitPriceStr"),
        ("total_price", "totalSalePrice"),
        ("sale_num", "saleCount"),  # quantity sold
        ("spec_config", "hitCardStandard"),  # specification
        ("sort", "series"),  # category 0: all 1: original box 2: lucky box 3: blessing box?
        ("state", "status"),
        ("shop_id", "merchantCode"),
        ("shop_name", "merchantName"),
        ("category", "brandId"),
        ("on_sale_time", "onlineTime"),
        ("end_time", "endTime"),
        ("finish_time", "finishTime"),
    )
    trailing_fields = (
        ("hit_card_desc", "hitCardDesc"),  # gift description
        ("open_mode", "openMode"),  # random team
        ("open_mode_comment", "openModeComment"),  # random team, explanation
        ("random_mode", "randomMode"),  # randomized on purchase
        ("random_mode_comment", "randomModeComment"),  # randomized on purchase, explanation
    )

    record = {column: items.get(key) for column, key in leading_fields}

    # Only non-empty detail HTML is converted to text; other values pass through.
    detail = items.get("productDetail")
    record["product_detail"] = clean_texts(detail) if detail else detail
    record["video_url"] = get_play_back(log, product_id, token)

    for column, key in trailing_fields:
        record[column] = items.get(key)

    sql_pool.update_one_or_dict(table="leka_product_record", data=record, condition={"product_id": product_id})
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_product_details(log, product_id, sql_pool, token):
    """
    Fetch the detail record of a single product and store it.

    Errors are logged and swallowed, so one failing product does not abort the
    caller's loop.

    :param log: logger object
    :param product_id: product_id, sent as the "code" query parameter
    :param sql_pool: MySQL connection pool object
    :param token: token
    """
    log.debug(f"Getting product details for {product_id}")
    url = "https://api.luckycards.com.cn/api/front/c/product/productDetail"
    try:
        response = make_request(log, 'GET', url, params={"code": product_id}, token=token)
        if not response:
            # There is no payload to read a message from in this branch; the
            # old code called response.get() here and failed on None.
            log.error(f"Error getting product details for {product_id}: empty response")
            return
        parse_product_items(log, response.get("data"), sql_pool, product_id, token)
    except Exception as e:
        log.error(f"Error getting product details for {product_id}: {e}")
def get_product_detail_list(log, sql_pool, token):
    """
    Fetch details for every product whose `no` column is still NULL.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param token: token
    """
    rows = sql_pool.select_all("SELECT product_id FROM leka_product_record WHERE no IS NULL")
    pending_ids = [row[0] for row in rows]
    for product_id in pending_ids:
        try:
            get_product_details(log, product_id, sql_pool, token)
        except Exception as e:
            # Keep going with the remaining products.
            log.error(f"Error get_product_detail_list fetching product {product_id}: {e}")
def parse_player_items(log, items, sql_pool, product_id):
    """
    Parse one page of the card-code publicity list and store it.

    Inserts one leka_player_record row per entry, then sets km_state = 1 on the
    product.

    :param log: logger object
    :param items: list of player entries from the response
    :param sql_pool: MySQL connection pool object
    :param product_id: product_id
    """
    if not items:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
        return

    player_rows = [
        {
            "product_id": product_id,
            "user_code": entry.get("userCode"),
            "num": entry.get("cardCount"),
            "user_id": entry.get("userId"),
            "user_name": entry.get("nickName"),
        }
        for entry in items
    ]
    sql_pool.insert_many(table='leka_player_record', data_list=player_rows)
    sql_pool.update_one("update leka_product_record set km_state = 1 where product_id = %s", (product_id,))
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_player_list(log, product_id, sql_pool, token):
    """
    Fetch the card-code publicity (player) list of one product, page by page.

    Each non-empty page is stored through parse_player_items, which also sets
    km_state = 1. km_state = 3 is written only when the product has no players
    at all. Errors are logged and end the loop instead of propagating.

    :param log: logger object
    :param product_id: product_id
    :param sql_pool: MySQL connection pool object
    :param token: token
    """
    log.debug(f"Getting player list for {product_id}")
    url = "https://api.luckycards.com.cn/api/front/c/card/getCardPublicly"
    last_id = 0  # the first page is requested with lastUserId 0
    total_players = 0

    while True:
        data = {
            "keyword": "",
            "lastUserId": last_id,
            "productCode": product_id,
            "publiclyType": 2,  # 1: by gift, 2: by player
        }
        try:
            response = make_request(log, 'POST', url, data=data, token=token)
            if not response:
                log.error(f"Error getting player list for {product_id}: Empty response")
                break

            items = response.get("data", [])
            if not items:
                log.info(f"No more players found for product {product_id}")
                # An empty page after earlier pages had data just means the list
                # is exhausted. Writing state 3 in that case would overwrite the
                # state 1 set by parse_player_items, so the product would be
                # picked up again and its players inserted a second time.
                if total_players == 0:
                    sql_pool.update_one("update leka_product_record set km_state = 3 where product_id = %s", (product_id,))
                break

            # Store the current page.
            parse_player_items(log, items, sql_pool, product_id)
            total_players += len(items)

            # Stop once more than 50 players have been collected.
            if total_players > 50:
                log.debug(f"Total players found for product {product_id}: {total_players}")
                break

            # Fewer than 20 entries means this was the last page.
            if len(items) < 20:
                log.info(f"Last page detected for product {product_id} (got {len(items)} items)")
                break

            # The next page starts after the userId of the last entry.
            last_id = items[-1].get("userId")
            if not last_id:
                log.error("API response missing userId in last item, cannot paginate")
                break
        except Exception as e:
            log.error(f"Error getting player list for {product_id} at lastId {last_id}: {e}")
            break

    log.info(f"Finished fetching players for product {product_id}, total: {total_players}")
def get_players(log, sql_pool, token):
    """
    Fetch the card-code publicity list for every product whose km_state is 0 or 3.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param token: token
    """
    rows = sql_pool.select_all("SELECT product_id FROM leka_product_record WHERE km_state IN (0, 3)")
    product_list = [row[0] for row in rows]

    if not product_list:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
        return

    log.info(f"Start fetching players data. Total products: {len(product_list)}")
    for product_id in product_list:
        try:
            get_player_list(log, product_id, sql_pool, token)
        except Exception as e:
            # Log and move on to the next product.
            log.error(f"Error fetching product {product_id}: {e}")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_report_one_page(log, sql_pool, productCode, page, last_id, token):
    """
    Fetch and store one page of the card-opening report of a product.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param productCode: product_id
    :param page: page number
    :param last_id: value sent as "lastId"
    :param token: token
    :return: tuple (userCode of the last item, number of items saved); (0, 0)
             when the page has no items, in which case report_state is set to 3
    :raises Exception: any error, including an empty response from
                       make_request, is logged and re-raised so the retry
                       decorator can try again
    """
    url = "https://api.luckycards.com.cn/api/front/c/card/getHitCardReport"
    data = {
        "keyword": "",
        "page": page,
        "lastId": last_id,
        "productCode": productCode
    }
    log.info(f"Getting report data for: {productCode}, Page: {page}")
    try:
        response = make_request(log, 'POST', url, data=data, token=token)
        if not response:
            # The old code called response.get() on the empty response and then
            # had `return 0`, which the caller cannot unpack. Raising keeps the
            # effective behaviour (log, retry, propagate) with a clear message.
            raise RuntimeError(f"empty response for product {productCode}")

        items = response.get("data", [])
        if not items:
            log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No items found")
            sql_pool.update_one("update leka_product_record set report_state = 3 where product_id = %s",
                                (productCode,))
            return 0, 0

        info_list = [
            {
                "product_id": productCode,
                "card_id": item.get("orderNo"),
                "card_name": item.get("cardSecret"),
                "create_time": item.get("drawTime"),
                "imgs": item.get("hitPic"),
                "user_id": item.get("userCode"),
                "user_name": item.get("nickName"),
                "shop_id": item.get("merchantCode"),
                "shop_name": item.get("merchantName"),
                "card_desc": item.get("hitCardDesc"),
            }
            for item in items
        ]
        sql_pool.insert_many(table='leka_report_record', data_list=info_list)
        log.info(f"Successfully saved {len(items)} report items")
        return items[-1].get("userCode"), len(items)
    except Exception as e:
        log.error(f"Error getting report data: {e}")
        raise
def get_report_list(log, sql_pool, product_id, token):
    """
    Fetch the card-opening report of a single product.

    Only the first page is requested; the original notes say no second page
    has been found so far, so pagination is not implemented. Errors are logged
    and swallowed.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param product_id: product_id
    :param token: token
    """
    page = 1
    last_id = 0
    try:
        _, len_item = get_report_one_page(log, sql_pool, product_id, page, last_id, token)
        log.info(f"Finished fetching report data for product {product_id}, total: {len_item}")
        # Mark the product as done only when report items were actually saved.
        # For an empty report get_report_one_page has just written
        # report_state = 3; overwriting it with 1 would stop the product from
        # being selected again by get_reports.
        if len_item:
            sql_pool.update_one("update leka_product_record set report_state = 1 where product_id = %s", (product_id,))
    except Exception as e:
        log.error(f"Error getting report data: {e}")
def get_reports(log, sql_pool, token):
    """
    Fetch the card-opening report for every product whose report_state is 0 or 3.

    :param log: logger object
    :param sql_pool: MySQL connection pool object
    :param token: token
    """
    rows = sql_pool.select_all("SELECT product_id FROM leka_product_record WHERE report_state IN (0, 3)")
    product_list = [row[0] for row in rows]

    if not product_list:
        log.warning(f"Warning {inspect.currentframe().f_code.co_name}: No product_id found")
        return

    log.info(f"Start fetching report data. Total products: {len(product_list)}")
    for product_id in product_list:
        try:
            get_report_list(log, sql_pool, product_id, token)
        except Exception as e:
            # Log and move on to the next product.
            log.error(f"Error fetching product {product_id}: {e}")
if __name__ == "__main__":
    # Manual debugging entry point; nothing runs by default.
    pass
    # NOTE(review): the sample calls below do not pass the `token` argument
    # that the current function signatures require.
    # pid = 'LCS1254213'
    # pid = 'LCS1253418'
    # pid = 'LCS1256332'
    # from mysql_pool import MySQLConnectionPool
    # sql_pool_ = MySQLConnectionPool(log=logger)
    # get_reports(logger, None)
    # get_player_list(logger, pid, None)
    # get_product_details(logger, 'LCS1255968', sql_pool_)
|