"""Daily DingTalk reports on third-party interface usage and refresh results.

For every date passed via the ``dt`` argument (comma separated), the script
runs a set of Spark SQL queries and pushes three messages to DingTalk robots:

1. interface call counts (text message),
2. top-10 users of the ``BRIEF_RESULT`` interface (markdown message),
3. manual / automatic refresh statistics (text message).
"""
import base64
import hashlib
import hmac
import http.client
import json
import os
import re
import sys
import time
import urllib
import urllib.parse
from base64 import b64encode

import requests
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding

# The project root must be on sys.path before the dw_base imports below.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.utils.config_utils import parse_args
from dw_base.spark.spark_sql import SparkSQL

# Public key used to encrypt the user id sent to the account service.
public_key_pem = b"""
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDSaL/mqfq/30d5w6/05EL4073z
ZgsomKTDI9wKUyz+ETkGwWzaNQm8BAXk9nJMCPz25fCTPd2BkifrS2KFKK2+e4hU
pQxs+FQGaSeR8YEBWsCwh8bWaFWgxKuWpPPdfP6Vcnid/pTAsjbnw0KIHT7x83WZ
qQTu3GUdyXkfyB41CQIDAQAB
-----END PUBLIC KEY-----
"""

# Seconds to wait for the account service / DingTalk before giving up.
HTTP_TIMEOUT_SECONDS = 10

# NOTE(review): the robot secret and access tokens below are committed in
# source; consider moving them to configuration.
DINGTALK_MARKDOWN_SECRET = 'SECffb7fe1b4c3aacc7be85d3b03de88fdbf93dfb48fe1c13ea7dba34a84847675e'
DINGTALK_MARKDOWN_TOKEN = 'ffdb7df856220a925196e911107a4aa259acb2fd1160fee8b11d0c3c800974fc'
DINGTALK_TEXT_URL = 'https://oapi.dingtalk.com/robot/send?access_token=5183dfe1ecbe06261bcac7b45c1a6b5ae101fec67877d74120a6a95c88d1f917'
# Alternative robots kept from the original file:
# url = 'https://oapi.dingtalk.com/robot/send?access_token=c4086d8ba377fdade2dff869e71063733095bc718d3bafdfbe8be0966aa050d6'
# url = 'https://oapi.dingtalk.com/robot/send?access_token=bee997dbf61e839a17de087830ffef6e864c3109fef62a956703bdfe043b0e10'


class UserInfo:
    """Account details returned by the ``/account/personal`` endpoint."""

    # Company name
    company_name: str
    # Real name
    name: str
    # User id
    user_id: int
    # Login name
    username: str

    def __init__(self, company_name: str, name: str, user_id: int, username: str) -> None:
        self.company_name = company_name
        self.name = name
        self.user_id = user_id
        self.username = username

    def __str__(self) -> str:
        return (f"UserInfo:\n"
                f"  Company Name: {self.company_name}\n"
                f"  Name: {self.name}\n"
                f"  User ID: {self.user_id}\n"
                f"  Username: {self.username}")


def encrypt_user_id(user_id):
    """Encrypt ``user_id`` with the module public key (RSA, PKCS#1 v1.5).

    :param user_id: user id; converted with ``str()`` so ints are accepted too.
    :return: base64 text of the ciphertext.
    """
    public_key = serialization.load_pem_public_key(public_key_pem)
    encrypted = public_key.encrypt(
        str(user_id).encode(),
        padding.PKCS1v15()
    )
    return b64encode(encrypted).decode()


def get_user_info(user_id):
    """Look up a user through the internal account service.

    This is a best-effort call: any failure (encryption, network, unexpected
    payload) is printed and ``None`` is returned, so callers must handle
    ``None``.

    :param user_id: id of the user to look up.
    :return: a ``UserInfo`` instance, or ``None`` when the lookup failed.
    """
    conn = http.client.HTTPConnection("192.168.11.6", 18080, timeout=HTTP_TIMEOUT_SECONDS)
    headers = {
        'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
        'Content-Type': 'application/json'
    }
    try:
        # Encryption is inside the try so a bad id cannot crash the caller.
        payload = json.dumps({
            "encryptUserId": encrypt_user_id(user_id)
        })
        conn.request("POST", "/account/personal", payload, headers)
        res = conn.getresponse()
        res_json = json.loads(res.read().decode("utf-8"))
        return UserInfo(res_json['companyName'], res_json['name'], res_json['userId'], res_json['username'])
    except Exception as e:
        print("Error:", e)
        return None
    finally:
        conn.close()


spark = SparkSQL(udf_files=['dw_base/spark/udf/contacts/ctc_common.py', 'dw_base/spark/udf/spark_id_generate_udf.py'])


def _collect(sql):
    """Run ``sql`` and return all collected rows of the first query result."""
    return spark.query(sql)[0].collect()


def _query_cnt(sql):
    """Run ``sql`` and return the ``cnt`` column of its first row."""
    return _collect(sql)[0]['cnt']


def get_sign(secret):
    """Build the timestamp / signature pair for a signed DingTalk robot call.

    :param secret: the robot's signing secret.
    :return: ``(timestamp, sign)`` where ``timestamp`` is the current time in
        milliseconds as a string and ``sign`` is the URL-quoted base64
        HMAC-SHA256 of ``"{timestamp}\\n{secret}"`` keyed with ``secret``.
    """
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign


def send_dingtalk_markdown(msg):
    """Send ``msg`` as a markdown message through the signed DingTalk robot.

    :raises requests.HTTPError: when DingTalk answers with an error status.
    """
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "markdown",
        "markdown": {"title": '企业库告警',
                     "text": msg,
                     }
    }
    json_data = json.dumps(data)
    timestamp, sign = get_sign(DINGTALK_MARKDOWN_SECRET)
    # The query string needs a literal "&timestamp="; the previous text had it
    # garbled into "×tamp=", so the timestamp was never sent.
    url = (f'https://oapi.dingtalk.com/robot/send?access_token={DINGTALK_MARKDOWN_TOKEN}'
           f'&timestamp={timestamp}&sign={sign}')
    response = requests.post(url=url, data=json_data, headers=headers, timeout=HTTP_TIMEOUT_SECONDS)
    response.raise_for_status()


def send_dingtalk_notification(msg):
    """Send ``msg`` as a plain text message through the DingTalk robot.

    :raises requests.HTTPError: when DingTalk answers with an error status.
    """
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "text",
        "text": {"content": msg}
    }
    json_data = json.dumps(data)
    response = requests.post(url=DINGTALK_TEXT_URL, data=json_data, headers=headers,
                             timeout=HTTP_TIMEOUT_SECONDS)
    response.raise_for_status()


# Number of shh non-core-business calls
def get_shh_non_core_interface_cnt(dt):
    """Return the summed count of the four non-core shh topic queries for ``dt``."""
    sql = f'''
    SELECT sum(cnt) cnt
    FROM
      (SELECT count(1) cnt
       FROM ent_raw.interface_base
       WHERE topic = "ent_monitor_interface"
         AND dt = "{dt}"
         AND GET_JSON_OBJECT(ori_json, "$.type") != "EXPORT"
         AND GET_JSON_OBJECT(ori_json, "$.source")= 'CONTACT'
       UNION ALL
       SELECT count(1) cnt
       FROM ent_raw.interface_base
       WHERE topic = "ent_shh_bizr_interface"
         AND dt = "{dt}"
         AND GET_JSON_OBJECT(ori_json, "$.type") IN("ROOT", "COMPANY_COUNT")
         AND GET_JSON_OBJECT(ori_json, "$.source")= 'BIZR'
       UNION ALL
       SELECT count(1) cnt
       FROM ent_raw.interface_base
       WHERE topic = "ent_shh_mecs_interface"
         AND dt = "{dt}"
         AND GET_JSON_OBJECT(ori_json, "$.type") IN("CORP", "SITE")
         AND GET_JSON_OBJECT(ori_json, "$.source")= 'MECS'
       UNION ALL
       SELECT count(1) cnt
       FROM ent_raw.interface_base
       WHERE topic = "ent_shh_interface"
         AND dt = "{dt}"
         AND GET_JSON_OBJECT(ori_json, "$.type")= "BIZR"
         AND GET_JSON_OBJECT(ori_json, "$.source")= "BIZR" )t
    '''
    return _query_cnt(sql)


def get_shh_company_interface_cnt(dt):
    """Count shh company API log rows for ``dt`` whose source is not SCRIPT."""
    sql = f'select count(1) cnt from ent_ods.ent_shh_api_company_logs where dt = "{dt}" and source != "SCRIPT"'
    return _query_cnt(sql)


def get_shh_company_interface_script_cnt(dt):
    """Count shh company API log rows for ``dt`` whose source is SCRIPT."""
    sql = f'select count(1) cnt from ent_ods.ent_shh_api_company_logs where dt = "{dt}" and source = "SCRIPT"'
    return _query_cnt(sql)


def get_shh_contact_interface_cnt(dt):
    """Count ``ctc_shh_interface`` rows for ``dt`` whose source is not SCRIPT."""
    sql = f'select count(1) cnt from ent_raw.interface_base where topic = "ctc_shh_interface" and dt = "{dt}" and GET_JSON_OBJECT(ori_json, "$.source") != "SCRIPT"'
    return _query_cnt(sql)


def get_shh_contact_interface_script_cnt(dt):
    """Count ``ctc_shh_interface`` rows for ``dt`` whose source is SCRIPT."""
    sql = f'select count(1) cnt from ent_raw.interface_base where topic = "ctc_shh_interface" and dt = "{dt}" and GET_JSON_OBJECT(ori_json, "$.source") = "SCRIPT"'
    return _query_cnt(sql)


def get_snv_contact_interface_cnt(dt):
    """Count ``ctc_snovio_interface`` rows for ``dt`` whose source is not MANUAL_CONSUME."""
    sql = f'select count(1) cnt from ent_raw.interface_base where topic = "ctc_snovio_interface" and dt = "{dt}" and GET_JSON_OBJECT(ori_json, "$.source") != "MANUAL_CONSUME" '
    return _query_cnt(sql)


def get_snv_contact_interface_script_cnt(dt):
    """Count ``ctc_snovio_interface`` rows for ``dt`` whose source is MANUAL_CONSUME."""
    sql = f'select count(1) cnt from ent_raw.interface_base where topic = "ctc_snovio_interface" and dt = "{dt}" and GET_JSON_OBJECT(ori_json, "$.source") = "MANUAL_CONSUME" '
    return _query_cnt(sql)


def ent_user_top(dt):
    """Build the markdown body listing the top-10 ``BRIEF_RESULT`` callers.

    Users whose account lookup fails are still listed, by raw user id, instead
    of aborting the whole report.

    :return: markdown text, one entry per user, entries separated by blank lines.
    """
    sql = (f"select GET_JSON_OBJECT(ori_json, '$.params.userId') as user ,count(1) as cnt from ent_raw.interface_base "
           f"where dt='{dt}' and topic = 'ent_tendata_interface' and GET_JSON_OBJECT(ori_json, '$.type') = 'BRIEF_RESULT' group by GET_JSON_OBJECT(ori_json, '$.params.userId') order by count(1) desc limit 10"
           )
    entries = []
    for row in _collect(sql):
        userid = row.user
        # A NULL userId group cannot be looked up; skip the service call.
        user_info = get_user_info(userid) if userid is not None else None
        if user_info is None:
            entries.append(f'userId={userid},**{row.cnt}**次 \n\n')
        else:
            entries.append(
                f'{user_info.username},{user_info.name},{user_info.company_name},**{row.cnt}**次 \n\n')
    return ''.join(entries)


def get_manual_request_cnt(dt):
    """Return one row with ``manual_request_cnt`` and ``user_cnt`` for ``dt``.

    Both are distinct counts (trace ids / user ids) over BING ``MANUAL_REFRESH``
    records whose ``result.canRefresh`` is ``'true'``.
    """
    sql = f'''SELECT count(DISTINCT GET_JSON_OBJECT(ori_json, '$.params.traceId')) manual_request_cnt,
                 count(distinct GET_JSON_OBJECT(ori_json, '$.params.userId')) as user_cnt
          FROM ent_raw.interface_base
          WHERE topic = 'ent_tendata_interface'
            AND dt = '{dt}'
            AND get_json_object(ori_json, '$.source') = 'BING'
            AND get_json_object(ori_json, '$.type') = 'MANUAL_REFRESH'
            AND get_json_object(ori_json, '$.result.canRefresh') = 'true'
    '''
    return _collect(sql)[0]


def get_ggl_res(dt):
    """Return the google statistics rows for ``dt``.

    Each row is ``(label, cnt)``. The query unions three single-row results
    labelled ``'manual'``, ``'auto'`` and ``'ctc_cnt'``. ``cnt`` of the first
    two is a ``sum`` and may therefore be NULL when nothing joined.
    """
    sql = f'''WITH MANUAL AS
      (SELECT DISTINCT GET_JSON_OBJECT(ori_json, '$.params.traceId') trace_id
       FROM ent_raw.interface_base
       WHERE topic = 'ent_tendata_interface'
         AND dt = '{dt}'
         AND get_json_object(ori_json, '$.source') = 'BING'
         AND get_json_object(ori_json, '$.type') = 'MANUAL_REFRESH'
         AND get_json_object(ori_json, '$.result.canRefresh') = 'true'),
         auto AS
      (SELECT DISTINCT GET_JSON_OBJECT(ori_json, '$.params.traceId') trace_id
       FROM ent_raw.interface_base
       WHERE topic = 'ent_tendata_interface'
         AND dt = '{dt}'
         AND get_json_object(ori_json, '$.source') = 'BING'
         AND get_json_object(ori_json, '$.type') = 'AUTO_REFRESH'
         AND get_json_object(ori_json, '$.result.canRefresh') = 'true'),
         ods AS
      (SELECT GET_JSON_OBJECT(ori_json, '$.params.traceId') trace_id,
              GET_JSON_OBJECT(ori_json, '$.result.status_code') res_code
       FROM ent_raw.interface_base
       WHERE topic = 'ctc_google_interface'
         AND dt = '{dt}'),
         manual_res AS
      (SELECT 'manual',
              sum(if(ods.res_code = '200',1,0)) as cnt
       FROM ods
       JOIN MANUAL ON ods.trace_id = manual.trace_id),
         auto_res AS
      (SELECT 'auto',
              sum(if(ods.res_code = '200',1,0)) as cnt
       FROM ods
       JOIN auto ON ods.trace_id = auto.trace_id),
         ctc_cnt AS
      (SELECT 'ctc_cnt',
              count(1) as cnt
       FROM ctc_mid.ctc_main_pre
       WHERE dt = '{dt}'
         AND SOURCE LIKE '%google%' )
    SELECT * FROM manual_res
    UNION ALL
    SELECT * FROM auto_res
    UNION ALL
    SELECT * FROM ctc_cnt
    '''
    return _collect(sql)


def _get_bing_base(dt, refresh_type):
    """Return the ``cnt`` / ``user_cnt`` / ``web_cnt`` row for one BING type."""
    # nvl() keeps web_cnt at 0 instead of NULL when the partition is empty.
    sql = f'''
    select count(1) as cnt,
           count(distinct GET_JSON_OBJECT(ori_json, '$.params.userId')) as user_cnt,
           nvl(sum(if(get_json_object(ori_json, '$.result.data.website') is not null, 1, 0)),0) as web_cnt
    from ent_raw.interface_base
    where topic = 'ent_tendata_interface'
      and dt = '{dt}'
      and get_json_object(ori_json, '$.source') = 'BING'
      and get_json_object(ori_json, '$.type') = '{refresh_type}'
    '''
    return _collect(sql)[0]


def get_manual_base(dt):
    """Return request / user / website counts of BING ``MANUAL`` records for ``dt``."""
    return _get_bing_base(dt, 'MANUAL')


def get_auto_base(dt):
    """Return request / user / website counts of BING ``AUTO`` records for ``dt``."""
    return _get_bing_base(dt, 'AUTO')


def _get_bing_contact_cnt(dt, refresh_type):
    """Return per-source contact rows for websites found by one BING type.

    Rows carry ``source`` (``'shh'``, ``'snovio'`` or ``'all'``), ``res_cnt``
    (distinct tids) and ``ctc_cnt`` (row count).
    """
    sql = f'''
    with webs as
      (select distinct get_json_object(ori_json, '$.result.data.website') as website
       from ent_raw.interface_base
       where topic = 'ent_tendata_interface'
         and dt = '{dt}'
         and get_json_object(ori_json, '$.source') = 'BING'
         and get_json_object(ori_json, '$.type') = '{refresh_type}'
         and get_json_object(ori_json, '$.result.data.website') is not null),
         tids as
      (select website,
              generate_tid(clean_website(website), 'not_null', null) as tid
       from webs),
         pre as
      (select i.id,
              i.tid,
              i.source
       from ctc_mid.ctc_main_pre i
       join tids t on i.tid = t.tid
       where i.dt = '{dt}'),
         shh as
      (select 'shh' as source,
              count(distinct tid) as res_cnt,
              count(id) as ctc_cnt
       from pre
       where source like '%shh_%'),
         snovio as
      (select 'snovio' as source,
              count(distinct tid) as res_cnt,
              count(id) as ctc_cnt
       from pre
       where source like '%snovio%'),
         all_t as
      (select 'all' as source,
              count(distinct tid) as res_cnt,
              count(id) as ctc_cnt
       from pre
       where source like '%snovio%'
          or source like '%shh_%')
    select * from shh
    union all
    select * from snovio
    union all
    select * from all_t
    '''
    return _collect(sql)


def get_manual_cnt(dt):
    """Return the shh / snovio / all contact rows for BING ``MANUAL`` websites."""
    return _get_bing_contact_cnt(dt, 'MANUAL')


def get_auto_cnt(dt):
    """Return the shh / snovio / all contact rows for BING ``AUTO`` websites."""
    return _get_bing_contact_cnt(dt, 'AUTO')


def _ratio(numerator, denominator):
    """Return ``numerator / denominator``; 0 when the denominator is 0 or None."""
    numerator = numerator or 0
    denominator = denominator or 0
    return numerator / denominator if denominator != 0 else 0


def _pct(numerator, denominator):
    """Return the percentage ``100 * numerator / denominator`` (0 on empty denominator)."""
    return _ratio(100 * (numerator or 0), denominator)


def _rows_by_source(rows):
    """Index contact-count rows by their ``source`` label.

    Looking rows up by label avoids relying on the order in which the
    UNION ALL branches come back.
    """
    return {row['source']: row for row in rows}


def _send_interface_report(dt, format_dt):
    """Query the interface call counts for ``dt`` and send the text report."""
    shh_company_interface_cnt = get_shh_company_interface_cnt(dt)
    shh_company_interface_script_cnt = get_shh_company_interface_script_cnt(dt)
    shh_contact_interface_cnt = get_shh_contact_interface_cnt(dt)
    shh_contact_interface_script_cnt = get_shh_contact_interface_script_cnt(dt)
    snv_contact_interface_cnt = get_snv_contact_interface_cnt(dt)
    snv_contact_interface_script_cnt = get_snv_contact_interface_script_cnt(dt)
    shh_non_core_interface_cnt = get_shh_non_core_interface_cnt(dt)

    msg = f'''【接口调用量统计】------------------------------------------
统计日期: {format_dt}
1、单接口调用公司信息次数: {shh_company_interface_cnt + shh_company_interface_script_cnt}
①自然调用次数: {shh_company_interface_cnt}
②脚本调用次数: {shh_company_interface_script_cnt}
2、单接口调用联系人次数: {shh_contact_interface_cnt + shh_contact_interface_script_cnt}
①自然调用次数: {shh_contact_interface_cnt}
②脚本调用次数: {shh_contact_interface_script_cnt}
3、snovio调用联系人次数: {snv_contact_interface_cnt + snv_contact_interface_script_cnt}
①自然调用次数: {snv_contact_interface_cnt}
②脚本调用次数: {snv_contact_interface_script_cnt}
4、单接口非核心业务调用次数:{shh_non_core_interface_cnt}
---------------------------------------------------------------'''
    print(msg)
    send_dingtalk_notification(msg)


def _send_top_user_report(dt, format_dt):
    """Send the markdown report listing the top-10 interface users for ``dt``."""
    ent_user_top_cnt = ent_user_top(dt)
    msg = f'''### 企业主页接口调用统计top10
> **统计日期 : {format_dt}**

{ent_user_top_cnt}
'''
    print(msg)
    send_dingtalk_markdown(msg)


def _send_refresh_report(dt, format_dt):
    """Query manual / automatic refresh statistics for ``dt`` and send them."""
    manual_base = get_manual_base(dt)
    auto_base = get_auto_base(dt)
    manual_cnt = _rows_by_source(get_manual_cnt(dt))
    auto_cnt = _rows_by_source(get_auto_cnt(dt))
    manual_request_res = get_manual_request_cnt(dt)

    manual_request_cnt = manual_request_res['manual_request_cnt']
    # The user count comes from the refresh-request query, not manual_base.
    manual_user_cnt = manual_request_res['user_cnt']
    manual_cnt_total = manual_base['cnt']
    manual_web_cnt = manual_base['web_cnt'] or 0
    auto_user_cnt = auto_base['user_cnt']
    auto_cnt_total = auto_base['cnt']
    auto_web_cnt = auto_base['web_cnt'] or 0

    manual_shh_res = manual_cnt['shh']['res_cnt']
    manual_snovio_res = manual_cnt['snovio']['res_cnt']
    manual_all_res = manual_cnt['all']['res_cnt']
    auto_shh_res = auto_cnt['shh']['res_cnt']
    auto_snovio_res = auto_cnt['snovio']['res_cnt']
    auto_total_res = auto_shh_res + auto_snovio_res

    # Every ratio falls back to 0 when its denominator is 0.
    manual_avg_requests = _ratio(manual_request_cnt, manual_user_cnt)
    manual_web_percentage = _pct(manual_web_cnt, manual_cnt_total)
    manual_single_interface_percentage = _pct(manual_shh_res, manual_web_cnt)
    manual_snovio_percentage = _pct(manual_snovio_res, manual_web_cnt)
    manual_solution_percentage = _pct(manual_all_res, manual_cnt_total)

    auto_avg_requests = _ratio(auto_cnt_total, auto_user_cnt)
    auto_web_percentage = _pct(auto_web_cnt, auto_cnt_total)
    auto_single_interface_percentage = _pct(auto_shh_res, auto_web_cnt)
    # The snovio share is measured against websites the shh interface did not resolve.
    auto_snovio_percentage = _pct(auto_snovio_res, auto_web_cnt - auto_shh_res)
    auto_solution_percentage = _pct(auto_total_res, auto_cnt_total)

    # Key the google rows by their label column; sums may be NULL -> 0.
    ggl_res = {row[0]: (row['cnt'] or 0) for row in get_ggl_res(dt)}
    manual_ggl_cnt = ggl_res['manual']
    auto_ggl_cnt = ggl_res['auto']
    ctc_ggl_cnt = ggl_res['ctc_cnt']

    manual_shh_ctc = manual_cnt['shh']['ctc_cnt']
    manual_snovio_ctc = manual_cnt['snovio']['ctc_cnt']
    auto_shh_ctc = auto_cnt['shh']['ctc_cnt']
    auto_snovio_ctc = auto_cnt['snovio']['ctc_cnt']

    msg = f'''【手动/自动更新效果统计】------------------------------------------
统计日期: {format_dt}
1、手动更新
①手动更新请求总人数:{manual_user_cnt}人
②手动更新请求总次数:{manual_request_cnt}次
③人均请求次数:{manual_avg_requests:.2f}次
④手动请求bing网址总次数:{manual_cnt_total}次
⑤bing获取到网址的次数及占比:{manual_web_cnt}次,{manual_web_percentage:.2f}%
⑥单接口获取到联系人次数及占比:{manual_shh_res}次,{manual_single_interface_percentage:.2f}%
⑦单接口获取到联系人去重总数:{manual_shh_ctc}
⑧snovio接口获取到联系人次数及占比:{manual_snovio_res}次,{manual_snovio_percentage:.2f}%
⑨snovio接口获取到联系人去重总数:{manual_snovio_ctc}
⑩当日手动更新获得联系方式的总次数:{manual_all_res}
⑪当日手动更新解决问题的百分比:{manual_solution_percentage:.2f}%
2、自动更新
①自动更新对应的总人数:{auto_user_cnt}人
②自动更新请求总次数:{auto_cnt_total}次
③人均对应自动更新次数:{auto_avg_requests:.2f}次
④bing获取到网址的次数及占比:{auto_web_cnt}次,{auto_web_percentage:.2f}%
⑤单接口获取到联系人次数及占比:{auto_shh_res}次,{auto_single_interface_percentage:.2f}%
⑥单接口获取到联系人去重总数:{auto_shh_ctc}
⑦snovio接口获取到联系人次数及占比:{auto_snovio_res}次,{auto_snovio_percentage:.2f}%
⑧snovio接口获取到联系人去重总数:{auto_snovio_ctc}
⑨当日自动更新获得联系方式的总次数:{auto_total_res}
⑩当日自动更新解决问题的百分比:{auto_solution_percentage:.2f}%
3、google补充
①手动触发google爬虫获取到联系人次数:{manual_ggl_cnt}次
②自动触发google爬虫获取到联系人次数:{auto_ggl_cnt}次
③google爬虫获取到联系人去重数:{ctc_ggl_cnt}人
---------------------------------------------------------------
'''
    print(msg)
    send_dingtalk_notification(msg)


def main(argv):
    """Send all three reports for every date in the comma separated ``dt`` argument.

    :param argv: command line arguments without the program name.
    :raises ValueError: when no ``dt`` argument was supplied.
    """
    config, _ = parse_args(argv)
    dt_arg = config.get('dt')
    if not dt_arg:
        raise ValueError("missing required argument 'dt' (comma separated dates)")
    for dt in (part.strip() for part in dt_arg.split(',')):
        if not dt:
            continue
        # Slicing assumes a compact yyyymmdd value - TODO confirm with callers.
        format_dt = f'{dt[:4]}-{dt[4:6]}-{dt[6:]}'
        _send_interface_report(dt, format_dt)
        _send_top_user_report(dt, format_dt)
        _send_refresh_report(dt, format_dt)


if __name__ == '__main__':
    main(sys.argv[1:])