# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/9/29 11:57
"""Crawler for the PCG WeChat mini-program certificate lookup.

Reads pending certificate numbers from the ``pcg_task`` table (state = 0),
queries the PCG card search API for each one and stores the result in
``pcg_record``.  Task states written by this module:

* ``1`` - record fetched and inserted
* ``2`` - the API reported that the card does not exist / returned no data
"""
import inspect
import os

import requests
import user_agent
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from mysql_pool import MySQLConnectionPool

logger.remove()
logger.add(
    "./logs/{time:YYYYMMDD}.log",
    encoding='utf-8',
    rotation="00:00",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    level="DEBUG",
    retention="7 day",
)

_SEARCH_URL = "https://card.pcgcard.cn/api/card/card/search_card_result/"

# (pcg_record column, API response key), in the column order used for insertion.
_RECORD_FIELDS = (
    ("data_id", "id"),
    ("create_datetime", "create_datetime"),  # item creation time
    ("update_datetime", "update_datetime"),  # item update time
    ("name", "name"),  # item name
    ("version", "version"),  # card version
    ("card_number", "card_number"),  # item number
    ("year", "year"),  # release year
    ("score", "score"),  # score
    # NOTE(review): the original comment calls this "item owner" but the value
    # is read from the "language" key - confirm this mapping is intended.
    ("owner", "language"),
    ("box", "box"),  # frame
    ("corner", "corner"),  # corners
    ("edge", "edge"),  # edges
    ("face", "face"),  # surface
    ("level_number", "level_number"),  # grade level
    ("loss", "loss"),  # deduction details
    ("post_number", "post_number"),  # shipping order number
    ("carousel_pic", "carousel_pic"),  # carousel images, comma separated
    ("user", "user"),
    ("rating_order", "rating_order"),  # grading order
    ("total_cards", "total_cards"),  # number of graded cards of the same item
)


def after_log(retry_state):
    """Log the outcome of an attempt; used as the tenacity ``after`` callback.

    :param retry_state: tenacity ``RetryCallState`` of the wrapped call
    """
    # By convention in this file the first positional argument of every
    # retried function is its logger; fall back to the global one otherwise.
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """Build the ``proxies`` mapping for ``requests``.

    The tunnel address and credentials can be overridden through the
    ``KDL_TUNNEL``, ``KDL_USERNAME`` and ``KDL_PASSWORD`` environment
    variables; the previous hard-coded values remain the defaults.

    :param log: logger object (unused here, but ``after_log`` reads it from
        the positional arguments)
    :return: dict with ``http`` and ``https`` proxy URLs
    """
    # NOTE(review): credentials should not live in source control; move the
    # defaults into deployment configuration and rotate them.
    tunnel = os.environ.get("KDL_TUNNEL", "x371.kdltps.com:15818")
    kdl_username = os.environ.get("KDL_USERNAME", "t13753103189895")
    kdl_password = os.environ.get("KDL_PASSWORD", "o0yefv6z")

    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_certificate_info(log, proof_number, sql_pool):
    """Fetch one certificate from the PCG API and persist the result.

    Request/HTTP/JSON errors and failures of the "state = 2" updates propagate
    so that the ``retry`` decorator can re-run the call.  A failure while
    inserting the record is only logged (as before).

    :param log: logger object
    :param proof_number: certificate number
    :param sql_pool: MySQL connection pool
    :return: ``True`` when the task row was moved out of state 0,
        ``False`` when storing the record failed and the task is still pending
    """
    log.debug(f"开始获取证书编号为: {proof_number} 的信息.........")
    headers = {
        "Content-Type": "application/json",
        "User-Agent": user_agent.generate_user_agent()
    }
    params = {
        "pageNo": "1",
        "pageSize": "1",
        "proof_number": proof_number
    }

    response = requests.get(_SEARCH_URL, headers=headers, params=params,
                            proxies=get_proxys(log), timeout=22)
    response.raise_for_status()
    resp_json = response.json()

    if resp_json["code"] == 400:
        log.error(f"{proof_number} 获取信息, 卡片不存在")
        # Mark the task as "not found" (state 2).
        sql_pool.update_one_or_dict(table="pcg_task", data={"state": 2}, condition={"cert_id": proof_number})
        return True

    resp_data = resp_json.get("data", {})
    if not resp_data:
        log.error(f"{proof_number} 获取信息, 无数据")
        sql_pool.update_one_or_dict(table="pcg_task", data={"state": 2}, condition={"cert_id": proof_number})
        return True

    data_dict = {"cert_id": proof_number}
    data_dict.update((column, resp_data.get(api_key)) for column, api_key in _RECORD_FIELDS)

    try:
        sql_pool.insert_one_or_dict(table="pcg_record", data=data_dict, ignore=True)
        sql_pool.update_one_or_dict(table="pcg_task", data={"state": 1}, condition={"cert_id": proof_number})
    except Exception as e:
        log.error(f"插入数据失败: {e}")
        return False
    return True


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def pcg_main(log):
    """Process every pending task in ``pcg_task`` in batches of up to 10000.

    The loop ends when no task with ``state = 0`` is left, or when a whole
    batch was processed without a single task leaving state 0 (otherwise the
    same failing rows would be selected again forever).

    :param log: logger object
    :raises RuntimeError: when the connection pool health check fails (this
        is what the ``retry`` decorator re-runs on)
    """
    task_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {task_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        while True:
            rows = sql_pool.select_all("select cert_id from pcg_task where state = 0 limit 10000")
            cert_ids = [row[0] for row in rows]
            log.info(f"开始处理 {len(cert_ids)} 条数据")
            if not cert_ids:
                log.info("没有待处理的数据, 停止查询.......")
                break

            progressed = 0
            for cert_id in cert_ids:
                try:
                    if get_certificate_info(log, cert_id, sql_pool):
                        progressed += 1
                except Exception as e:
                    log.error(f"Request get_certificate_info error: {e}")

            if not progressed:
                # Every task in this batch is still in state 0; selecting again
                # would return the same rows, so stop instead of spinning.
                log.error(f"No task left state 0 in a batch of {len(cert_ids)}, stopping this run")
                break
    except Exception as e:
        log.error(f'{task_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {task_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    pcg_main(logger)