# -*- coding: utf-8 -*- # Author : Charley # Python : 3.10.8 # Date : 2026/2/3 14:45 import inspect import requests import user_agent from loguru import logger from mysql_pool import MySQLConnectionPool from tenacity import retry, stop_after_attempt, wait_fixed """ https://grading11.com/cert 2511000001-2511000020 2512000001-2512000945 2601000001-2601000779 """ logger.remove() logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00", format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}", level="DEBUG", retention="7 day") def after_log(retry_state): """ retry 回调 :param retry_state: RetryCallState 对象 """ # 检查 args 是否存在且不为空 if retry_state.args and len(retry_state.args) > 0: log = retry_state.args[0] # 获取传入的 logger else: log = logger # 使用全局 logger if retry_state.outcome.failed: log.warning( f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times") else: log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded") @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_proxys(log): """ 获取代理配置 :param log: 日志对象 :return: 代理字典 """ tunnel = "x371.kdltps.com:15818" kdl_username = "t13753103189895" kdl_password = "o0yefv6z" try: proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel} } return proxies except Exception as e: log.error(f"Error getting proxy: {e}") raise e @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log) def get_cert_data(log, cert_id, sql_pool): """ 获取证书数据并保存到数据库 :param log: 日志对象 :param cert_id: 证书ID :param sql_pool: 数据库连接池 """ log.debug(f'{inspect.currentframe().f_code.co_name} -> 开始获取证书数据:{cert_id}') headers = { "content-type": "application/json;charset=UTF-8", "user-agent": user_agent.generate_user_agent() } url = "https://grading11.com/api/vendure/getData" data = [{ "customQuery": { "query": "ordercard-query", "metadata": { # "certNumber": "2511000001" "certNumber": str(cert_id) } } }] try: response = requests.post(url, headers=headers, json=data, timeout=22, proxies=get_proxys(log)) # response = requests.post(url, headers=headers, json=data, timeout=22) # print(response.text) response.raise_for_status() resp_json = response.json() orderCard = resp_json.get("orderCard", {}) if not orderCard: log.error(f"Error getting orderCard: {resp_json}") sql_pool.update_one("UPDATE gea_task SET state = 3 WHERE cert_id = %s", (cert_id,)) return # 提取评分数据 totalGrade = orderCard.get("totalGrade") centeringTotalSubGrade = orderCard.get("centeringTotalSubGrade") surfacesTotalSubGrade = orderCard.get("surfacesTotalSubGrade") edgesTotalSubGrade = orderCard.get("edgesTotalSubGrade") cornersTotalSubGrade = orderCard.get("cornersTotalSubGrade") createdAt = orderCard.get("createdAt") date_graded = createdAt[:10] if createdAt else None remark = orderCard.get("remark") # 提取图片链接 try: front_image = orderCard.get("frontSlabbedImage", {}).get("source") except AttributeError: front_image = None try: back_image = orderCard.get("backSlabbedImage", {}).get("source") except AttributeError: back_image = None # 提取卡片信息 """ 2023 Pokémon Trading Card Game English 【SVP】POKÉMON X VAN GOGH MUSEUM 085 Pikachu with Grey Felt Hat PROMO """ tag_card = orderCard.get("card", {}) cardName = tag_card.get("cardName") # Pikachu with Grey Felt Hat cardNumber = tag_card.get("cardNumber") # 085 brand = tag_card.get("brand", {}).get("name") # Pokémon Trading Card Game releaseYear = tag_card.get("releaseYear") # 2023 language = tag_card.get("language") # en try: cardSet = tag_card.get("cardSet", {}).get("name") # 【SVP】POKÉMON X VAN GOGH MUSEUM except AttributeError: cardSet = None try: rarity = tag_card.get("rarity", {}).get("name") # PROMO except AttributeError: rarity = None data_dict = { "cert_id": cert_id, "total_grade": totalGrade, "centering_grade": centeringTotalSubGrade, "surfaces_grade": surfacesTotalSubGrade, "edges_grade": edgesTotalSubGrade, "corners_grade": cornersTotalSubGrade, "date_graded": date_graded, "remark": remark, "front_image": front_image, "back_image": back_image, "card_name": cardName, "card_number": cardNumber, "brand": brand, "release_year": releaseYear, "language": language, "card_set": cardSet, "rarity": rarity } # print(data_dict) sql_pool.insert_one_or_dict(table="gea_record", data=data_dict) sql_pool.update_one("UPDATE gea_task SET state = 1 WHERE cert_id = %s", (cert_id,)) except requests.RequestException as e: log.error(f"Request failed for cert_id {cert_id}: {e}") sql_pool.update_one( "UPDATE gea_task SET state = 2 WHERE cert_id = %s", (cert_id,) ) raise except Exception as e: log.error(f"Error processing cert_id {cert_id}: {e}") sql_pool.update_one( "UPDATE gea_task SET state = 2 WHERE cert_id = %s", (cert_id,) ) raise @retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log) def gea_his_main(log): """ 主函数 :param log: logger对象 """ log.info( f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................') # 配置 MySQL 连接池 sql_pool = MySQLConnectionPool(log=log) if not sql_pool.check_pool_health(): log.error("数据库连接池异常") raise RuntimeError("数据库连接池异常") try: sql_id_list = sql_pool.select_all("SELECT cert_id FROM gea_task WHERE state != 1") sql_id_list = [item[0] for item in sql_id_list] log.info(f'共 {len(sql_id_list)} 个证书待处理') for cert_id in sql_id_list: try: get_cert_data(log, cert_id, sql_pool) except Exception as e: log.error(f'loop -> get_cert_data, error: {e}') sql_pool.update_one("UPDATE gea_task SET state = 2 WHERE cert_id = %s", (cert_id,)) except Exception as e: log.error(f'{inspect.currentframe().f_code.co_name} error: {e}') finally: log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............') if __name__ == '__main__': gea_his_main(logger)