# -*- coding: utf-8 -*-
# Author : Charley
# Python : 3.10.8
# Date   : 2025/2/24 18:34
"""Crawler that fetches rating records from the gongbocoins.com web API and
stores them in the ``gbca_record`` MySQL table."""
import inspect
import os
from datetime import datetime

import pytz
import requests
import user_agent
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed

from mysq_pool import MySQLConnectionPool

logger.remove()
logger.add("./logs/{time:YYYYMMDD}.log", encoding='utf-8', rotation="00:00",
           format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
           level="DEBUG", retention="15 day")

# Maps the label that prefixes each entry of the response's ``attr`` list
# (e.g. '年份:2023') to the key used for the matching ``gbca_record`` column.
_ATTR_MAPPING = {
    '年份': 'year',
    '发行商': 'publisher',
    '卡片系列名称': 'brand',
    '子系列名称': 'sub_brand',
    '卡片编码': 'card_no',
    '居中分数': 'middle_score',
    '边框分数': 'border_score',
    '卡角分数': 'card_angle_score',
    '表面分数': 'surface_score',
    '签字分数': 'sign_score',
    '限发数': 'issue_limit',
}


def after_log(retry_state):
    """
    tenacity ``after`` callback that logs the outcome of an attempt.

    The first positional argument of the retried call is used as the logger
    when there is one; otherwise the module-level logger is used.

    NOTE(review): tenacity appears to invoke ``after`` only for attempts that
    did not end the retry loop successfully, so the "succeeded" branch may
    never run in practice -- confirm against the tenacity documentation.

    :param retry_state: tenacity RetryCallState object
    """
    call_args = retry_state.args
    log = call_args[0] if call_args else logger

    if retry_state.outcome.failed:
        log.warning(
            f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{retry_state.fn.__name__}', Attempt {retry_state.attempt_number} succeeded")


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the ``proxies`` mapping passed to ``requests``.

    The tunnel address and credentials can be overridden with the environment
    variables ``KDL_TUNNEL``, ``KDL_USERNAME`` and ``KDL_PASSWORD``; when they
    are not set the values that were previously hard-coded here are used.

    NOTE(review): the fallback credentials are stored in plain text in the
    source; consider supplying them only through the environment.

    :param log: logger object (unused; kept for a uniform call signature and
                so that ``after_log`` can pick it up)
    :return: dict with "http" and "https" proxy URLs
    """
    tunnel = os.environ.get("KDL_TUNNEL", "x371.kdltps.com:15818")
    kdl_username = os.environ.get("KDL_USERNAME", "t13753103189895")
    kdl_password = os.environ.get("KDL_PASSWORD", "o0yefv6z")

    # Both schemes are routed through the same HTTP tunnel.
    proxy_url = f"http://{kdl_username}:{kdl_password}@{tunnel}/"
    return {"http": proxy_url, "https": proxy_url}


def save_data(sql_pool, info):
    """
    Insert one record into ``gbca_record``.

    :param sql_pool: connection-pool object exposing ``insert_one(sql, args)``
    :param info: tuple of 23 values, in the column order of the statement below
    """
    sql = """
        INSERT INTO gbca_record (rating_code, front_img, back_img, company_short_name, goods_name,
                                 goods_score_name, year, publisher, brand, sub_brand, card_no,
                                 middle_score, border_score, card_angle_score, surface_score,
                                 sign_score, issue_limit, category2, company_id, create_time,
                                 update_time, order_code, card_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    sql_pool.insert_one(sql, info)


def transfer_ts(timestamp_ms) -> str:
    """
    Convert a millisecond epoch timestamp (e.g. 1615975247000) to a
    'YYYY-MM-DD HH:MM:SS' string in the Asia/Shanghai time zone.

    :param timestamp_ms: epoch timestamp in milliseconds
    :return: formatted local-time string
    """
    # Milliseconds -> seconds, interpreted as an aware UTC datetime.
    utc_dt = datetime.fromtimestamp(timestamp_ms / 1000.0, pytz.utc)
    shanghai_dt = utc_dt.astimezone(pytz.timezone('Asia/Shanghai'))
    return shanghai_dt.strftime('%Y-%m-%d %H:%M:%S')


def parse_resp(log, resp, rating_code, sql_pool):
    """
    Extract the record fields from an API response and store them.

    Responses whose ``errorCode`` is not 0, or that carry no ``data`` object,
    are logged and skipped. Missing ``imgList`` / ``attr`` values are treated
    as empty lists.

    :param log: logger object
    :param resp: decoded JSON response (dict)
    :param rating_code: rating code that was queried
    :param sql_pool: connection-pool object passed on to ``save_data``
    """
    func_name = inspect.currentframe().f_code.co_name

    if resp.get("errorCode") != 0:
        log.debug(
            f"{func_name} rating_code:{rating_code} -> errorCode:{resp.get('errorCode')}, msg:{resp.get('msg')}")
        return

    data = resp.get("data")
    if not data:
        # Nothing to parse; skipping avoids an AttributeError that would make
        # the caller's retry decorator repeat the request for no benefit.
        log.warning(f"{func_name} rating_code:{rating_code} -> empty data")
        return

    # Images: exactly two entries -> front + back, exactly one -> front only.
    img_list = data.get("imgList") or []
    front_img = None
    back_img = None
    if len(img_list) == 2:
        front_img = img_list[0].get("self")
        back_img = img_list[1].get("self")
    elif len(img_list) == 1:
        front_img = img_list[0].get("self")
    else:
        log.warning(f"{func_name} -> No img_list:{img_list}")

    company_short_name = data.get("companyShortName")  # grading company short name
    goods_name = data.get("goodsName")  # item name
    goods_score_name = data.get("goodsScoreName")  # score / grade label
    category2 = data.get("category2")
    company_id = data.get("companyId")
    create_time = data.get("createTime")
    create_time = transfer_ts(create_time) if create_time else None
    update_time = data.get("updateTime")
    update_time = transfer_ts(update_time) if update_time else None
    order_code = data.get("orderCode")
    card_id = data.get("id")

    # Each attr entry looks like '<label>:<value>'. Match on the exact label
    # before the first colon so that a value which merely contains another
    # label cannot be stored under the wrong column.
    res = {}
    for a in data.get("attr") or []:
        var_name = None
        value = ""
        if isinstance(a, str):
            label, sep, value = a.partition(":")
            if sep:
                var_name = _ATTR_MAPPING.get(label.strip())
        if var_name is None:
            # Unknown or malformed attribute: report it and move on.
            log.warning(f"{func_name} -> a:{a}")
            continue
        res[var_name] = value.strip()

    # Statistics fields are present in the response but currently not stored:
    # countNum = data.get("countNum")
    # countRemark = data.get("countRemark")
    # if countNum:
    #     statistics = f"{goods_score_name} 数量:{countNum}"
    #     if countRemark:
    #         statistics = f"{goods_score_name} 数量:{countNum}({countRemark})"
    # else:
    #     statistics = f"{goods_score_name}:{countRemark}"

    info = (
        rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name,
        res.get("year"), res.get("publisher"), res.get("brand"), res.get("sub_brand"),
        res.get("card_no"), res.get("middle_score"), res.get("border_score"),
        res.get("card_angle_score"), res.get("surface_score"), res.get("sign_score"),
        res.get("issue_limit"), category2, company_id, create_time, update_time, order_code,
        card_id)
    save_data(sql_pool, info)


@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_resp(log, rating_code, sql_pool):
    """
    Request the rating info for one rating code and hand it to ``parse_resp``.

    Any exception raised here (HTTP error status, network failure, or an error
    while parsing/saving) triggers the retry decorator: up to 5 attempts, 1 s
    apart.

    :param log: logger object
    :param rating_code: rating code to query
    :param sql_pool: connection-pool object passed on to ``parse_resp``
    """
    func_name = inspect.currentframe().f_code.co_name
    log.info(f"{func_name} rating_code:{rating_code}")

    headers = {
        "Accept": "application/json, text/plain, */*",
        "Content-Type": "application/json;charset=UTF-8",
        "Referer": "https://www.gongbocoins.com/",
        "User-Agent": user_agent.generate_user_agent(),
        "lang": "en"
    }
    url = "https://wapi.gongbocoins.com/gbca/orderCoin/getWebsiteRatingInfo"
    data = {
        "ratingCode": rating_code
    }

    response = requests.post(url, headers=headers, json=data, proxies=get_proxys(log), timeout=10)
    response.raise_for_status()

    resp_json = response.json()
    if resp_json:
        parse_resp(log, resp_json, rating_code, sql_pool)
    else:
        log.warning(f"{func_name} -> response:{response.status_code}")


def get_811_code_list() -> list:
    """
    Return the rating codes of the 811 series.

    :return: list of ints from 8110000000 up to (not including) 8110150000
    """
    return list(range(8110000000, 8110150000))


def get_821_code_list() -> list:
    """
    Return the rating codes of the 821 series.

    :return: list of ints from 8210000000 up to (not including) 8210150000
    """
    return list(range(8210000000, 8210150000))


def get_851_code_list() -> list:
    """
    Return the rating codes of the 851 series.

    :return: list of ints from 8510013072 up to (not including) 8510150000
    """
    return list(range(8510013072, 8510150000))


@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def gbca_main(log):
    """
    Entry point: create the MySQL pool and crawl every code of the 851 series.

    Errors while crawling are caught and logged here, so the retry decorator
    (up to 50 attempts, 1800 s apart) only comes into play when creating the
    connection pool fails.

    :param log: logger object
    :raises Exception: if the connection pool could not be created
    """
    log.info(
        f'开始运行 {inspect.currentframe().f_code.co_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool.
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    try:
        # rating_code = "8110008988"

        def process_code_list(code_list):
            # A failure on one code is logged and must not stop the whole run.
            for rating_code in code_list:
                try:
                    get_resp(log, rating_code, sql_pool)
                except Exception as ce:
                    log.error(f"{inspect.currentframe().f_code.co_name} -> error: {ce}")

        # The 811 and 821 series are currently disabled:
        # code_list_811 = get_811_code_list()
        # process_code_list(code_list_811)
        #
        # code_list_821 = get_821_code_list()
        # process_code_list(code_list_821)

        code_list_851 = get_851_code_list()
        process_code_list(code_list_851)

    except Exception as e:
        log.error(f'{inspect.currentframe().f_code.co_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {inspect.currentframe().f_code.co_name} 运行结束,等待下一轮的采集任务............')


if __name__ == '__main__':
    gbca_main(logger)

    # Sample response, kept for manually exercising parse_resp
    # (customer phone number redacted):
    # aa = {'msg': '操作成功',
    #       'data': {'companyShortName': '北京公博星卡部', 'goodsNotes': None, 'recoverSign': 0,
    #                'ratingAmountLevel': 2, 'recoverRemark': None, 'goodsDefect': '',
    #                'ratingBoxTypePid': None, 'screenSign': 0, 'ratingInstallType': 1,
    #                'id': 14423275, 'goodsScore': '',
    #                'attr': ['年份:2023', '发行商:PANINI', '卡片系列名称:DONRUSS', '卡片编码:#007',
    #                         '居中分数:N/G', '边框分数:N/G', '卡角分数:N/G', '表面分数:N/G'],
    #                'goodsName': 'RONDALE MOORE', 'category2': 53, 'countNum': '2',
    #                'goodsSize': None, 'screenRemark': None, 'updateTime': None,
    #                'ratingResult': 1, 'goodsScoreStatus': 'AUTH.', 'companyId': 36,
    #                'ratingCode': '8110100083', 'createTime': 1702608381650,
    #                'orderCode': '5221702608381652', 'coinName': 'RONDALE MOORE',
    #                'goodsDefectName': None, 'countRemark': '含代码部分',
    #                'customerTel': '<redacted>', 'goodsScoreName': 'AUTH.',
    #                'imgList': [
    #                    {'self': 'https://imgcdnwww.gongbocoins.com/202502271733/d9ddd683daf93722c62baad3961809f8/Photo%2FukluhROUC8husk4held4%2F8110100083-1.jpg%2Fself',
    #                     'list': 'https://imgcdnwww.gongbocoins.com/202502271733/0ded1c6de30e58ebb7a0afacd8f7cc1f/Photo%2FukluhROUC8husk4held4%2F8110100083-1.jpg%2Flist'},
    #                    {'self': 'https://imgcdnwww.gongbocoins.com/202502271733/a800fcca54c8c5dd6f8a581b6f0624cb/Photo%2FukluhROUC8husk4held4%2F8110100083.jpg%2Fself',
    #                     'list': 'https://imgcdnwww.gongbocoins.com/202502271733/5091de46fbad3b48a1b4862751b2d08c/Photo%2FukluhROUC8husk4held4%2F8110100083.jpg%2Flist'}]},
    #       'errorCode': 0}
    # parse_resp(logger, aa, "8110100083", None)