import base64 import hashlib import hmac import sys import re import os import urllib import time import requests abspath = os.path.abspath(__file__) root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath) sys.path.append(root_path) from dw_base.utils.config_utils import parse_args from dw_base.spark.spark_sql import SparkSQL import http.client import json from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.hazmat.primitives import serialization from base64 import b64encode # 公钥 public_key_pem = b""" -----BEGIN PUBLIC KEY----- MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDSaL/mqfq/30d5w6/05EL4073z ZgsomKTDI9wKUyz+ETkGwWzaNQm8BAXk9nJMCPz25fCTPd2BkifrS2KFKK2+e4hU pQxs+FQGaSeR8YEBWsCwh8bWaFWgxKuWpPPdfP6Vcnid/pTAsjbnw0KIHT7x83WZ qQTu3GUdyXkfyB41CQIDAQAB -----END PUBLIC KEY----- """ class UserInfo: """公司名称""" company_name: str """真实名称""" name: str """用户id""" user_id: int """用户名""" username: str def __init__(self, company_name: str, name: str, user_id: int, username: str) -> None: self.company_name = company_name self.name = name self.user_id = user_id self.username = username def __str__(self) -> str: return (f"UserInfo:\n" f" Company Name: {self.company_name}\n" f" Name: {self.name}\n" f" User ID: {self.user_id}\n" f" Username: {self.username}") def encrypt_user_id(user_id): public_key = serialization.load_pem_public_key(public_key_pem) encrypted = public_key.encrypt( user_id.encode(), padding.PKCS1v15() ) return b64encode(encrypted).decode() def get_user_info(user_id): encrypted_user_id = encrypt_user_id(user_id) conn = http.client.HTTPConnection("192.168.11.6", 18080) payload = json.dumps({ "encryptUserId": encrypted_user_id }) headers = { 'User-Agent': 'Apifox/1.0.0 (https://apifox.com)', 'Content-Type': 'application/json' } try: conn.request("POST", "/account/personal", payload, headers) res = conn.getresponse() resdata = res.read().decode("utf-8") res_json = json.loads(resdata) user_info = UserInfo(res_json['companyName'], res_json['name'], res_json['userId'], res_json['username']) return user_info except Exception as e: print("Error:", e) finally: conn.close() spark = SparkSQL(udf_files=['dw_base/spark/udf/contacts/ctc_common.py', 'dw_base/spark/udf/spark_id_generate_udf.py']) def get_sign(secret): timestamp = str(round(time.time() * 1000)) secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return timestamp, sign def send_dingtalk_markdown(msg): headers = {"Content-Type": "application/json"} data = { "msgtype": "markdown", "markdown": {"title": '企业库告警', "text": msg, } } json_data = json.dumps(data) secret = 'SECffb7fe1b4c3aacc7be85d3b03de88fdbf93dfb48fe1c13ea7dba34a84847675e' timestamp, sign = get_sign(secret) url = f'https://oapi.dingtalk.com/robot/send?access_token=ffdb7df856220a925196e911107a4aa259acb2fd1160fee8b11d0c3c800974fc×tamp={timestamp}&sign={sign}' response = requests.post(url=url, data=json_data, headers=headers) response.raise_for_status() def ent_user_top(dt): sql = (f"select GET_JSON_OBJECT(ori_json, '$.params.userId') as user ,count(1) as cnt from ent_raw.interface_base " f"where dt='{dt}' and topic = 'ent_tendata_interface' and GET_JSON_OBJECT(ori_json, '$.type') = 'BRIEF_RESULT' group by GET_JSON_OBJECT(ori_json, '$.params.userId') order by count(1) desc limit 10" ) body = '' for row in spark.query(sql)[0].collect(): userid = row.user user_info = get_user_info(userid) body += f'{user_info.username},{user_info.name},{user_info.company_name},**{row.cnt}**次 \n\n' return body if __name__ == '__main__': CONFIG, _ = parse_args(sys.argv[1:]) dts = CONFIG.get('dt').split(',') for dt in dts: format_dt = f'{dt[:4]}-{dt[4:6]}-{dt[6:]}' ent_user_top_cnt = ent_user_top(dt) msg = f'''### 企业主页接口调用统计top10 > **统计日期 : {format_dt}** {ent_user_top_cnt} ''' print(msg) send_dingtalk_markdown(msg)