- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2026/2/27 11:22
- import time
- import inspect
- import requests
- import schedule
- import user_agent
- from loguru import logger
- from crypto_utils import CryptoHelper
- from mysql_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
# Replace the default loguru sink with a daily-rotated file sink.
logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")
# Base configuration
BASE_URL = "https://cashier.yqszpay.com"
PAGE_SIZE = 10  # records per API page; also used by the pagination stop checks
# Default request headers shared by every API call (copied per request,
# never mutated in place).
headers = {
    "User-Agent": user_agent.generate_user_agent(),
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip",
    "Content-Type": "application/json",
    "channelNo": "88888888",
    "pageSize": str(PAGE_SIZE),
    # "pageNum": 1,
    "version": "1.9.9.82537"
}
def after_log(retry_state):
    """
    Callback invoked by tenacity after each retry attempt.

    :param retry_state: RetryCallState object describing the attempt
    """
    # Prefer the logger passed as the wrapped function's first positional
    # argument; otherwise fall back to the module-level logger.
    log = retry_state.args[0] if retry_state.args else logger
    fn_name = retry_state.fn.__name__
    attempt = retry_state.attempt_number
    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {attempt} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {attempt} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the proxy configuration for outbound requests.

    :param log: logger object (kept for interface compatibility and retry logging)
    :return: dict with 'http' and 'https' proxy URLs
    """
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to environment variables or a secrets store.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # The original wrapped this dict construction in try/except, but building
    # a dict of formatted strings cannot raise — the handler was dead code.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def make_encrypted_post_request(log, url: str, request_data: dict, extra_headers: dict = None):
    """
    Generic encrypted POST request with retry.

    :param log: logger object
    :param url: request URL
    :param request_data: request payload dict (encrypted before sending)
    :param extra_headers: extra request headers merged over the module defaults
    :return: decrypted response data when the JSON has a 'data' field;
             the raw JSON otherwise; None on a non-200 status
    """
    # Copy the module-level defaults so per-call headers never mutate them.
    request_headers = headers.copy()
    if extra_headers:
        request_headers.update(extra_headers)
    log.debug(f"Request URL: {url}, Data: {request_data}")
    encrypted_body = CryptoHelper.encrypt_request_data(request_data)
    # print(request_headers)
    # NOTE(review): a non-200 status returns None instead of raising, so the
    # @retry decorator only retries on exceptions (timeouts, network errors).
    response = requests.post(url, headers=request_headers, json=encrypted_body, timeout=22, proxies=get_proxys(log))
    # response.raise_for_status()
    if response.status_code == 200:
        response_json = response.json()
        # log.debug(f"Raw response: {response_json}")
        if 'data' in response_json:
            decrypted = CryptoHelper.decrypt_response_data(response_json)
            # log.debug(f"Decrypted response: {decrypted}")
            return decrypted
        return response_json
    else:
        log.error(f"请求失败: {response.status_code}, Response: {response.text}")
        return None
def get_shop_single_page(log, page_num, page_size=PAGE_SIZE):
    """
    Fetch one page of the merchant list.

    :param log: logger object
    :param page_num: page number
    :param page_size: records per page
    :return: decrypted response dict, or None when the request fails
    """
    log.debug(f"Getting shop list, page: {page_num}")
    endpoint = f"{BASE_URL}/zc-api/merchant/getMerMyList"
    payload = {'pageNum': page_num, 'pageSize': page_size}
    page_headers = {"pageNum": str(page_num)}
    try:
        return make_encrypted_post_request(log, endpoint, payload, extra_headers=page_headers)
    except Exception as e:
        log.error(f"Error getting shop list: {e}")
        return None
def get_sold_single_page(log, mer_no, page_num):
    """
    Fetch one page of a merchant's product list.

    :param log: logger object
    :param mer_no: merchant number
    :param page_num: page number
    :return: decrypted response dict, or None when the request fails
    """
    log.info(f"Getting sold items for mer_no: {mer_no}, page: {page_num}")
    endpoint = f"{BASE_URL}/zc-api/act/actProduct/getActList"
    payload = {
        'merNo': mer_no,
        'pageNum': page_num,
        'pageSize': PAGE_SIZE,
        'queryType': 1,
    }
    page_headers = {"pageNum": str(page_num)}
    return make_encrypted_post_request(log, endpoint, payload, extra_headers=page_headers)
def get_player_single_page(log, act_id, token, page_num, page_size=PAGE_SIZE):
    """
    Fetch one page of an activity's player list.

    :param log: logger object
    :param act_id: activity ID
    :param token: Authorization token
    :param page_num: page number
    :param page_size: records per page
    :return: decrypted response dict, or None when the request fails
    """
    log.debug(f"Getting player list for act_id: {act_id}, page: {page_num}")
    endpoint = f"{BASE_URL}/zc-api/act/actOrder/getActOrderPublicDetails"
    payload = {'actId': act_id, 'pageNum': page_num, 'pageSize': page_size}
    auth_headers = {"Authorization": token, "pageNum": str(page_num)}
    return make_encrypted_post_request(log, endpoint, payload, extra_headers=auth_headers)
def parse_shop_data(log, items, sql_pool):
    """
    Parse merchant rows and upsert them into zc_shop_record.

    :param log: logger object
    :param items: list of merchant dicts from the API
    :param sql_pool: MySQL connection pool
    """
    log.debug("Parsing shop data...........")
    args_list = []
    for item in items:
        data_dict = {
            'shop_id': item.get('merNo'),
            'shop_name': item.get('merName'),
            # NOTE(review): 'spell_number' is snake_case while sibling API keys
            # are camelCase — confirm the key name against a real payload.
            'sold_number': item.get('spell_number'),
            'fans': item.get('attentionNumber'),
        }
        log.debug(f"Parsed shop data: {data_dict}")
        # Build the tuple explicitly in the INSERT's column order instead of
        # relying on dict insertion order via tuple(d.values()).
        args_list.append((data_dict['shop_id'], data_dict['shop_name'],
                          data_dict['sold_number'], data_dict['fans']))
    # Fix: skip the DB round-trip entirely when the page had no rows.
    if not args_list:
        return
    # Upsert keyed on shop_id: insert new rows, refresh existing ones.
    sql = "INSERT INTO zc_shop_record (shop_id, shop_name, sold_number, fans) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE shop_name=VALUES(shop_name), sold_number=VALUES(sold_number), fans=VALUES(fans)"
    sql_pool.insert_many(query=sql, args_list=args_list)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), after=after_log)
def get_video(log, token, pid):
    """
    Fetch live/video info for an activity.

    :param log: logger object
    :param token: Authorization token
    :param pid: activity ID
    :return: (live_id, open_time, close_time, video_url); all None when the
             response carries no live section
    """
    # Use the shared BASE_URL constant like every other endpoint (same URL).
    url = f"{BASE_URL}/zc-api/live/actLive/getMerLiveInfo"
    request_data = {'actId': pid}
    log.debug(f"获取视频信息,actId: {pid}")
    resp_data = make_encrypted_post_request(
        log, url, request_data,
        extra_headers={"Authorization": token}
    )
    # Fix: a failed request returns None and the original then crashed on
    # None.get(); also hoist the repeated 'live' lookup into one variable.
    live = (resp_data or {}).get('live') or {}
    return live.get('liveId'), live.get('openTime'), live.get('closeTime'), live.get('videoUrl')
def parse_sold_data(log, token, items, sql_pool, shop_name):
    """
    Parse product rows and insert them into zc_product_record.

    :param log: logger object
    :param token: Authorization token (forwarded to get_video)
    :param items: list of product dicts from the API
    :param sql_pool: MySQL connection pool
    :param shop_name: merchant name attached to every row
    """
    info_list = []
    for item in items:
        pid = item.get('id')
        # Live/video details require one extra authorized request per product.
        live_id, live_open_time, live_close_time, video_url = get_video(log, token, pid)
        info_list.append({
            'shop_id': item.get('merNo'),           # merchant number
            'shop_name': shop_name,
            'pid': pid,
            'act_day': item.get('actDay'),          # activity duration (days)
            'act_img': item.get('actLogo'),
            'act_name': item.get('actName'),
            'act_no': item.get('actNo'),
            'act_status': item.get('actStatus'),
            'start_date': item.get('startDate'),
            # NOTE(review): 'complete_date' is snake_case while sibling keys
            # are camelCase — confirm against a real payload.
            'end_date': item.get('complete_date'),
            'storage_id': item.get('storageId'),
            'storage_name': item.get('storageName'),
            'unit_price': item.get('unitPrice'),
            'sum_price': item.get('sumPrice'),
            'reality_price': item.get('realityPrice'),
            'package_number': item.get('packageNumber'),
            'schedule': item.get('schedule'),       # remaining stock
            'live_id': live_id,
            'live_open_time': live_open_time,
            'live_close_time': live_close_time,
            'video_url': video_url,
        })
    # Fix: avoid a DB call with an empty payload.
    if not info_list:
        return
    sql_pool.insert_many(table='zc_product_record', data_list=info_list, ignore=True)
def parse_player_data(log, items, sql_pool):
    """
    Parse player rows and insert them into zc_player_record.

    :param log: logger object
    :param items: list of player dicts from the API
    :param sql_pool: MySQL connection pool
    """
    log.debug("Parsing player data...........")
    info_list = [
        {
            'pid': item.get('actId'),
            'give_number': item.get('giveNumber'),  # number of shares
            'user_id': item.get('userId'),
            'user_name': item.get('userName'),
        }
        for item in items
    ]
    # Fix: avoid a DB call with an empty payload.
    if not info_list:
        return
    sql_pool.insert_many(table='zc_player_record', data_list=info_list, ignore=True)
def get_shop_list(log, sql_pool):
    """
    Iterate all merchant-list pages and persist each page.

    :param log: logger object
    :param sql_pool: MySQL connection pool
    """
    page_num = 1
    # Fix: must start as None. The original initialized total = 0, so the
    # "total is None" branch below never fired and the total-based stop
    # condition (0 >= 0) aborted pagination after the first page.
    total = None
    while page_num <= 100:  # hard page cap as a safety net
        result = get_shop_single_page(log, page_num, PAGE_SIZE)
        if result is None:
            log.error(f"第 {page_num} 页请求失败,停止翻页")
            break
        data_list = result.get('rows', [])
        parse_shop_data(log, data_list, sql_pool)
        # Record the total row count once, from the first page reporting it.
        if total is None and 'total' in result:
            total = result['total']
            log.info(f"总记录数: {total}")
        if len(data_list) == 0:
            log.info(f"第 {page_num} 页无数据,停止翻页")
            break
        # Fix: stop once the pages fetched so far cover the total (the
        # original used page_num - 1, costing one extra empty request).
        if total is not None and page_num * PAGE_SIZE >= total:
            log.info(f"已遍历完所有数据,停止翻页")
            break
        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
        page_num += 1
def get_sold_list(log, shop_id, token, sql_pool, shop_name):
    """
    Iterate product-list pages for one merchant and persist each page.

    :param log: logger object
    :param shop_id: merchant number
    :param token: Authorization token
    :param sql_pool: MySQL connection pool
    :param shop_name: merchant name
    """
    page_num = 1
    max_pages = 10  # safety cap
    while page_num <= max_pages:
        result = get_sold_single_page(log, shop_id, page_num)
        if result is None:
            log.error(f"第 {page_num} 页请求失败,停止翻页")
            break
        data_list = result.get('rows', [])
        parse_sold_data(log, token, data_list, sql_pool, shop_name)
        # Fix: compare against PAGE_SIZE instead of a magic 10 — a short
        # page means it was the last one.
        if len(data_list) < PAGE_SIZE:
            log.info(f"第 {page_num} 页无数据,停止翻页")
            break
        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
        page_num += 1
def get_player_list(log, act_id, token, sql_pool):
    """
    Iterate player-list pages for one activity and persist each page.

    :param log: logger object
    :param act_id: activity ID
    :param token: Authorization token
    :param sql_pool: MySQL connection pool
    :return: True if any page contained data, else False
    """
    page_num = 1
    max_pages = 100  # safety cap
    has_data = False
    while page_num <= max_pages:
        result = get_player_single_page(log, act_id, token, page_num)
        if result is None:
            log.error(f"第 {page_num} 页请求失败,停止翻页")
            break
        data_list = result.get('rows', [])
        # Only parse/persist pages that actually carry rows.
        if len(data_list) > 0:
            has_data = True
            parse_player_data(log, data_list, sql_pool)
        # Fix: compare against PAGE_SIZE instead of a magic 10 — a short
        # page means it was the last one.
        if len(data_list) < PAGE_SIZE:
            log.info(f"第 {page_num} 页无数据,停止翻页")
            break
        log.info(f"第 {page_num} 页查询完成,本页条数: {len(data_list)}")
        page_num += 1
    return has_data
@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def zc_main(log):
    """
    Main crawl task: shops -> products (per shop) -> players (per activity).

    :param log: logger object
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Set up the MySQL connection pool; abort (and let @retry reschedule)
    # when it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        # Fetch the API token
        token_row = sql_pool.select_one("SELECT token FROM zc_token WHERE id = 1")
        if not token_row:
            log.error("未查询到 token")
            return
        token = token_row[0]
        # Step 1: crawl shop data
        try:
            get_shop_list(log, sql_pool)
        except Exception as e:
            log.error(f'iterate_shop_list error: {e}')
            time.sleep(5)
        # Step 2: crawl product data for every merchant with sales
        try:
            # Fix: the loop unpacks (shop_id, shop_name), so the query must
            # select both columns — the original selected only shop_id and
            # would raise on unpacking.
            mer_no_rows = sql_pool.select_all(
                "SELECT shop_id, shop_name FROM zc_shop_record WHERE sold_number != 0")
            log.info(f"查询到 {len(mer_no_rows)} 个商户编号: {mer_no_rows}")
            for shop_id, shop_name in mer_no_rows:
                log.info(f"开始爬取商户 {shop_id}, {shop_name} 的商品数据")
                # Fix: arguments reordered to match the signature
                # get_sold_list(log, shop_id, token, sql_pool, shop_name) —
                # the original passed shop_name in the token slot.
                get_sold_list(log, shop_id, token, sql_pool, shop_name)
        except Exception as e:
            log.error(f'get_sold_list error: {e}')
            time.sleep(5)
        # Step 3: crawl player data for every activity not yet in progress
        try:
            act_id_rows = sql_pool.select_all("SELECT pid FROM zc_product_record WHERE player_state != 1")
            act_id_list = [row[0] for row in act_id_rows] if act_id_rows else []
            log.info(f"查询到 {len(act_id_list)} 个活动ID")
            for act_id in act_id_list:
                try:
                    # Mark the activity as in-progress (state 1) before querying
                    sql_pool.update_one("UPDATE zc_product_record SET player_state = 1 WHERE pid = %s", (act_id,))
                    log.info(f"将 pid: {act_id} 的状态更新为 1(开始查询)")
                    log.info(f"开始爬取pid: {act_id} 的玩家数据")
                    has_data = get_player_list(log, act_id, token, sql_pool)
                    # State machine: 1 = has data / in progress, 2 = empty,
                    # 3 = failed (set in the except below).
                    if has_data:
                        log.info(f"pid: {act_id} 查询到数据,状态保持为 1")
                    else:
                        log.info(f"pid: {act_id} 没有数据,状态更新为 2")
                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 2 WHERE pid = %s", (act_id,))
                except Exception as pid_error:
                    log.error(f"pid: {act_id} 查询失败,错误: {pid_error}")
                    try:
                        sql_pool.update_one("UPDATE zc_product_record SET player_state = 3 WHERE pid = %s", (act_id,))
                        log.info(f"已将 pid: {act_id} 的状态更新为 3(查询异常)")
                    except Exception as update_error:
                        log.error(f"更新 pid: {act_id} 状态失败: {update_error}")
        except Exception as e:
            log.error(f'iterate_player_list error: {e}')
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
def schedule_task():
    """
    Entry point for the crawler: run once immediately, then re-run the task
    every day at 00:01 via the schedule library.
    """
    # Kick off one run right away
    zc_main(log=logger)
    # Register the recurring daily job
    schedule.every().day.at("00:01").do(zc_main, log=logger)
    # Poll the scheduler forever, checking for due jobs once per second
    while True:
        schedule.run_pending()
        time.sleep(1)
# Script entry point: start the scheduled crawler loop.
if __name__ == '__main__':
    # zc_main(logger)
    schedule_task()
|