# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/12/8 13:44
import time
import random
import inspect

import requests
import user_agent
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool
from utils import create_request_data

# Daily-rotating file log (new file at midnight), kept for 7 days.
logger.remove()
logger.add(
    "./logs/{time:YYYYMMDD}.log",
    encoding='utf-8',
    rotation="00:00",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    level="DEBUG",
    retention="7 day",
)

# NOTE(review): hardcoded API token — should be loaded from env/config, not committed.
auth = "266577ZDRB9Vb65wzF4G6!1xyVJv7BGKi7fcI757ILRQW!1sI9jAV5AOWQhXJjA!2oIrMsaLY!2WU7AnCtCshdk4EC3dvjQ=="

# Shared headers for every wechatapp.ichibankuji.cn request.
headers = {
    'authority': 'wechatapp.ichibankuji.cn',
    'content-type': 'application/json',
    'referer': 'https://servicewechat.com/wxd21e3190b2a44f73/21/page-frame.html',
    "terminalos": "YFSXZF",
    'user-agent': user_agent.generate_user_agent(),
    'authorization': auth
}


def after_log(retry_state):
    """tenacity ``after=`` callback: log the outcome of each attempt.

    All crawler functions here take a logger as their first positional
    argument, so use it when present; otherwise fall back to the
    module-level logger.

    :param retry_state: tenacity RetryCallState object
    """
    # A non-empty tuple is truthy — the original extra len() check was redundant.
    log = retry_state.args[0] if retry_state.args else logger
    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build a Kuaidaili tunnel-proxy mapping usable as ``requests`` ``proxies=``.

    :param log: logger object (first positional arg, consumed by after_log)
    :return: dict with "http" and "https" proxy URLs
    """
    # NOTE(review): hardcoded proxy credentials — move to configuration.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"
    # Constructing a literal dict cannot raise, so the original
    # try/except/raise wrapper was dead code and has been removed.
    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
    return {"http": proxy_url, "https": proxy_url}


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_set_single_page(log, page=1, sql_pool=None):
    """Fetch one page of the product ("set") list and persist it.

    :param log: logger object
    :param page: 1-based page number
    :param sql_pool: MySQLConnectionPool; required whenever the page has data
    :return: number of rows stored (0 when the page is empty)
    """
    log.debug(f"{inspect.currentframe().f_code.co_name}, 正在获取第{page}页数据........")
    original_data = {
        "orderType": 0,
        "pageNum": page,
        "pageSize": 10,
        "status": 0,
        "type": 1
    }
    url = "https://wechatapp.ichibankuji.cn/wechat/yfs/getSetList"
    # The API expects an encrypted/encoded payload under the "da" key.
    request_data = create_request_data(original_data)
    payload = {
        "da": request_data
    }
    response = requests.post(
        url,
        headers=headers,
        json=payload,
        timeout=22
    )
    response.raise_for_status()
    resp_json = response.json().get("data", {}).get("list", [])
    if not resp_json:
        log.debug("Not resp_json, 没有数据........")
        return 0
    info_list = []
    for item in resp_json:
        head_url = item.get("headUrl")
        if head_url:
            # The API returns a relative path; prepend the static-asset host.
            head_url = f"https://static.ichibankuji.cn{head_url}"
        info_list.append({
            "set_id": item.get("id"),
            "set_name": item.get("name"),
            "price": item.get("price"),
            "onsale_time": item.get("onsaleTime"),
            "head_url": head_url,
            "scores_deduct": item.get("scoresDeduct"),  # points deduction
            "status": item.get("status"),
            "theme_id": item.get("themeId"),
            "set_total": item.get("total"),
            "set_type": item.get("type")
        })
    if info_list:
        # ignore=True -> INSERT IGNORE: re-crawled pages don't duplicate rows.
        sql_pool.insert_many(table="rhyf_product_list_record", data_list=info_list, ignore=True)
    return len(info_list)


def get_setlist_all_page(log, sql_pool=None):
    """Walk the set list page by page until a short page signals the end.

    :param log: logger object
    :param sql_pool: MySQLConnectionPool passed through to the page fetcher
    """
    page = 1
    while True:
        try:
            len_data = get_set_single_page(log, page, sql_pool)
        except Exception as e:
            # get_set_single_page already retried 5 times; give up on this run.
            log.error(f"Error getting set single page: {e}")
            break
        # Page size is 10, so a short page means we just read the last one.
        if len_data < 10:
            log.debug(f"当前页为{page}, 数据长度为{len_data} ,数据已经获取完毕........")
            break
        page += 1
@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), after=after_log)
def get_box_detail(log, set_id: int, sql_pool=None):
    """Fetch the currently-open box detail for one set and persist it.

    :param log: logger object
    :param set_id: set (product) ID
    :param sql_pool: MySQLConnectionPool; required whenever data is returned
    :return: 0 when the API returned no data, otherwise None
    """
    log.debug(f"{inspect.currentframe().f_code.co_name}, set_id:{set_id}, 获取 <详情> 数据........")
    url = "https://wechatapp.ichibankuji.cn/wechat/yfs/firstBoxDetail"
    original_data = {
        "queueMode": 2,
        "setId": f"{set_id}"
    }
    # The API expects an encrypted/encoded payload under the "da" key.
    payload = {"da": create_request_data(original_data)}
    response = requests.post(url, headers=headers, json=payload, timeout=22)
    response.raise_for_status()
    resp_json = response.json().get("data", {})
    if not resp_json:
        log.debug("Not resp_json, 没有数据........")
        return 0
    data_dict = {
        "set_id": set_id,
        "box_no": resp_json.get("boxNo"),    # box number
        "box_id": resp_json.get("id"),
        "order_no": resp_json.get("orderNo"),
        "stock": resp_json.get("stock"),     # remaining stock
        "total": resp_json.get("total")      # total draws in the box
    }
    sql_pool.insert_one_or_dict(table="rhyf_product_detail_record", data=data_dict)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(5), after=after_log)
def get_draw_single_page(log, box_id: int, page=1, sql_pool=None):
    """Fetch one page of a box's draw log and persist it.

    Each log entry may carry several awards; one DB row is written per award.

    :param log: logger object
    :param box_id: box ID
    :param page: 1-based page number
    :param sql_pool: MySQLConnectionPool; required whenever the page has data
    :return: number of award rows stored (0 when the page is empty)
    """
    log.debug(f"{inspect.currentframe().f_code.co_name}, box_id:{box_id}, 获取 <抽奖> 数据........")
    url = "https://wechatapp.ichibankuji.cn/wechat/yfs/getBoxDrawLog"
    original_data = {
        "boxId": box_id,
        "pageSize": 30,
        "isMine": 0,
        "pageNum": page
    }
    payload = {"da": create_request_data(original_data)}
    response = requests.post(url, headers=headers, json=payload, timeout=22)
    response.raise_for_status()
    resp_json = response.json().get("data", {}).get("list", [])
    if not resp_json:
        log.debug("Not resp_json, 没有数据........")
        return 0
    info_list = []
    for item in resp_json:
        award_info = item.get("awardInfo", [])
        if not award_info:
            log.debug(f"box_id:{box_id}, Not awardInfo List, 没有数据........")
            continue
        for award in award_info:
            info_list.append({
                "set_id": item.get("setId"),
                "box_id": box_id,
                "draw_id": item.get("id"),
                "nick_name": item.get("nickName"),
                "create_time": item.get("createTime"),
                "award_type": award.get("typeStr"),   # prize tier
                "award_total": award.get("total")     # prize count
            })
    if info_list:
        # ignore=True -> INSERT IGNORE: re-crawled pages don't duplicate rows.
        sql_pool.insert_many(table="rhyf_product_draw_record", data_list=info_list, ignore=True)
    return len(info_list)


def get_draw_list(log, box_id: int, sql_pool=None):
    """Walk a box's draw log page by page until a short page signals the end.

    NOTE(review): get_draw_single_page returns the number of AWARD rows, which
    can differ from the number of log entries on the page; with pageSize=30
    a page of 30 entries each holding one award still reads as exactly 30,
    so the < 30 stop condition matches the original behaviour — confirm if
    multi-award entries are common.

    :param log: logger object
    :param box_id: box ID
    :param sql_pool: MySQLConnectionPool passed through to the page fetcher
    """
    page = 1
    while True:
        try:
            len_data = get_draw_single_page(log, box_id, page, sql_pool)
        except Exception as e:
            log.error(f"Error getting draw single page: {e}")
            break
        if len_data < 30:
            log.debug(f"当前页为{page}, 数据长度为{len_data} ,数据已经获取完毕........")
            break
        page += 1


# ----------------------------------------------------------------------------------------------------------------------
def _crawl_set_details(log, sql_pool):
    """Fetch details for every set still pending (detail_state = 0);
    mark each row 1 on success, 2 on failure so it is not re-picked."""
    sql_set_id_list = sql_pool.select_all(
        "select set_id from rhyf_product_list_record where detail_state = 0")
    for set_id in [row[0] for row in sql_set_id_list]:
        try:
            get_box_detail(log, set_id, sql_pool)
            sql_pool.update_one_or_dict(
                table="rhyf_product_list_record",
                data={"detail_state": 1},
                condition={"set_id": set_id}
            )
        except Exception as e:
            log.error(f'get_box_detail error: {e}')
            sql_pool.update_one_or_dict(
                table="rhyf_product_list_record",
                data={"detail_state": 2},
                condition={"set_id": set_id}
            )


def _crawl_draw_logs(log, sql_pool):
    """Fetch draw logs for every box still pending (draw_state = 0);
    mark each row 1 on success, 2 on failure so it is not re-picked."""
    sql_box_id_list = sql_pool.select_all(
        "select box_id from rhyf_product_detail_record where draw_state = 0")
    for box_id in [row[0] for row in sql_box_id_list]:
        try:
            get_draw_list(log, box_id, sql_pool)
            sql_pool.update_one_or_dict(
                table="rhyf_product_detail_record",
                data={"draw_state": 1},
                condition={"box_id": box_id}
            )
        except Exception as e:
            log.error(f'get_draw_list error: {e}')
            sql_pool.update_one_or_dict(
                table="rhyf_product_detail_record",
                data={"draw_state": 2},
                condition={"box_id": box_id}
            )


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def rhyf_main(log):
    """Entry point: crawl set list -> set details -> draw logs, then rerun.

    The @retry decorator doubles as the scheduler: raising tenacity.TryAgain
    at the end of a round makes tenacity re-invoke this function after
    wait_fixed(3600) seconds, up to stop_after_attempt(100) rounds.
    BUG FIX: previously every exception was swallowed and the function
    returned normally, so the decorator never retried and only a single
    round ever ran, despite the "等待下一轮" log implying a loop.

    :param log: logger object
    :raises RuntimeError: when the DB connection pool is unhealthy (retried by tenacity)
    """
    from tenacity import TryAgain  # local import: header only imports retry/stop/wait helpers

    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')
    # Configure the MySQL connection pool and fail fast if it is unhealthy.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")
    try:
        # Each phase is isolated so one failing phase doesn't skip the others.
        try:
            log.debug('------------------- 获取所有页码商品 -------------------')
            get_setlist_all_page(log, sql_pool)
        except Exception as e:
            log.error(f'get_all_page error: {e}')
        # --------------------------------------------------------------------
        try:
            log.debug('------------------- 获取所有商品详情 -------------------')
            _crawl_set_details(log, sql_pool)
        except Exception as e:
            log.error(f'get_box_detail error: {e}')
        # ---------------------------------------------------------------------
        try:
            log.debug('------------------- 获取所有商品抽奖数据 -------------------')
            _crawl_draw_logs(log, sql_pool)
        except Exception as e:
            log.error(f'get_draw_list error: {e}')
    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')
    # Hand control back to tenacity so the next round starts in an hour.
    raise TryAgain


if __name__ == '__main__':
    # get_set_single_page(logger)
    # get_box_detail(logger, 9936)
    # get_draw_single_page(logger, 84633)
    rhyf_main(logger)