# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/7/14 15:16
"""Scheduled crawler for the Xingchao app API (api.xingchao6.com).

One run of ``xc_main`` goes through four stages, each persisted through
``MySQLConnectionPool``:

1. shops discovered from the public product list -> ``xingchao_shop_record``
2. products of every stored shop (+ detail / video URL) -> ``xingchao_product_record``
3. player lists of products with ``player_stats = 0`` -> ``xingchao_player_record``
4. "other gift" reports of products with ``report_stats = 0`` -> ``xingchao_report_record``
"""
import datetime
import inspect
import os
import random
import time

import requests
import schedule
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from get_sign import get_sign
from mysql_pool import MySQLConnectionPool

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

# Static headers mimicking the Android app. The per-request "sign", "nonce",
# "timestamp" and (where needed) "x-token" headers are added by _signed_headers.
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Linux; Android 11; Pixel 5 Build/RQ3A.211001.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.106 Mobile Safari/537.36 uni-app Html5Plus/1.0 (Immersed/52.727272)",
    "Connection": "Keep-Alive",
    "Accept": "application/json",
    "Accept-Encoding": "gzip",
    "Content-Type": "application/json",
    "Cache-Control": "no-cache",
    "client": "yingyongbao",
    "appversion": "2.1.10",
    "deviceid": "null",
    "jrd": "100d85590861f713a85",
}

REQUEST_TIMEOUT = 10        # seconds, applied to every API call
PRODUCT_PAGE_SIZE = 10      # "page_size" sent by get_product_single_page
SHOP_PRODUCT_PAGE_SIZE = 20  # shop product pages send no page_size; assumed server default — TODO confirm
PLAYER_PAGE_SIZE = 20       # a full player page has this many rows (another page may follow)
REPORT_PAGE_SIZE = 100      # "page_size" sent for report pages


def after_log(retry_state):
    """tenacity ``after`` callback that logs the outcome of an attempt.

    The logger is chosen in this order:

    1. the wrapped call's ``log`` keyword argument (``schedule`` calls
       ``xc_main(log=...)`` this way);
    2. its first positional argument (the crawler functions take ``log`` first);
    3. the module-level loguru ``logger``.

    :param retry_state: tenacity ``RetryCallState`` of the current call.
    """
    log = retry_state.kwargs.get("log")
    if log is None:
        log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")


def _signed_headers(payload, ts, x_token=None):
    """Return a copy of HEADERS carrying the signature for *payload* at time *ts* (ms).

    :param payload: the exact params/body dict that will be sent (it is signed).
    :param ts: millisecond timestamp also placed in *payload*.
    :param x_token: user token; the "x-token" header is only set when given.
    """
    sign, nonce = get_sign(payload, ts)
    headers = HEADERS.copy()
    headers["sign"] = sign
    headers["nonce"] = nonce
    headers["timestamp"] = str(ts)
    if x_token is not None:
        headers["x-token"] = x_token
    return headers


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the ``requests`` ``proxies`` mapping for the KDL tunnel proxy.

    The tunnel address and credentials can be overridden with the
    ``KDL_TUNNEL`` / ``KDL_USERNAME`` / ``KDL_PASSWORD`` environment variables.
    Without them, the previously hard-coded values are used.

    :param log: logger (not used directly; kept for callers and after_log).
    :return: ``{"http": url, "https": url}``.
    """
    # NOTE(review): default credentials are committed to source; prefer the env vars.
    tunnel = os.getenv("KDL_TUNNEL", "x371.kdltps.com:15818")
    username = os.getenv("KDL_USERNAME", "t13753103189895")
    password = os.getenv("KDL_PASSWORD", "o0yefv6z")
    proxy_url = f"http://{username}:{password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}


def transfer_ts(timestamp):
    """Format a 10-digit (seconds) Unix timestamp as local 'YYYY-MM-DD HH:MM:SS'.

    :param timestamp: e.g. 1730389419; ``None`` is passed through unchanged.
    :return: the formatted string, or ``None``.
    """
    if timestamp is None:
        return None
    return datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')


def get_product_list(log, sql_pool, sql_shop_id_list, max_pages=100):
    """Walk the public product list page by page and record unseen shops.

    Paging stops on any of these:

    - a page with fewer than PRODUCT_PAGE_SIZE items;
    - the last page reported by the API;
    - a page that still fails after its retries;
    - after *max_pages* pages.

    :param log: logger.
    :param sql_pool: MySQL pool used for inserts.
    :param sql_shop_id_list: shop ids already stored; extended in place.
    :param max_pages: upper bound on the number of pages requested.
    """
    page = 1
    while page <= max_pages:
        try:
            log.debug(f'--------------- page {page} start ---------------')
            len_items, total_page = get_product_single_page(log, page, sql_pool, sql_shop_id_list)
        except Exception as e:
            log.error(f"{inspect.currentframe().f_code.co_name} Request get_product_single_page for page:{page}, {e}")
            len_items, total_page = 0, 0

        if len_items < PRODUCT_PAGE_SIZE:
            log.debug(f'--------------- page {page} has {len_items} items, break ---------------')
            break
        if total_page == page:
            log.debug(f'--------------- page {page} has {total_page} pages, break ---------------')
            break
        page += 1
        # Throttle between pages to avoid querying too frequently.
        time.sleep(random.uniform(0.5, 1))


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_product_single_page(log, page: int, sql_pool, sql_shop_id_list):
    """Fetch one product-list page and insert the shops not seen before.

    :param log: logger.
    :param page: 1-based page number.
    :param sql_pool: MySQL pool; insert failures are logged, not raised.
    :param sql_shop_id_list: known shop ids; new ids are appended in place.
    :return: ``(items on the page, total_page)``, or ``(0, 0)`` when the API
        answers with a non-200 ``code``.
    """
    url = "https://api.xingchao6.com/AppClient/v1.1/product/list/page"
    ts = int(time.time() * 1000)
    data = {
        "page": page,
        "page_size": PRODUCT_PAGE_SIZE,
        "timestamp": ts,
    }
    headers = _signed_headers(data, ts)
    response = requests.post(url, headers=headers, json=data, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    resp_json = response.json()

    if resp_json.get("code") != 200:
        log.debug(f'--------------- page {page} error, {resp_json.get("message")} ---------------')
        return 0, 0

    resp_data = resp_json.get("data") or {}
    total_page = (resp_data.get("page") or {}).get("total_page") or 0
    log.debug(f'--------------- page {page}, total_page {total_page} ---------------')
    if total_page <= 0:
        return 0, total_page

    items = resp_data.get("list") or []
    if not items:
        log.debug(f'--------------- page {page} has no items ---------------')
        return 0, total_page

    info_list = []
    for item in items:
        shop = item.get("shop") or {}
        shop_id = shop.get("id")
        if shop_id and shop_id in sql_shop_id_list:
            log.debug(f'--------------- shop_id: {shop_id} is exist, skip ---------------')
            continue
        info_list.append({"shop_id": shop_id, "shop_name": shop.get("name")})
        sql_shop_id_list.append(shop_id)

    if info_list:
        try:
            sql_pool.insert_many(table="xingchao_shop_record", data_list=info_list)
        except Exception as e:
            # str(e): exception objects themselves cannot be sliced.
            log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")
    return len(items), total_page


# ----------------------------------------------------------------------------------------------------------------------
def get_shop_product_sold_list(log, shop_id, sql_pool, sql_product_id_list, x_token, max_pages=100):
    """Walk one shop's product pages and store products not seen before.

    Paging stops on any of these:

    - a page with fewer than SHOP_PRODUCT_PAGE_SIZE items;
    - the last page reported by the API;
    - a page that still fails after its retries;
    - after *max_pages* pages.

    :param log: logger.
    :param shop_id: shop to crawl.
    :param sql_pool: MySQL pool used for inserts.
    :param sql_product_id_list: known product ids; extended in place.
    :param x_token: user token forwarded to the video lookup.
    :param max_pages: upper bound on the number of pages requested.
    """
    page = 1
    while page <= max_pages:
        try:
            log.debug(f'--------------- page {page}, shop_id {shop_id} start ---------------')
            len_items, total_page = get_shop_product_sold_single_page(log, page, sql_pool, shop_id,
                                                                      sql_product_id_list, x_token)
        except Exception as e:
            log.error(
                f"{inspect.currentframe().f_code.co_name} Request get_shop_product_sold_single_page for page:{page}, {e}")
            len_items, total_page = 0, 0

        if len_items < SHOP_PRODUCT_PAGE_SIZE:
            log.debug(f'--------------- page {page} has {len_items} items, break ---------------')
            break
        if total_page == page:
            log.debug(f'--------------- page {page} has {total_page} pages, break ---------------')
            break
        page += 1
        # Throttle between pages to avoid querying too frequently.
        time.sleep(random.uniform(0.5, 1))


def _build_product_record(log, item, x_token):
    """Merge a shop product-list item with its detail page into one DB row dict.

    Detail fields are ``None`` when the detail request fails or returns nothing.
    """
    no = item.get("no")
    try:
        detail = get_sold_detail(log, no, x_token)
    except Exception as e:
        log.error(f"获取商品详情失败: {e}")
        detail = {}
    shop = item.get("shop") or {}
    on_sale_time = item.get("on_sale_time")
    return {
        "product_id": item.get("id"),
        "no": no,
        "cate_id": item.get("cate_id"),
        "create_time": item.get("create_time"),
        "title": item.get("title"),
        "subtitle": item.get("subtitle"),
        "img": detail.get("img"),
        "price_sale": detail.get("price_sale"),
        "total_price": detail.get("total_price"),
        "sale_num": detail.get("sale_num"),
        "spec_config": item.get("spec_config"),
        "spec_total": item.get("spec_total"),
        "sort": item.get("sort"),
        "is_rnd_show": item.get("is_rnd_show"),
        "tag_type": item.get("tag_type"),
        "tag_type_cn": item.get("tag_type_cn"),
        "state": item.get("state"),
        "state_cn": item.get("state_cn"),
        "shop_id": shop.get("id"),
        "shop_name": shop.get("name"),
        "is_purchase_limit": item.get("is_purchase_limit"),
        "category": detail.get("category"),
        "on_sale_time": transfer_ts(on_sale_time) if on_sale_time else on_sale_time,
        "end_time": detail.get("end_time"),
        "finish_time": detail.get("finish_time"),
        "discount_config": str(item.get("discount_config")),
        "spec_config_desc": detail.get("spec_config_desc"),
        "content": detail.get("content"),
        "brief": detail.get("brief"),
        "video_url": detail.get("video_url"),
    }


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_shop_product_sold_single_page(log, page: int, sql_pool, shop_id, sql_product_id_list, x_token):
    """Fetch one page of a shop's products (state 2, ordered by show_time) and store new ones.

    :return: ``(items on the page, total_page)``, or ``(0, 0)`` when the API
        answers with a non-200 ``code``.
    """
    url = "https://api.xingchao6.com/AppClient/v1.1/product/list/page"
    ts = int(time.time() * 1000)
    data = {
        "shop_id": shop_id,
        "state": 2,
        "page": page,
        "ob_k": "show_time",
        "ob_v": 1,
        "timestamp": f"{ts}",
    }
    headers = _signed_headers(data, ts)
    response = requests.post(url, headers=headers, json=data, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    resp_json = response.json()

    if resp_json.get("code") != 200:
        log.debug(f'--------------- page {page} error, {resp_json.get("message")} ---------------')
        return 0, 0

    resp_data = resp_json.get("data") or {}
    total_page = (resp_data.get("page") or {}).get("total_page") or 0
    log.debug(f'--------------- page {page}, total_page {total_page} ---------------')
    if total_page <= 0:
        return 0, total_page

    items = resp_data.get("list") or []
    if not items:
        log.debug(f'--------------- page {page} has no items ---------------')
        return 0, total_page

    info_list = []
    for item in items:
        product_id = item.get("id")
        if product_id and product_id in sql_product_id_list:
            log.debug(f'--------------- product_id {product_id} has been crawled ---------------')
            continue
        info_list.append(_build_product_record(log, item, x_token))
        sql_product_id_list.append(product_id)

    if info_list:
        try:
            sql_pool.insert_many(table="xingchao_product_record", data_list=info_list)
        except Exception as e:
            # Logged rather than raised: a retry would skip the ids appended
            # above and the rows would be lost silently anyway.
            log.warning(f"{inspect.currentframe().f_code.co_name}, {str(e)[:500]}")
    return len(items), total_page


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_sold_detail(log, no, x_token):
    """Fetch the detail page of product *no* and extract the stored fields.

    :return: dict with price_sale, sale_num, total_price, img, category,
        end_time, finish_time, spec_config_desc, content, brief and video_url.
        Returns ``{}`` when the API answers with a non-200 ``code`` or empty data.
    """
    log.debug(f'--------------- get_sold_detail:{no} ---------------')
    url = "https://api.xingchao6.com/AppClient/v2/product/detail"
    ts = int(time.time() * 1000)
    params = {
        "no": no,
        "timestamp": f"{ts}",
    }
    headers = _signed_headers(params, ts)
    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    resp_json = response.json()

    if resp_json.get("code") != 200:
        log.debug(
            f'-------------{inspect.currentframe().f_code.co_name}-- no {no} error, {resp_json.get("message")} ---------------')
        return {}

    item = resp_json.get("data") or {}
    if not item:
        return {}

    price_sale = item.get("price_sale")
    sale_num = item.get("sale_num")
    # Sales amount = units sold * unit price; unknown when either is missing.
    if price_sale and sale_num is not None:
        total_price = round(float(sale_num) * float(price_sale), 2)
    else:
        total_price = None

    defined_config = item.get("defined_config") or []
    brief = defined_config[0].get("brief") if defined_config else None  # promotion text

    try:
        video_url = get_video_info(log, no, x_token)
    except Exception as e:
        log.debug(e)
        video_url = None

    # "img" maps to lists of URLs; flatten all of them into one comma-separated string.
    img = item.get("img") or {}
    return {
        "price_sale": price_sale,
        "sale_num": sale_num,
        "total_price": None if total_price is None else str(total_price),
        "img": ','.join(u for url_list in img.values() for u in (url_list or [])),
        "category": (item.get("category") or {}).get("name"),
        "end_time": transfer_ts(item.get("end_time")),
        "finish_time": item.get("finish_time"),
        "spec_config_desc": (item.get("template") or {}).get("spec_config_desc"),
        "content": str(item.get("content") or ""),
        "brief": brief,
        "video_url": video_url,
    }


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_video_info(log, no, x_token):
    """Return the ``url`` field from the ``live/watch`` endpoint for product *no*.

    Returns ``None`` on a non-200 ``code`` or empty data.
    """
    log.debug(f'-------------{inspect.currentframe().f_code.co_name}-- no {no} ---------------')
    url = "https://api.xingchao6.com/AppClient/v1.1/live/watch"
    ts = int(time.time() * 1000)
    params = {
        "product_no": no,
        "timestamp": str(ts),
    }
    headers = _signed_headers(params, ts, x_token)
    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    resp_json = response.json()

    if resp_json.get("code") != 200:
        log.debug(
            f'-------------{inspect.currentframe().f_code.co_name}-- no {no} error, {resp_json.get("message")} ---------------')
        return None
    item = resp_json.get("data") or {}
    return item.get("url") if item else None


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def _fetch_player_page(log, no, x_token, next_id=None):
    """Fetch one page of product *no*'s user list (type 2); return the parsed JSON."""
    url = "https://api.xingchao6.com/AppClient/v1/product/order/km/list/page"
    ts = int(time.time() * 1000)
    params = {
        "type": "2",
        "no": no,
        "timestamp": str(ts),
    }
    if next_id:
        params["next_id"] = next_id
    headers = _signed_headers(params, ts, x_token)
    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _player_record(no, item):
    """Map one player-list API item to a ``xingchao_player_record`` row."""
    return {
        "no": no,
        "card_id": item.get("card_id"),
        "card_name": item.get("card_name"),
        "card_set": item.get("card_set"),
        "create_time": item.get("create_time"),
        "km_id": item.get("id"),
        "num": item.get("num"),
        "ori_id": item.get("ori_template_id"),
        "player": item.get("player"),
        "player_cn": item.get("player_cn"),
        "seq": item.get("seq"),
        "team": item.get("team"),
        "team_cn": item.get("team_cn"),
        "user_id": item.get("user_id"),
        "user_name": item.get("user_name"),
    }


def get_player_list(log, no, sql_pool, x_token):
    """Fetch and store the user list of product *no*.

    The endpoint's ``type`` parameter selects the list: 1 is the lucky-draw
    list, 2 is the user list. This function requests type 2 and follows
    ``next_id`` pagination. Each page is retried on its own, so a late
    failure does not re-insert earlier pages.

    Outcomes when nothing was collected yet:

    - API ``code`` 500: writes ``player_stats = 2`` and returns 2.
    - empty first page: writes ``player_stats = 3`` and returns 3.

    :param log: logger.
    :param no: product number, e.g. "SN10763635".
    :param sql_pool: MySQL pool.
    :param x_token: user token sent as the "x-token" header.
    :return: 1 when data was collected (the caller marks ``player_stats = 1``),
        otherwise the status written (2 or 3).
    """
    log.debug(f'-------------{inspect.currentframe().f_code.co_name}-- get {no} ---------------')
    next_id = None
    got_data = False
    while True:
        resp_json = _fetch_player_page(log, no, x_token, next_id)

        if resp_json.get("code") == 500:
            if got_data:
                break  # earlier pages were saved; keep them as a success
            log.error(f"{no}商品不存在, 更改状态为2..........")
            sql_pool.update_one("UPDATE xingchao_product_record SET player_stats = 2 WHERE no = %s", (no,))
            return 2

        data_list = (resp_json.get("data") or {}).get("list") or []
        if not data_list:
            log.debug("data_list is empty, get_player_list end")
            if got_data:
                break  # previous page was exactly full; nothing more to fetch
            # No player data at all: status 3 (not uploaded).
            sql_pool.update_one("UPDATE xingchao_product_record SET player_stats = 3 WHERE no = %s", (no,))
            return 3

        player_data_list = [_player_record(no, item) for item in data_list]
        try:
            sql_pool.insert_many(table="xingchao_player_record", data_list=player_data_list)
        except Exception as e:
            log.warning(f"保存数据失败:{str(e)[:500]}")
        got_data = True

        # Only a full page can be followed by another one.
        next_id = data_list[-1].get("id") if len(data_list) == PLAYER_PAGE_SIZE else None
        if not next_id:
            log.debug("next_id is not, get_player_list end")
            break
        # Throttle between pages to avoid querying too frequently.
        time.sleep(random.uniform(0.5, 1))
    return 1


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_good_report_list(log=logger, no="SN10763635"):
    """Debug helper: fetch the first "good cards of this batch" report page (type 2).

    The scheduled job does not call this.

    :param log: logger used for the raw response.
    :param no: product number.
    :return: the parsed JSON response.
    """
    url = "https://api.xingchao6.com/AppClient/v1/product/unpackage/list/page"
    ts = int(time.time() * 1000)
    params = {
        "get_type": "0",
        "type": "2",
        "no": no,
        "page_size": "10",
        # Must equal the signed ts; a stale hard-coded value broke the signature.
        "timestamp": f"{ts}",
    }
    headers = _signed_headers(params, ts)
    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    log.debug(response.text)
    response.raise_for_status()
    return response.json()


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def _fetch_report_page(log, no, next_id=None):
    """Fetch one page of product *no*'s "other gift" reports (type 3); return the parsed JSON."""
    url = "https://api.xingchao6.com/AppClient/v1/product/unpackage/list/page"
    ts = int(time.time() * 1000)
    params = {
        "get_type": "0",
        "type": "3",
        "no": no,
        "page_size": str(REPORT_PAGE_SIZE),
        "timestamp": f"{ts}",
    }
    if next_id:
        params["next_id"] = next_id
    headers = _signed_headers(params, ts)
    response = requests.get(url, headers=headers, params=params, proxies=get_proxys(log), timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _report_record(no, item):
    """Map one report API item to a ``xingchao_report_record`` row."""
    return {
        "no": no,
        "card_id": item.get("card_id"),
        "card_name": item.get("card_name"),
        "card_set": item.get("card_set"),
        "create_time": item.get("create_time"),
        "report_id": item.get("id"),
        "imgs": item.get("img"),
        "is_good": item.get("is_good"),
        "lottery_num": item.get("lottery_num"),
        "player": item.get("player"),
        "player_cn": item.get("player_cn"),
        "status": item.get("status"),
        "status_cn": item.get("status_cn"),
        "team": item.get("team"),
        "team_cn": item.get("team_cn"),
        "user_name": item.get("user_name"),
    }


def get_others_report_list(log, no, sql_pool):
    """Collect all "other gift" reports of product *no* and store them.

    Sets ``report_stats`` to:

    - 1 after the rows are inserted;
    - 2 when the API answers ``code`` 500 before any data was collected;
    - 3 when the first page is empty.

    Errors are logged, not raised, and ``report_stats`` is left unchanged in
    that case. Each page request is retried on its own.
    """
    try:
        all_data = []
        next_id = None
        while True:
            resp_json = _fetch_report_page(log, no, next_id)

            if resp_json.get("code") == 500:
                if all_data:
                    break  # keep what was collected
                log.error(f"{no}商品不存在, 更改状态为2..........")
                sql_pool.update_one("UPDATE xingchao_product_record SET report_stats = 2 WHERE no = %s", (no,))
                return

            data = (resp_json.get("data") or {}).get("data") or []
            if not data:
                if all_data:
                    break  # previous page was exactly full
                log.debug(f"{no}response.json() data 为空, 更改状态为3.........")
                sql_pool.update_one("UPDATE xingchao_product_record SET report_stats = 3 WHERE no = %s", (no,))
                return

            all_data.extend(data)
            next_id = data[-1].get("id")  # cursor for the next page
            if len(data) < REPORT_PAGE_SIZE:
                log.debug(f"{no} -> len data 长度小于100, 停止查询.........")
                break
            # Throttle between pages to avoid querying too frequently.
            time.sleep(random.uniform(0.5, 1))

        info_list = [_report_record(no, item) for item in all_data]
        sql_pool.insert_many(table="xingchao_report_record", data_list=info_list)
        log.info(f"{no}商品数据采集完成, 更改状态为1..........")
        sql_pool.update_one("UPDATE xingchao_product_record SET report_stats = 1 WHERE no = %s", (no,))
    except Exception as e:
        log.error(f"{no} get_others_report_list error: {e}")


def _crawl_shops(log, sql_pool):
    """Stage 1: discover shops from the public product list."""
    try:
        sql_shop_id_list = [row[0] for row in sql_pool.select_all("select shop_id from xingchao_shop_record")]
        get_product_list(log, sql_pool, sql_shop_id_list)
    except Exception as e:
        log.error(f"Request shop id error: {e}")


def _crawl_shop_products(log, sql_pool, x_token):
    """Stage 2: crawl the products of every stored shop."""
    try:
        sql_product_id_list = [row[0] for row in
                               sql_pool.select_all("select product_id from xingchao_product_record")]
        shop_ids = [row[0] for row in sql_pool.select_all("select shop_id from xingchao_shop_record")]
        for shop_id in shop_ids:
            try:
                get_shop_product_sold_list(log, shop_id, sql_pool, sql_product_id_list, x_token)
            except Exception as e:
                log.error(e)
    except Exception as e:
        log.error(f"Request get_shop_product_sold_list error: {e}")


def _crawl_players(log, sql_pool, x_token):
    """Stage 3: fetch the user lists of products whose player_stats is 0."""
    try:
        nos = [row[0] for row in
               sql_pool.select_all("select no from xingchao_product_record where player_stats = 0")]
        for noid in nos:
            try:
                status = get_player_list(log, noid, sql_pool, x_token)
                # Statuses 2/3 were already written by get_player_list; don't overwrite them.
                if status == 1:
                    sql_pool.update_one("UPDATE xingchao_product_record SET player_stats = 1 WHERE no = %s",
                                        (noid,))
            except Exception as e:
                log.error(f"Request get_player_list error: {e}")
                sql_pool.update_one("UPDATE xingchao_product_record SET player_stats = 2 WHERE no = %s", (noid,))
    except Exception as e:
        log.error(f"Request player list error: {e}")


def _crawl_reports(log, sql_pool):
    """Stage 4: fetch "other gift" reports of products whose report_stats is 0."""
    try:
        nos = [row[0] for row in
               sql_pool.select_all("select no from xingchao_product_record where report_stats = 0")]
        for noid in nos:
            try:
                get_others_report_list(log, noid, sql_pool)
            except Exception as e:
                log.error(f"Request get_report_list error: {e}")
    except Exception as e:
        log.error(f"Request get_shop_data_list error: {e}")


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def xc_main(log):
    """Run one full crawl: shops -> shop products -> player lists -> reports.

    Only a failure to create the DB pool, or a failed pool health check,
    propagates. That case is retried up to 100 times, one hour apart. Every
    stage after that logs and swallows its own errors.

    :param log: logger object.
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        token = sql_pool.select_one("select token from xingchao_token")
        if not token:
            log.error("No token found in xingchao_token, skip this run")
            return
        x_token = token[0]
        log.debug(f"x_token:{x_token}")

        _crawl_shops(log, sql_pool)
        time.sleep(10)
        _crawl_shop_products(log, sql_pool, x_token)
        time.sleep(10)
        _crawl_players(log, sql_pool, x_token)
        time.sleep(10)
        _crawl_reports(log, sql_pool)
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


def schedule_task(run_now=False):
    """Entry point: run ``xc_main`` every day at 00:01. Blocks forever.

    :param run_now: also run one crawl immediately before scheduling.
    """
    if run_now:
        xc_main(log=logger)
    schedule.every().day.at("00:01").do(xc_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    schedule_task()