import datetime import time import base64 import urllib3 import requests from Crypto.Hash import SHA256 from Crypto.PublicKey import RSA from pyasn1_modules import rfc8017 from Crypto.Signature import pkcs1_15 from pyasn1.codec.der import decoder from urllib.parse import urlparse, parse_qs, urlencode urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) HEADERS = { "user-agent": "Kuril+/5.47.1 (Android 11)", "cache-control": "max-age=3600", "x-request-version": "5.93.1", "x-request-sign-type": "RSA2", "x-request-package-sign-version": "0.0.1", "x-request-utm_source": "yingyongbao", "content-type": "application/json", "x-request-id": "", "x-request-sign-version": "v1", "referer": "https://qiandao.cn", "x-request-channel": "yingyongbao", "x-device-id": "fbc87d023bf9ff1b", "host": "api.qiandao.com", "x-request-package-id": "1006", "x-request-device": "android", "x-request-version-code": "245", # "authorization": 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6Ijg2NTIxNTgzOTI0MDk4MzQ1NyIsInR5cGUiOiJVU0VSIiwiZGV2aWNlSWQiOiIxMDA2IiwiZXhwIjoxNzUxMTc1NjczLCJpYXQiOjE3NTA5MTY0NzN9.aWao_5NvwpSKHw6-bbRVHtfY-_zw_SlHow4zI-A0qg3vZIvR2PbbXQ2XYIvaM0l7h9jbzGPA5veeW4wSWDVG9uqHQdX-zvghfvflxzvntBoYP1Un1e9TxZhYCrblqi256p49YFguFkTvuEdR15LvdtdlVadfq-GMX_UPKj_w2XnqTZHCryebnJcTQ5Y-uA9wnDEtHHMWUKJ4i_JuwaWR1DKzW_5-Kd4GcoJ3c75RVkfqlWlYLd9PKxVSLBEuiV2YCj459Ceo-hyGWz2x9gcBxg1FLbstsDiI5HgAEFl1upOImwVqj7cBS1B4MAwLUp0A-jdGFamji0PrfW-xH7ZcRA' } def get_proxys(log): """ 获取代理 :return: 代理 """ tunnel = "x371.kdltps.com:15818" kdl_username = "t13753103189895" kdl_password = "o0yefv6z" try: proxies = { "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel}, "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": kdl_username, "pwd": kdl_password, "proxy": tunnel} } return proxies except Exception as e: log.error(f"Error getting proxy: {e}") raise e def get_params_dict(url): """ 因app对list值做过特殊处理 从URL里提取出params :param url: :return: """ query = urlparse(url).query parsed = parse_qs(query, keep_blank_values=True) params = {} for key, value in parsed.items(): clean_key = key.rstrip("[]") params[clean_key] = value[0] if len(value) == 1 else value return params def get_sign(s): """ 获取签名 :param s: :return: """ modulus_b64_str = ( "MIIEpQIBAAKCAQEA5d6YfNTsNo5qrD16Lu4TrfrWND5QhuXQH/1ysF1p9rmcJ0SebEAJvaw0rWbZdDFK" "YYQwKiw6wikZhX5tXPf0XvQ1aTzzPqc5d3ZYm0kCSaP5+Qs4Mv+xVT10f/xoFxrCXCqq/kEpOvS/GGJAP" "H7zQV3fTwYZWQFrj01sJ2wO/nckn6TCNLt56dBUgYEL6lmOaLPbYxQqDFBkCIjYYKJJ228wpBLyuoCYxa" "8tqUHz2WgRTpWmlL/1sco8p7bC2DTYM9n6LgEyndDmfGGlIX09Tu+wXZmPbny3IZ97BpvB5pa7x1X/9lr" "n5FIOxBdH9kh2/n/hb5UB/wR1/C9tJuuafwIDAQABAoIBAQDGIV79+ei//XEklLjDyqFbzGDlFvEB1QPX" "DvXT3jB/YOyfTB3g4DGFMvEUpRm5dOLPushpEUZ0JEjDL33ELFSNo6CF3OssjaaSuYcWEY/POW80od8G1" "i1bc2T/C+gMQhxUpNJN5IxNLLeppMYJXsL9DJR14KPoe7jiA7G9KP6jhRuHH4JC+b91swCqB1bNsh6kWr" "C6dxfhJtlFNkIkZXdenQ3KCoFnApBvsv7bMsLIFqYyv2QRH+KVioV0t+XrCfHoQ4qMzUgFgel6ITcT4JZ" "DGGH+CmPEeiP4TXFtSs32Ii0qJ0CDjXaBWjwRCGWK07VRRZL6FLNYf6el12JgWACBAoGBAOfVDK5jnD9i" "5YLY9OdDFtyO4LtojObGJUnYH3Y2IoEiaArKWYsKQtgGI76TwdJwj2F7blQNXm2C/3tlrqeZOhz1LZqVa" "Nflkhml8/ghYZENcrTUTGKWTLmZnWWUQQZHN5aqHb0DTQPbqD6YspqVvlrjF2yJs1/UBZc7GiGbYSc5Ao" "GBAP3VKrl+QpESRa7B/6+GezvPV0DNf/b9zuTEMO/EljWLEnnSghBFBAlk2qihL4Hoo/qmWk2POwMHLYm" "b9Q9zHYzvwbcUOiQIpraFrN0Wn9B+nwDff0Wl+6vyA0W6AylBQHU4GfCZuts6qAMX+fKTAF/h24Ml36/1" "Lci45NIN3ld3AoGBANSSKXqNo2sLh16fCJA0l/XMnIu6pdfEv9Qh81c09BZsMfIS8F/pHLlvh77rRMFsr" "Eu6HcO8LmVDxHalGaxbd0muFg60CNpNidUysa1HDmsuZYshTpjnL5rPG99UPPtAudvQSExThn6PHomnAb" "10qII1z/iZmnu3sRil/KPsEP0hAoGBAK2BETQ77sp0//almt1jAkduwciE74xoDwzmYkDyUm6FAnsM/mS" "amFjHfIM5slyNJdFF9oH/fqniNSlT1l3aJP/aPsKi698HntUyaGezeEgu1QbmvntgKrhss/nsXQ7NEH9P" "esOwgT4rSP7cW7iI7P+dRcvOjqka4VHLuHUwj6OfAoGACezeNNpkxd8PhgjYkAHjtSnrEHgzQB1YErvAr" "jssDWQtkne6jL669lyx0al/f2dIfUxaPp79aOCETle9n/2mpng7vjrM8sGZuvKcsgs1ZsEOLepIEUIaYv" "m0S4kmzwYKVmhxhJzPjWX9scCgORQkNctsOKujmzNvBLwSxbpuxTE=" ) # base64 解码 modulus_b64_str 得到 ASN.1 结构(modulus 和 privateExponent) key_der = base64.b64decode(modulus_b64_str) # 尝试解码为 RSAPrivateKey 格式(标准 PKCS#1) private_key_asn1, _ = decoder.decode(key_der, asn1Spec=rfc8017.RSAPrivateKey()) n = int(private_key_asn1['modulus']) d = int(private_key_asn1['privateExponent']) # 构造 RSA 私钥 key = RSA.construct((n, 65537, d)) # 公共指数默认是 65537(0x10001) # 进行 SHA256 with RSA 签名 h = SHA256.new(s.encode('utf-8')) signature = pkcs1_15.new(key).sign(h) # 返回 base64 编码的签名结果 return base64.b64encode(signature).decode() def get_query_string(params, reverse=False): """ 对参数进行倒序排列 :param reverse: :param params: :return: """ if not params: return '' query_items = [] for key, value in params.items(): if isinstance(value, list): for v in value: query_items.append((f"{key}[]", v)) else: query_items.append((key, value)) if reverse: query_items = sorted(query_items, key=lambda x: x[0], reverse=reverse) return urlencode(query_items) def get_headers(url, params=None, token=None): """ 设置请求头 :param params: :param url: :param token: :return: """ ts = str(int(time.time() * 1000)) query_string = get_query_string(params, reverse=True) sign = get_sign(f'{urlparse(url).path}{query_string}{ts}') HEADERS["x-request-timestamp"] = ts HEADERS["x-request-sign"] = sign if token: HEADERS["authorization"] = token return HEADERS def request_post_data(log, url, data, proxy=True): """ 发送POST请求 :param log: :param url: :param data: :param proxy: :return: """ headers = get_headers(url) headers = get_headers(url, params, token) if proxy: response = requests.post(url, headers=headers, json=data, verify=False, proxies=get_proxys(log)) else: response = requests.post(url, headers=headers, json=data, verify=False) # response = requests.post(url, headers=headers, json=data, verify=False, proxies=get_proxys(log)) # response = requests.post(url, headers=headers, json=data, verify=False) response.raise_for_status() return response.json() def request_get_data(log, url, params, token=None, proxy=True): """ 发送GET请求 :param log: :param url: :param params: :param token: :param proxy: :return: """ headers = get_headers(url, params, token) # GET请求对url参数进行了特殊的处理 query_string = get_query_string(params) url = f'{url}?{query_string}' if proxy: response = requests.get(url, headers=headers, verify=False, proxies=get_proxys(log), timeout=22) else: response = requests.get(url, headers=headers, verify=False, timeout=22) # response = requests.get(url, headers=headers, verify=False, proxies=get_proxys(log), timeout=22) # print(response.text) response.raise_for_status() return response.json() def transform_ms(log, tradingTime): """ 将毫秒转换为时间 :param log: :param tradingTime: :return: """ try: timestamp_s = int(tradingTime) / 1000 # 转换为秒 dt = datetime.datetime.fromtimestamp(timestamp_s) trading_time = dt.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: log.error(e) trading_time = None return trading_time def translate_s(log, timestamp): """ 解析 时间戳 秒 :param log: :param timestamp: :return: """ try: dt = datetime.datetime.fromtimestamp(int(timestamp)) formatted_date = dt.strftime('%Y-%m-%d %H:%M:%S') except Exception as e: log.error(e) formatted_date = None return formatted_date