# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2026/2/27 11:22
import time
import inspect
import requests
import schedule
import user_agent
from loguru import logger
from crypto_utils import CryptoHelper
from mysql_pool import MySQLConnectionPool
from tenacity import retry, stop_after_attempt, wait_fixed

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 days")

# Base configuration
BASE_URL = "https://cashier.yqszpay.com"
PAGE_SIZE = 10

headers = {
    "User-Agent": user_agent.generate_user_agent(),
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip",
    "Content-Type": "application/json",
    "channelNo": "88888888",
    "pageSize": str(PAGE_SIZE),
    # "pageNum": 1,
    "version": "1.9.9.82537"
}
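
# NOTE: crypto_utils and mysql_pool are local modules that are not included in this file.
# The interfaces sketched below are assumptions, inferred purely from how this script
# calls them; the real modules may differ.
#
#   CryptoHelper.encrypt_request_data(data: dict)      -> encrypted body, sent as the JSON payload
#   CryptoHelper.decrypt_response_data(resp_json: dict) -> dict, decrypts the 'data' field of a response
#
#   pool = MySQLConnectionPool(log=logger)
#   pool.check_pool_health() -> bool
#   pool.select_one(sql)     -> single row (tuple) or None
#   pool.select_all(sql)     -> list of rows
#   pool.update_one(sql, params)                                    # executes a single UPDATE
#   pool.insert_many(query=sql, args_list=[...])                    # raw batch INSERT
#   pool.insert_many(table='...', data_list=[{...}], ignore=True)   # dict-based INSERT IGNORE style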


def after_log(retry_state):
    """
    tenacity retry callback.
    :param retry_state: RetryCallState object
    """
    # Use the logger passed as the wrapped function's first positional argument, if any
    if retry_state.args:
        log = retry_state.args[0]
    else:
        log = logger  # fall back to the global logger
    if retry_state.outcome.failed:
        log.warning(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} failed")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the tunnel proxy configuration.
    :param log: logger object
    :return: proxies dict
    """
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    try:
        proxies = {
            "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel},
            "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
        }
        return proxies
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise e


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def make_encrypted_post_request(log, url: str, request_data: dict, extra_headers: dict = None):
    """
    Generic encrypted POST request helper (with retries).
    :param log: logger object
    :param url: request URL
    :param request_data: request payload dict (will be encrypted)
    :param extra_headers: extra request headers
    :return: decrypted response data, or None on failure
    """
    request_headers = headers.copy()
    if extra_headers:
        request_headers.update(extra_headers)
    log.debug(f"Request URL: {url}, Data: {request_data}")
    encrypted_body = CryptoHelper.encrypt_request_data(request_data)
    # print(request_headers)
    response = requests.post(url, headers=request_headers, json=encrypted_body, timeout=22, proxies=get_proxys(log))
    # response.raise_for_status()
    if response.status_code == 200:
        response_json = response.json()
        # log.debug(f"Raw response: {response_json}")
        if 'data' in response_json:
            decrypted = CryptoHelper.decrypt_response_data(response_json)
            # log.debug(f"Decrypted response: {decrypted}")
            return decrypted
        return response_json
    else:
        log.error(f"Request failed: {response.status_code}, Response: {response.text}")
        return None


def get_shop_single_page(log, page_num, page_size=PAGE_SIZE):
    """
    Fetch one page of the merchant list.
    :param log: logger object
    :param page_num: page number
    :param page_size: items per page
    """
    log.debug(f"Getting shop list, page: {page_num}")
    url = f"{BASE_URL}/zc-api/merchant/getMerMyList"
    request_data = {'pageNum': page_num, 'pageSize': page_size}
    try:
        resp = make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})
    except Exception as e:
        log.error(f"Error getting shop list: {e}")
        resp = None
    return resp


def get_sold_single_page(log, mer_no, page_num):
    """
    Fetch one page of a merchant's product list.
    :param log: logger object
    :param mer_no: merchant number
    :param page_num: page number
    """
    log.info(f"Getting sold items for mer_no: {mer_no}, page: {page_num}")
    url = f"{BASE_URL}/zc-api/act/actProduct/getActList"
    request_data = {
        'merNo': mer_no,
        'pageNum': page_num,
        'pageSize': PAGE_SIZE,
        'queryType': 1
    }
    return make_encrypted_post_request(log, url, request_data, extra_headers={"pageNum": str(page_num)})


def get_player_single_page(log, act_id, token, page_num, page_size=PAGE_SIZE):
    """
    Fetch one page of the player list for an activity.
    :param log: logger object
    :param act_id: activity ID
    :param token: Authorization token
    :param page_num: page number
    :param page_size: items per page
    """
    log.debug(f"Getting player list for act_id: {act_id}, page: {page_num}")
    url = f"{BASE_URL}/zc-api/act/actOrder/getActOrderPublicDetails"
    request_data = {'actId': act_id, 'pageNum': page_num, 'pageSize': page_size}
    return make_encrypted_post_request(
        log, url, request_data,
        extra_headers={"Authorization": token, "pageNum": str(page_num)}
    )
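

# The three *_single_page helpers above return the decrypted payload from
# make_encrypted_post_request. Judging by how the pagination loops below consume it,
# the payload is assumed to be a paged envelope roughly like:
#
#   {
#       "total": 123,            # total number of records (used by get_shop_list)
#       "rows": [ {...}, ... ]   # up to PAGE_SIZE records for the requested page
#   }
#
# An empty or short "rows" list is treated as the last page.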


def parse_shop_data(log, items, sql_pool):
    """
    Parse merchant data and upsert it into zc_shop_record.
    :param log: logger object
    :param items: merchant list
    :param sql_pool: MySQL connection pool
    """
    log.debug("Parsing shop data...........")
    info_list = []
    for item in items:
        # log.debug(f"Processing shop item: {item}")
        shop_id = item.get('merNo')
        shop_name = item.get('merName')
        sold_number = item.get('spell_number')
        # link_man = item.get('linkMan')
        # user_id = item.get('userId')
        fans = item.get('attentionNumber')
        data_dict = {
            'shop_id': shop_id,
            'shop_name': shop_name,
            'sold_number': sold_number,
            'fans': fans
        }
        log.debug(f"Parsed shop data: {data_dict}")
        info_list.append(data_dict)
    # Upsert keyed on shop_id: update the row if it already exists, otherwise insert it
    sql = ("INSERT INTO zc_shop_record (shop_id, shop_name, sold_number, fans) VALUES (%s, %s, %s, %s) "
           "ON DUPLICATE KEY UPDATE shop_name=VALUES(shop_name), sold_number=VALUES(sold_number), fans=VALUES(fans)")
    # Convert the list of dicts to a list of tuples (dict insertion order matches the column order)
    args_list = [tuple(d.values()) for d in info_list]
    sql_pool.insert_many(query=sql, args_list=args_list)


@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), after=after_log)
def get_video(log, token, pid):
    """
    Fetch the live/video info for an activity.
    :param log: logger object
    :param token: Authorization token
    :param pid: activity ID
    :return: (live_id, open_time, close_time, video_url)
    """
    url = f"{BASE_URL}/zc-api/live/actLive/getMerLiveInfo"
    request_data = {'actId': pid}
    log.debug(f"Getting video info, actId: {pid}")
    resp_data = make_encrypted_post_request(
        log, url, request_data,
        extra_headers={"Authorization": token}
    )
    # log.debug(f"Video response: {resp_data}")
    # Guard against a failed request or a missing 'live' section
    live = (resp_data or {}).get('live') or {}
    live_id = live.get('liveId')
    live_open_time = live.get('openTime')
    live_close_time = live.get('closeTime')
    video_url = live.get('videoUrl')
    return live_id, live_open_time, live_close_time, video_url


def parse_sold_data(log, token, items, sql_pool, shop_name):
    """
    Parse product data and insert it into zc_product_record.
    :param log: logger object
    :param token: Authorization token
    :param items: product list
    :param sql_pool: MySQL connection pool
    :param shop_name: merchant name
    """
    info_list = []
    for item in items:
        # log.debug(f"Processing sold item: {item}")
        shop_id = item.get('merNo')  # merchant number
        pid = item.get('id')
        act_day = item.get('actDay')  # activity duration in days
        act_logo = item.get('actLogo')
        act_name = item.get('actName')  # activity name
        act_no = item.get('actNo')  # activity number
        act_status = item.get('actStatus')  # activity status
        startDate = item.get('startDate')  # start time
        endDate = item.get('complete_date')  # end time
        storageId = item.get('storageId')  # storage ID
        storageName = item.get('storageName')  # storage name
        unitPrice = item.get('unitPrice')  # unit price
        sumPrice = item.get('sumPrice')  # total price
        reality_price = item.get('realityPrice')  # actual price
        packageNumber = item.get('packageNumber')  # package configuration
        schedule_ = item.get('schedule')  # remaining stock
        live_id, live_open_time, live_close_time, video_url = get_video(log, token, pid)
        data_dict = {
            'shop_id': shop_id,
            'shop_name': shop_name,
            'pid': pid,
            'act_day': act_day,
            'act_img': act_logo,
            'act_name': act_name,
            'act_no': act_no,
            'act_status': act_status,
            'start_date': startDate,
            'end_date': endDate,
            'storage_id': storageId,
            'storage_name': storageName,
            'unit_price': unitPrice,
            'sum_price': sumPrice,
            'reality_price': reality_price,
            'package_number': packageNumber,
            'schedule': schedule_,
            'live_id': live_id,
            'live_open_time': live_open_time,
            'live_close_time': live_close_time,
            'video_url': video_url
        }
        # log.debug(f"Parsed sold data: {data_dict}")  # e.g. {'live_close_time': None, 'video_url': None}
        info_list.append(data_dict)
    # Save the data
    sql_pool.insert_many(table='zc_product_record', data_list=info_list, ignore=True)


def parse_player_data(log, items, sql_pool):
    """
    Parse player data and insert it into zc_player_record.
    :param log: logger object
    :param items: player list
    :param sql_pool: MySQL connection pool
    """
    log.debug("Parsing player data...........")
    info_list = []
    for item in items:
        # log.debug(f"Processing player item: {item}")
        pid = item.get('actId')
        give_number = item.get('giveNumber')  # number of shares
        user_id = item.get('userId')
        user_name = item.get('userName')
        data_dict = {
            'pid': pid,
            'give_number': give_number,
            'user_id': user_id,
            'user_name': user_name
        }
        # log.debug(f"Parsed player data: {data_dict}")
        info_list.append(data_dict)
    # Save the data
    sql_pool.insert_many(table='zc_player_record', data_list=info_list, ignore=True)


def get_shop_list(log, sql_pool):
    """
    Paginate through the merchant list and persist every page.
    :param log: logger object
    :param sql_pool: MySQL connection pool
    """
    page_num = 1
    total = None  # total record count, read from the first response that carries it
    while page_num <= 100:
        result = get_shop_single_page(log, page_num, PAGE_SIZE)
        # print(result)
        if result is None:
            log.error(f"Page {page_num} request failed, stopping pagination")
            break
        data_list = result.get('rows', [])
        parse_shop_data(log, data_list, sql_pool)
        # Read the total record count (available from the first page)
        if total is None and 'total' in result:
            total = result['total']
            log.info(f"Total records: {total}")
        # Stop when a page comes back empty
        if len(data_list) == 0:
            log.info(f"Page {page_num} is empty, stopping pagination")
            break
        # Stop once the total range has been covered
        if total is not None and (page_num - 1) * PAGE_SIZE >= total:
            log.info("All records fetched, stopping pagination")
            break
        log.info(f"Page {page_num} done, items on this page: {len(data_list)}")
        page_num += 1


def get_sold_list(log, shop_id, token, sql_pool, shop_name):
    """
    Paginate through a merchant's product list and persist every page.
    :param log: logger object
    :param shop_id: merchant number (merNo)
    :param token: Authorization token
    :param sql_pool: MySQL connection pool
    :param shop_name: merchant name
    """
    page_num = 1
    max_pages = 10
    while page_num <= max_pages:
        result = get_sold_single_page(log, shop_id, page_num)
        # time.sleep(random.uniform(0.5, 1))  # optional random delay to avoid overloading the target server
        # print(result)
        if result is None:
            log.error(f"Page {page_num} request failed, stopping pagination")
            break
        data_list = result.get('rows', [])
        parse_sold_data(log, token, data_list, sql_pool, shop_name)
        # A short page means this was the last page
        if len(data_list) < PAGE_SIZE:
            log.info(f"Page {page_num} is the last page, stopping pagination")
            break
        log.info(f"Page {page_num} done, items on this page: {len(data_list)}")
        page_num += 1


def get_player_list(log, act_id, token, sql_pool):
    """
    Paginate through an activity's player list and persist every page.
    :param log: logger object
    :param act_id: activity ID
    :param token: Authorization token
    :param sql_pool: MySQL connection pool
    :return: has_data (True: data found, False: no data)
    """
    page_num = 1
    max_pages = 100
    has_data = False
    while page_num <= max_pages:
        result = get_player_single_page(log, act_id, token, page_num)
        if result is None:
            log.error(f"Page {page_num} request failed, stopping pagination")
            break
        data_list = result.get('rows', [])
        # Only parse pages that actually contain data
        if len(data_list) > 0:
            has_data = True
            parse_player_data(log, data_list, sql_pool)
        # A short page means this was the last page
        if len(data_list) < PAGE_SIZE:
            log.info(f"Page {page_num} is the last page, stopping pagination")
            break
        log.info(f"Page {page_num} done, items on this page: {len(data_list)}")
        page_num += 1
    return has_data
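

# player_state lifecycle in zc_product_record, as driven by zc_main below:
#   1 - player query started (left at 1 when the activity returned player data)
#   2 - query finished but the activity had no player data
#   3 - query raised an error
# Rows with player_state != 1 are (re)queued on the next run. The initial value for
# newly inserted products comes from the table schema, which is not shown here and is
# assumed to default to something other than 1.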


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def zc_main(log):
    """
    Main entry point for one full scraping pass.
    :param log: logger object
    """
    log.info(
        f'Starting scraper task {inspect.currentframe().f_code.co_name}....................................................')
    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("Database connection pool is unhealthy")
        raise RuntimeError("Database connection pool is unhealthy")
    try:
        # Fetch the token
        token_row = sql_pool.select_one("SELECT token FROM zc_token WHERE id = 1")
        if not token_row:
            log.error("No token found")
            return
        token = token_row[0]

        # player test
        # has_data = get_player_list(log, 1800, token, sql_pool)

        # Fetch shop data
        try:
            get_shop_list(log, sql_pool)
        except Exception as e:
            log.error(f'iterate_shop_list error: {e}')
        time.sleep(5)

        # Fetch sold data - iterate over all merchants
        try:
            # Query all merchant numbers (merNo) from the shop table
            mer_no_rows = sql_pool.select_all("SELECT shop_id, shop_name FROM zc_shop_record WHERE sold_number != 0")
            log.info(f"Found {len(mer_no_rows)} merchants: {mer_no_rows}")
            for shop_id, shop_name in mer_no_rows:
                log.info(f"Scraping product data for merchant {shop_id}, {shop_name}")
                get_sold_list(log, shop_id, token, sql_pool, shop_name)
        except Exception as e:
            log.error(f'get_sold_list error: {e}')
        time.sleep(5)

        # Fetch player data - iterate over all activities
        try:
            # Query all activity IDs (actId) from the product table
            act_id_rows = sql_pool.select_all("SELECT pid FROM zc_product_record WHERE player_state != 1")
            act_id_list = [row[0] for row in act_id_rows] if act_id_rows else []
            log.info(f"Found {len(act_id_list)} activity IDs")
            for act_id in act_id_list:
                try:
                    # Mark the current pid as state 1 (query in progress) first
                    sql_pool.update_one("UPDATE zc_product_record SET player_state = 1 WHERE pid = %s", (act_id,))
                    log.info(f"Set pid {act_id} state to 1 (query started)")
                    log.info(f"Scraping player data for pid: {act_id}")
                    has_data = get_player_list(log, act_id, token, sql_pool)
                    # Update the state depending on whether data was found
                    if has_data:
                        log.info(f"pid {act_id}: data found, state stays at 1")
                    else:
                        log.info(f"pid {act_id}: no data, setting state to 2")
                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 2 WHERE pid = %s", (act_id,))
                except Exception as pid_error:
                    # On failure, set the state to 3
                    log.error(f"pid {act_id} query failed, error: {pid_error}")
                    try:
                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 3 WHERE pid = %s", (act_id,))
                        log.info(f"Set pid {act_id} state to 3 (query error)")
                    except Exception as update_error:
                        log.error(f"Failed to update state for pid {act_id}: {update_error}")
        except Exception as e:
            log.error(f'iterate_player_list error: {e}')

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'Scraper {inspect.currentframe().f_code.co_name} finished, waiting for the next run............')


def schedule_task():
    """
    Entry point that runs the scraper once and then on a daily schedule.
    """
    # Run the task once immediately
    zc_main(log=logger)
    # Then schedule a daily run
    schedule.every().day.at("00:01").do(zc_main, log=logger)
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    # zc_main(logger)
    schedule_task()