| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- # -*- coding: utf-8 -*-
- # Author : Charley
- # Python : 3.10.8
- # Date : 2025/2/24 18:34
- import pytz
- import inspect
- import requests
- import user_agent
- from loguru import logger
- from datetime import datetime
- from mysq_pool import MySQLConnectionPool
- from tenacity import retry, stop_after_attempt, wait_fixed
# Logging setup: drop the handlers loguru currently has registered, then
# send DEBUG-and-above records to a date-named file under ./logs.
logger.remove()
logger.add(
    "./logs/{time:YYYYMMDD}.log",
    level="DEBUG",
    format="[{time:YYYY-MM-DD HH:mm:ss.SSS}] {level} {message}",
    rotation="00:00",
    retention="15 day",
    encoding='utf-8',
)
def after_log(retry_state):
    """
    Retry callback: report the outcome of the attempt that just finished.

    The logger is taken from the first positional argument of the retried
    call when that argument offers ``warning`` and ``info`` methods;
    otherwise the module-level ``logger`` is used, so a decorated function
    whose first argument is not a logger no longer breaks the callback.

    :param retry_state: RetryCallState object describing the current attempt
    """
    first_arg = retry_state.args[0] if retry_state.args else None
    looks_like_logger = (
        callable(getattr(first_arg, "warning", None))
        and callable(getattr(first_arg, "info", None))
    )
    log = first_arg if looks_like_logger else logger

    # ``fn`` may be None (no wrapped function), so never dereference it blindly.
    fn_name = getattr(retry_state.fn, "__name__", "<unknown>")

    if retry_state.outcome.failed:
        log.warning(f"Function '{fn_name}', Attempt {retry_state.attempt_number} Times")
    else:
        log.info(f"Function '{fn_name}', Attempt {retry_state.attempt_number} succeeded")
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_proxys(log):
    """
    Build the ``proxies`` mapping handed to ``requests`` for the tunnel proxy.

    The tunnel endpoint and credentials can be overridden with the
    ``KDL_TUNNEL``, ``KDL_USERNAME`` and ``KDL_PASSWORD`` environment
    variables. When a variable is unset, the value that used to be
    hard-coded here is used, so existing deployments behave as before.

    :param log: logger object; used to report a failure while building the mapping
    :return: dict with "http" and "https" keys, both holding the same proxy URL
    :raises Exception: whatever went wrong is logged and re-raised
    """
    import os
    from urllib.parse import quote

    # NOTE(review): the fallback credentials below are committed in source.
    # Prefer supplying them through the environment and rotating these values.
    tunnel = os.environ.get("KDL_TUNNEL", "x371.kdltps.com:15818")
    kdl_username = os.environ.get("KDL_USERNAME", "t13753103189895")
    kdl_password = os.environ.get("KDL_PASSWORD", "o0yefv6z")
    try:
        # Percent-encode so characters such as '@' or ':' in an overridden
        # credential cannot corrupt the URL; the fallback values are unchanged by this.
        user = quote(kdl_username, safe="")
        pwd = quote(kdl_password, safe="")
        proxy_url = f"http://{user}:{pwd}@{tunnel}/"
        return {"http": proxy_url, "https": proxy_url}
    except Exception as e:
        log.error(f"Error getting proxy: {e}")
        raise
def save_data(sql_pool, info):
    """
    Insert one parsed record into the ``gbca_record`` table.

    :param sql_pool: sql connection-pool object; its ``insert_one(sql, params)`` is called once
    :param info: tuple of column values, in the column order listed in the statement
    """
    insert_sql = """
    INSERT INTO gbca_record (rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name, year, publisher, brand, sub_brand, card_no, middle_score, border_score, card_angle_score, surface_score, sign_score, issue_limit, category2, company_id, create_time, update_time, order_code, card_id)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
    sql_pool.insert_one(insert_sql, info)
def transfer_ts(timestamp_ms) -> str:
    """
    Format an epoch timestamp given in milliseconds (e.g. 1615975247000).

    :param timestamp_ms: milliseconds since the Unix epoch
    :return: "YYYY-MM-DD HH:MM:SS" string expressed in the Asia/Shanghai time zone
    """
    # fromtimestamp() expects seconds, so scale the millisecond value down first.
    seconds = timestamp_ms / 1000.0
    # Build a timezone-aware UTC datetime, then shift it to Asia/Shanghai.
    aware_utc = datetime.fromtimestamp(seconds, pytz.utc)
    local_dt = aware_utc.astimezone(pytz.timezone('Asia/Shanghai'))
    return local_dt.strftime('%Y-%m-%d %H:%M:%S')
# Maps the label that prefixes each "attr" entry to the gbca_record column it fills.
_ATTR_COLUMNS = {
    '年份': 'year',
    '发行商': 'publisher',
    '卡片系列名称': 'brand',
    '子系列名称': 'sub_brand',
    '卡片编码': 'card_no',
    '居中分数': 'middle_score',
    '边框分数': 'border_score',
    '卡角分数': 'card_angle_score',
    '表面分数': 'surface_score',
    '签字分数': 'sign_score',
    '限发数': 'issue_limit'
}


def _split_attr(entry):
    """Return (column, value) for the first known label contained in *entry*, or None if none matches."""
    for label, column in _ATTR_COLUMNS.items():
        if label in entry:
            return column, entry.replace(f'{label}:', '').strip()
    return None


def parse_resp(log, resp, rating_code, sql_pool):
    """
    Parse one rating-info response and store the record via ``save_data``.

    Nothing is stored when ``errorCode`` is not 0 (logged at debug level) or
    when ``data`` is missing or empty (logged as a warning). Null ``imgList``
    and ``attr`` values are treated as empty lists, and attr entries that are
    not strings or match no known label are logged and skipped.
    ``countNum`` / ``countRemark`` from the payload are not stored.

    :param log: logger object
    :param resp: decoded JSON response (dict)
    :param rating_code: rating code the response belongs to
    :param sql_pool: sql connection-pool object, forwarded to ``save_data``
    """
    func_name = inspect.currentframe().f_code.co_name

    if resp.get("errorCode") != 0:
        log.debug(
            f"{func_name} rating_code:{rating_code} -> errorCode:{resp.get('errorCode')}, msg:{resp.get('msg')}")
        return

    data = resp.get("data")
    if not isinstance(data, dict) or not data:
        # Previously this case raised AttributeError on data.get(...).
        log.warning(f"{func_name} rating_code:{rating_code} -> no data in response: {data!r}")
        return

    # Images: exactly one (front only) or two (front, back) entries are accepted.
    raw_imgs = data.get("imgList")
    imgs = raw_imgs or []
    if len(imgs) in (1, 2):
        front_img = imgs[0].get("self")
        back_img = imgs[1].get("self") if len(imgs) == 2 else None
    else:
        log.warning(f"{func_name} -> No img_list:{raw_imgs}")
        front_img = None
        back_img = None

    company_short_name = data.get("companyShortName")  # short name of the grading company
    goods_name = data.get("goodsName")  # item name
    goods_score_name = data.get("goodsScoreName")  # grade
    category2 = data.get("category2")
    company_id = data.get("companyId")
    create_time = data.get("createTime")
    create_time = transfer_ts(create_time) if create_time else None
    update_time = data.get("updateTime")
    update_time = transfer_ts(update_time) if update_time else None
    order_code = data.get("orderCode")
    card_id = data.get("id")

    # Attributes arrive as "label:value" strings; a later entry for the same column wins.
    res = {}
    for entry in data.get("attr") or []:
        parsed = _split_attr(entry) if isinstance(entry, str) else None
        if parsed is None:
            log.warning(f"{func_name} -> a:{entry}")
            continue
        column, value = parsed
        res[column] = value

    info = (
        rating_code, front_img, back_img, company_short_name, goods_name, goods_score_name, res.get("year"),
        res.get("publisher"), res.get("brand"), res.get("sub_brand"), res.get("card_no"), res.get("middle_score"),
        res.get("border_score"), res.get("card_angle_score"), res.get("surface_score"), res.get("sign_score"),
        res.get("issue_limit"), category2, company_id, create_time, update_time, order_code, card_id)
    save_data(sql_pool, info)
@retry(stop=stop_after_attempt(5), wait=wait_fixed(1), after=after_log)
def get_resp(log, rating_code, sql_pool):
    """
    Request the rating record for one rating code and hand the JSON body to ``parse_resp``.

    Any exception raised here (HTTP error status, timeout, invalid JSON, or an
    error inside ``parse_resp``) propagates to the retry decorator.

    :param log: logger object
    :param rating_code: rating code, sent as ``ratingCode`` in the JSON request body
    :param sql_pool: sql connection-pool object, forwarded to ``parse_resp``
    :return: None
    """
    func_name = inspect.currentframe().f_code.co_name
    log.info(f"{func_name} rating_code:{rating_code}")

    request_headers = {
        "Accept": "application/json, text/plain, */*",
        "Content-Type": "application/json;charset=UTF-8",
        "Referer": "https://www.gongbocoins.com/",
        "User-Agent": user_agent.generate_user_agent(),
        "lang": "en"
    }
    endpoint = "https://wapi.gongbocoins.com/gbca/orderCoin/getWebsiteRatingInfo"
    payload = {"ratingCode": rating_code}

    response = requests.post(
        endpoint,
        headers=request_headers,
        json=payload,
        proxies=get_proxys(log),
        timeout=10,
    )
    response.raise_for_status()

    body = response.json()
    if not body:
        # Empty/falsy JSON body: nothing to parse, only record the status code.
        log.warning(f"{func_name} -> response:{response.status_code}")
        return
    parse_resp(log, body, rating_code, sql_pool)
def get_811_code_list() -> list:
    """
    Build the candidate rating codes for the 811 class.

    :return: consecutive integers from 8110000000 up to, but not including, 8110150000
    """
    return list(range(8110000000, 8110150000))
def get_821_code_list() -> list:
    """
    Build the candidate rating codes for the 821 class.

    :return: consecutive integers from 8210000000 up to, but not including, 8210150000
    """
    return list(range(8210000000, 8210150000))
def get_851_code_list() -> list:
    """
    Build the candidate rating codes for the 851 class.

    :return: consecutive integers from 8510013072 up to, but not including, 8510150000
    """
    return list(range(8510013072, 8510150000))
@retry(stop=stop_after_attempt(50), wait=wait_fixed(1800), after=after_log)
def gbca_main(log):
    """
    Main routine: create the MySQL pool and crawl every code of the 851 range.

    A failure to obtain the pool raises and is therefore handled by the retry
    decorator; errors while crawling are logged and not re-raised. The 811 and
    821 ranges are currently not crawled.

    :param log: logger object
    """
    func_name = inspect.currentframe().f_code.co_name
    log.info(
        f'开始运行 {func_name} 爬虫任务....................................................')

    # Set up the MySQL connection pool
    sql_pool = MySQLConnectionPool(log=log)
    if not sql_pool:
        log.error("MySQL数据库连接失败")
        raise Exception("MySQL数据库连接失败")

    def process_code_list(code_list):
        # One failing code must not stop the run: log it and move on.
        for code in code_list:
            try:
                get_resp(log, code, sql_pool)
            except Exception as exc:
                log.error(f"{inspect.currentframe().f_code.co_name} -> error: {exc}")

    try:
        process_code_list(get_851_code_list())
    except Exception as e:
        log.error(f'{func_name} error: {e}')
    finally:
        log.info(f'爬虫程序 {func_name} 运行结束,等待下一轮的采集任务............')
# Script entry point: run the crawler with the module-level loguru logger.
if __name__ == "__main__":
    gbca_main(logger)
- # aa = {'msg': '操作成功', 'data': {'companyShortName': '北京公博星卡部', 'goodsNotes': None, 'recoverSign': 0, 'ratingAmountLevel': 2, 'recoverRemark': None, 'goodsDefect': '', 'ratingBoxTypePid': None, 'screenSign': 0, 'ratingInstallType': 1, 'id': 14423275, 'goodsScore': '', 'attr': ['年份:2023', '发行商:PANINI', '卡片系列名称:DONRUSS', '卡片编码:#007', '居中分数:N/G', '边框分数:N/G', '卡角分数:N/G', '表面分数:N/G'], 'goodsName': 'RONDALE MOORE', 'category2': 53, 'countNum': '2', 'goodsSize': None, 'screenRemark': None, 'updateTime': None, 'ratingResult': 1, 'goodsScoreStatus': 'AUTH.', 'companyId': 36, 'ratingCode': '8110100083', 'createTime': 1702608381650, 'orderCode': '5221702608381652', 'coinName': 'RONDALE MOORE', 'goodsDefectName': None, 'countRemark': '含代码部分', 'customerTel': '13691236280', 'goodsScoreName': 'AUTH.', 'imgList': [{'self': 'https://imgcdnwww.gongbocoins.com/202502271733/d9ddd683daf93722c62baad3961809f8/Photo%2FukluhROUC8husk4held4%2F8110100083-1.jpg%2Fself', 'list': 'https://imgcdnwww.gongbocoins.com/202502271733/0ded1c6de30e58ebb7a0afacd8f7cc1f/Photo%2FukluhROUC8husk4held4%2F8110100083-1.jpg%2Flist'}, {'self': 'https://imgcdnwww.gongbocoins.com/202502271733/a800fcca54c8c5dd6f8a581b6f0624cb/Photo%2FukluhROUC8husk4held4%2F8110100083.jpg%2Fself', 'list': 'https://imgcdnwww.gongbocoins.com/202502271733/5091de46fbad3b48a1b4862751b2d08c/Photo%2FukluhROUC8husk4held4%2F8110100083.jpg%2Flist'}]}, 'errorCode': 0}
- # parse_resp(logger, aa, "8110100083", None)
|