# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date : 2025/9/22 19:06
import time
import inspect
import requests
import schedule
import user_agent
from loguru import logger
from parsel import Selector
from tenacity import retry, stop_after_attempt, wait_fixed
from mysql_pool import MySQLConnectionPool

# Replace loguru's default sink with a daily-rotated file sink kept for 7 days.
logger.remove()
logger.add("./logs/new_{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="7 day")

# Crawl plan for CGC: first run the cert-number range "6000 + 6 digits"
# through "6075 + 6 digits".

HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "user-agent": user_agent.generate_user_agent()
}

# NOTE(review): these look like session/captcha cookies captured from a
# browser; they are hard-coded and will presumably expire -- confirm how they
# are refreshed.
COOKIES = {
    "CaptchaValue": "eyJhbGdvcml0aG0iOiJTSEEtMjU2IiwiY2hhbGxlbmdlIjoiNTZlNTBmMzU4MWRlMGYyNGQ5OGI2MTBhYTJlMTdmNjIzNzU4MDdmZjExOWM2MjNjYTRhNzUyY2MxMmU3ZDNmYyIsIm51bWJlciI6NDUxMTcsInNhbHQiOiJmNWU0ODBkZWNhMDNmYmJmNzdiN2UyYmYiLCJzaWduYXR1cmUiOiI3ZjlkMTc3NjllOTMwM2I3NTM5OTRlNzRlZDg0MzU3NTZkMzljMzU5YTFhMzBmODAzODNlMWI4YjA0MGZhZDVjIiwidG9vayI6ODJ9",
    "saved-language": "zh-CN",
    "SessionID": "dd60b7e4-f638-41cd-bb56-e2e4929aee27",
    "_ga": "GA1.1.706688626.1757584379",
    "AltchaSessionID": "1622a445-bf77-43e5-be67-13b76ff8e5ca",
    "_ga_55FF3CQQK2": "GS2.1.s1758539017$o6$g0$t1758539017$j60$l0$h0"
}

# Maps the <dt> label shown on the cert-lookup page to the record field that
# stores the matching <dd> value.
_LABEL_TO_FIELD = {
    "评级号码": "rating_number",
    "卡牌名称": "card_name",
    "年份": "year",
    "制造商": "manufacturer",
    "套装": "card_set",
    "卡牌编号": "card_no",
    "球员": "player",
    # "归属": "belonging",  # field currently disabled
    "评级等级": "grade",
}


def _clean_text(text):
    """Return *text* stripped of surrounding whitespace, or None if empty/missing."""
    if not text:
        return None
    return text.strip() or None


def after_log(retry_state):
    """
    Tenacity ``after`` callback: log the result of each attempt.

    The first positional argument of the retried function is used as the
    logger; when the function was called without positional arguments the
    module-level logger is used instead.

    :param retry_state: RetryCallState object supplied by tenacity
    """
    log = retry_state.args[0] if retry_state.args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_kdlproxys(log):
    """
    Build the proxies mapping for the KDL tunnel proxy.

    :param log: logger object (kept for interface compatibility and for the
        retry callback, which reads it from the first positional argument)
    :return: dict with "http" and "https" proxy URLs for ``requests``
    """
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or a secret store.
    tunnel = "x371.kdltps.com:15818"
    kdl_username = "t13753103189895"
    kdl_password = "o0yefv6z"

    # Both schemes go through the same HTTP tunnel endpoint.
    proxy_url = "http://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": kdl_username, "pwd": kdl_password, "proxy": tunnel}
    return {"http": proxy_url, "https": proxy_url}


@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), after=after_log)
def get_proxys(log):
    """
    Build the proxies mapping for the purchased 123proxy account.

    :param log: logger object (kept for interface compatibility and for the
        retry callback, which reads it from the first positional argument)
    :return: dict with "http" and "https" proxy URLs for ``requests``
    """
    # Purchased account, North America.
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or a secret store.
    # Alternative endpoint kept for reference:
    # http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
    # https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36927"
    http_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
    https_proxy = "http://u1952150085001297:sJMHl4qc4bM0@proxy.123proxy.cn:36931"
    # url = "https://ifconfig.me"
    return {
        "http": http_proxy,
        "https": https_proxy,
    }


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def extract_card_info(log, url, cookies=None):
    """
    Fetch a card detail page and extract the card information from it.

    Request errors are caught here and reported through the return value, so
    the retry decorator only re-runs this function for other exceptions
    (for example a failure while parsing).

    :param log: logger object
    :param url: URL of the card detail page
    :param cookies: request cookies; the module-level COOKIES when None
    :return: dict of card fields on success, 2 when the HTTP request failed,
        3 when the page contains no detail (<dl>) entries
    """
    log.debug(f"开始获取 {url} 页面信息.......................")
    if cookies is None:
        cookies = COOKIES

    try:
        response = requests.get(url, headers=HEADERS, cookies=cookies, timeout=10,
                                proxies=get_kdlproxys(log))
        log.debug(f'status_code: {response.status_code}')
        response.raise_for_status()  # treat HTTP error statuses as failures
    except requests.RequestException as e:
        # Use the injected logger, like every other message in this function.
        log.error(f"请求失败: {e}")
        return 2

    # Investigation showed that pages for valid cert numbers also contain the
    # "item not found" phrase, so it cannot be used to detect missing items:
    # if '找不到这个项目。请检查CGC评级号码是否正确。' in response.text:
    #     log.warning(f"{url} 获取失败, 找不到这个项目。请检查CGC评级号码是否正确。")
    #     return 3

    selector = Selector(response.text)
    tag_dl_list = selector.xpath('//div[@class="results-pane"]/div[@class="certlookup-intro"]//dl')
    if not tag_dl_list:
        log.warning(f"{url} 获取失败, 找不到dl标签")
        return 3

    # Every field starts as None so the record always has the same keys.
    data_dict = {
        "rating_number": None,
        "card_name": None,
        "year": None,
        "manufacturer": None,
        "card_set": None,
        "card_no": None,
        "player": None,
        # "belonging": None,
        "grade": None,
        "image_front": None,
        "image_back": None
    }

    # Each <dl> holds one label (<dt>) / value (<dd>) pair. Labels and values
    # are stripped uniformly so surrounding whitespace neither hides a label
    # nor ends up in the stored value.
    for tag_dl in tag_dl_list:
        label = _clean_text(tag_dl.xpath('./dt/text()').get())
        field = _LABEL_TO_FIELD.get(label)
        if field is not None:
            data_dict[field] = _clean_text(tag_dl.xpath('./dd/text()').get())

    # Image links: the first is stored as the front image, the second as the back.
    tag_img_list = selector.xpath(
        '//div[@class="results-pane"]//div[@class="certlookup-images-item"]/a/@href').getall()
    if tag_img_list:
        data_dict["image_front"] = tag_img_list[0]
    if len(tag_img_list) >= 2:
        data_dict["image_back"] = tag_img_list[1]

    return data_dict


@retry(stop=stop_after_attempt(100), wait=wait_fixed(3600), after=after_log)
def cgc_card_main(log):
    """
    Main crawl loop: process pending cert ids from ``cgc_task``.

    For each pending task (state = 0) the detail page is fetched; on success
    the record is inserted into ``cgc_record`` and the task is marked state 1,
    otherwise the task is marked with the failure code returned by
    ``extract_card_info`` (2 or 3).

    :param log: logger object
    :raises RuntimeError: when the database connection pool is unhealthy
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool.check_pool_health():
        log.error("数据库连接池异常")
        raise RuntimeError("数据库连接池异常")

    try:
        while True:
            rows = sql_pool.select_all("select cert_id from cgc_task where state = 0 limit 10000")
            sql_cert_id_list = [item[0] for item in rows]
            if not sql_cert_id_list:
                log.info("没有需要处理的数据,等待下一轮处理....")
                time.sleep(3600)
                continue

            for cert_id in sql_cert_id_list:
                try:
                    url = f"https://cards.cgccards.cn/certlookup/{cert_id}/"
                    card_info = extract_card_info(log, url)

                    if card_info and isinstance(card_info, dict):
                        sql_pool.insert_one_or_dict(table="cgc_record", data=card_info)
                        sql_pool.update_one_or_dict(table="cgc_task", data={"state": 1},
                                                    condition={"cert_id": cert_id})
                    elif card_info in (2, 3):
                        # The failure code doubles as the task state.
                        sql_pool.update_one_or_dict(table="cgc_task", data={"state": card_info},
                                                    condition={"cert_id": cert_id})
                except Exception as e:
                    log.error(f"Error processing card: {e}")
                    # NOTE(review): the source's indentation was lost, so the
                    # placement of this pause is an assumption (back-off after
                    # a failed card); it may instead belong after the for loop
                    # -- confirm.
                    time.sleep(10)
    except Exception as e:
        # NOTE(review): the exception is logged but not re-raised, so the
        # retry decorator never re-runs the function for errors raised inside
        # this try block -- confirm whether that is intended.
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == "__main__":
    cgc_card_main(logger)
    # 6060110001