"""Daily manual/automatic contact-update statistics, pushed to DingTalk.

For every date passed in the ``dt`` argument (comma separated, ``yyyymmdd``)
the script runs a handful of Spark SQL aggregations, renders a text report
and posts it to a DingTalk robot webhook.
"""
import json
import os
import re
import sys

import requests

# Truncate this file's absolute path right after "tendata-warehouse" and put
# the result on sys.path so that the ``dw_base`` imports below resolve.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.utils.config_utils import parse_args
from dw_base.spark.spark_sql import SparkSQL

# The cross-join setting is required by get_auto_source_cnt, which joins three
# single-row sub-queries without a join condition.
spark = SparkSQL(udf_files=['dw_base/spark/udf/contacts/ctc_common.py',
                            'dw_base/spark/udf/spark_id_generate_udf.py'],
                 extra_spark_config={'spark.sql.crossJoin.enabled': True})


def send_dingtalk_notification(msg, timeout=10):
    """Post *msg* as a plain-text message to the DingTalk robot webhook.

    Args:
        msg: Text content of the message.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout ``requests`` would block forever on a stalled connection.

    Raises:
        requests.HTTPError: If the webhook answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeout.
    """
    headers = {"Content-Type": "application/json"}
    data = {
        "msgtype": "text",
        "text": {"content": msg}
    }
    json_data = json.dumps(data)
    # NOTE(review): the access token is hard-coded in source; consider moving
    # it to configuration.
    # Enterprise-database data product line group (active target)
    url = 'https://oapi.dingtalk.com/robot/send?access_token=c4086d8ba377fdade2dff869e71063733095bc718d3bafdfbe8be0966aa050d6'
    # Enterprise-database management group
    # url = 'https://oapi.dingtalk.com/robot/send?access_token=5183dfe1ecbe06261bcac7b45c1a6b5ae101fec67877d74120a6a95c88d1f917'
    # Enterprise & contacts robot test group
    # url = 'https://oapi.dingtalk.com/robot/send?access_token=bee997dbf61e839a17de087830ffef6e864c3109fef62a956703bdfe043b0e10'
    response = requests.post(url=url, data=json_data, headers=headers, timeout=timeout)
    # NOTE(review): only the HTTP status is checked here; the JSON body of the
    # reply is not inspected - confirm whether an error code in the body
    # should also be treated as a failure.
    response.raise_for_status()


def _first_row(sql):
    """Run *sql* on the module-level ``spark`` wrapper and return the first collected row."""
    return spark.query(sql)[0].collect()[0]


def _pct(numerator, denominator):
    """Return ``100 * numerator / denominator``, or 0 when the denominator is not positive."""
    return 100 * numerator / denominator if denominator > 0 else 0


def get_base_cnt(dt, trigger_type):
    """Count distinct users and distinct trace ids across the three interface logs.

    Args:
        dt: Partition date, interpolated into the ``dt = '...'`` filter.
        trigger_type: Value matched against the ``trigger_type`` column
            (the driver passes ``'MANUAL'`` or ``'AUTO'``).

    Returns:
        The first result row, with fields ``user_cnt`` and ``trace_cnt``.
    """
    sql = f'''
        SELECT count(DISTINCT user_id)  as user_cnt,
               count(distinct trace_id) as trace_cnt
        FROM (SELECT user_id, trace_id
              FROM ctc_ods.ctc_shh_interface_log
              WHERE dt = '{dt}'
                AND trigger_type = '{trigger_type}'
              UNION ALL
              SELECT user_id, trace_id
              FROM ctc_ods.ctc_snv_interface_log
              WHERE dt = '{dt}'
                AND trigger_type = '{trigger_type}'
              UNION ALL
              SELECT user_id, trace_id
              FROM ctc_ods.ctc_google_interface_log
              WHERE dt = '{dt}'
                AND trigger_type = '{trigger_type}') t
    '''
    return _first_row(sql)


def get_web_cnt(dt, trigger_type):
    """Count BING website requests and how many of them returned a website.

    Args:
        dt: Partition date, interpolated into the ``dt = '...'`` filter.
        trigger_type: Value matched against ``$.type`` inside ``ori_json``.

    Returns:
        The first result row, with fields ``request_web_cnt`` (all matching
        rows) and ``get_web_cnt`` (rows whose ``$.result.data.website`` is
        not null).
    """
    sql = f'''
        select count(1) as request_web_cnt,
               nvl(sum(if(get_json_object(ori_json, '$.result.data.website') is not null, 1, 0)), 0) as get_web_cnt
        from ent_raw.interface_base
        where topic = 'ent_tendata_interface'
          and dt = '{dt}'
          and get_json_object(ori_json, '$.source') = 'BING'
          and get_json_object(ori_json, '$.type') = '{trigger_type}'
    '''
    return _first_row(sql)


def get_auto_user_cnt(dt):
    """Return the number of distinct ``$.params.userId`` values among AUTO BING requests on *dt*."""
    sql = f'''
        SELECT count(DISTINCT get_json_object(ori_json, '$.params.userId')) AS request_user_cnt
        FROM ent_raw.interface_base
        WHERE topic = 'ent_tendata_interface'
          AND dt = '{dt}'
          AND get_json_object(ori_json, '$.source') = 'BING'
          AND get_json_object(ori_json, '$.type') = 'AUTO'
    '''
    return _first_row(sql)['request_user_cnt']


def get_auto_source_cnt(dt):
    """Count distinct AUTO trace ids per interface log on *dt*.

    The three single-row aggregates are combined with condition-less joins,
    which is why the Spark session enables cross joins.

    Returns:
        The first result row, with fields ``shh_cnt``, ``snv_cnt`` and ``ggl_cnt``.
    """
    sql = f'''
        SELECT *
        from (select count(distinct trace_id) as shh_cnt
              FROM ctc_ods.ctc_shh_interface_log
              WHERE dt = '{dt}'
                AND trigger_type = 'AUTO') shh
                 join
             (SELECT count(distinct trace_id) as snv_cnt
              FROM ctc_ods.ctc_snv_interface_log
              WHERE dt = '{dt}'
                AND trigger_type = 'AUTO') snv
                 join
             (SELECT count(distinct trace_id) as ggl_cnt
              FROM ctc_ods.ctc_google_interface_log
              WHERE dt = '{dt}'
                AND trigger_type = 'AUTO') ggl
    '''
    return _first_row(sql)


def get_res_cnt(dt, trigger_type):
    """Count trace ids in ``ctc_mid.ctc_main_pre`` per source family.

    Each row's ``trace_id`` array is exploded; a trace id is flagged for a
    source family when at least one of its rows has a ``source`` matching
    ``%shh_%``, ``%snovio%`` or ``%google%`` respectively.

    Args:
        dt: Partition date, interpolated into the ``dt = '...'`` filter.
        trigger_type: Value that must be contained in the ``trigger_type`` array.

    Returns:
        The first result row, with fields ``shh_get_cnt``, ``snv_get_cnt``,
        ``ggl_get_cnt`` and ``all_get_cnt`` (number of grouped trace ids).
    """
    sql = f'''
        with init as (select ti,
                             if(source like '%shh_%', 1, 0)   as shh_flag,
                             if(source like '%snovio%', 1, 0) as snv_flag,
                             if(source like '%google%', 1, 0) as ggl_flag
                      from ctc_mid.ctc_main_pre
                               LATERAL VIEW explode(trace_id) exploded_table1 AS ti
                      where dt = '{dt}'
                        and array_contains(trigger_type, '{trigger_type}')),
             flag as (select ti
                           , if(sum(shh_flag) > 0, 1, 0) as shh_get_flag
                           , if(sum(snv_flag) > 0, 1, 0) as snv_get_flag
                           , if(sum(ggl_flag) > 0, 1, 0) as ggl_get_flag
                      from init
                      group by ti)
        select nvl(sum(shh_get_flag),0) as shh_get_cnt
             , nvl(sum(snv_get_flag),0) as snv_get_cnt
             , nvl(sum(ggl_get_flag),0) as ggl_get_cnt
             , count(ti) as all_get_cnt
        from flag
    '''
    return _first_row(sql)


def get_ctc_cnt(dt, trigger_type):
    """Count rows of ``ctc_mid.ctc_main_pre`` per source family.

    Args:
        dt: Partition date, interpolated into the ``dt = '...'`` filter.
        trigger_type: Value that must be contained in the ``trigger_type`` array.

    Returns:
        The first result row, with fields ``shh_ctc_cnt``, ``snv_ctc_cnt``
        and ``ggl_ctc_cnt``.
    """
    sql = f'''
        select nvl(sum(if(source like '%shh_%', 1, 0)), 0)   as shh_ctc_cnt,
               nvl(sum(if(source like '%snovio%', 1, 0)), 0) as snv_ctc_cnt,
               nvl(sum(if(source like '%google%', 1, 0)), 0) as ggl_ctc_cnt
        from ctc_mid.ctc_main_pre
        where dt = '{dt}'
          and array_contains(trigger_type, '{trigger_type}')
    '''
    return _first_row(sql)


def build_report(dt):
    """Run all aggregations for *dt* (``yyyymmdd``) and render the report text."""
    format_dt = f'{dt[:4]}-{dt[4:6]}-{dt[6:]}'

    # ---- manual updates -------------------------------------------------
    manual_base_cnt = get_base_cnt(dt, 'MANUAL')
    manual_web_cnt = get_web_cnt(dt, 'MANUAL')
    manual_res_cnt = get_res_cnt(dt, 'MANUAL')
    manual_ctc_cnt = get_ctc_cnt(dt, 'MANUAL')

    manual_user_cnt = manual_base_cnt['user_cnt']
    manual_trace_cnt = manual_base_cnt['trace_cnt']
    manual_trace_avg = manual_trace_cnt / manual_user_cnt if manual_user_cnt > 0 else 0

    manual_web_request_cnt = manual_web_cnt['request_web_cnt']
    manual_web_get_cnt = manual_web_cnt['get_web_cnt']
    manual_web_get_pct = _pct(manual_web_get_cnt, manual_web_request_cnt)

    # Manual hit rates are all relative to the total manual trace count.
    manual_shh_get_cnt = manual_res_cnt['shh_get_cnt']
    manual_shh_get_pct = _pct(manual_shh_get_cnt, manual_trace_cnt)
    manual_snv_get_cnt = manual_res_cnt['snv_get_cnt']
    manual_snv_get_pct = _pct(manual_snv_get_cnt, manual_trace_cnt)
    manual_ggl_get_cnt = manual_res_cnt['ggl_get_cnt']
    manual_ggl_get_pct = _pct(manual_ggl_get_cnt, manual_trace_cnt)
    manual_all_get_cnt = manual_res_cnt['all_get_cnt']
    manual_all_get_pct = _pct(manual_all_get_cnt, manual_trace_cnt)

    manual_ctc_shh_cnt = manual_ctc_cnt['shh_ctc_cnt']
    manual_ctc_snv_cnt = manual_ctc_cnt['snv_ctc_cnt']
    manual_ctc_ggl_cnt = manual_ctc_cnt['ggl_ctc_cnt']

    # ---- automatic updates ----------------------------------------------
    auto_base_cnt = get_base_cnt(dt, 'AUTO')
    auto_web_cnt = get_web_cnt(dt, 'AUTO')
    auto_res_cnt = get_res_cnt(dt, 'AUTO')
    auto_ctc_cnt = get_ctc_cnt(dt, 'AUTO')

    auto_base_user_cnt = auto_base_cnt['user_cnt']
    auto_trace_cnt = auto_base_cnt['trace_cnt']
    # NOTE(review): the average uses the interface-log user/trace counts,
    # while the report lines next to it show the BING-based user count and
    # request count. Behaviour is kept as-is - confirm which is intended.
    auto_trace_avg = auto_trace_cnt / auto_base_user_cnt if auto_base_user_cnt > 0 else 0

    auto_web_request_cnt = auto_web_cnt['request_web_cnt']
    auto_web_get_cnt = auto_web_cnt['get_web_cnt']
    auto_web_get_pct = _pct(auto_web_get_cnt, auto_web_request_cnt)

    auto_source_cnt = get_auto_source_cnt(dt)
    auto_request_shh_cnt = auto_source_cnt['shh_cnt']
    auto_request_snv_cnt = auto_source_cnt['snv_cnt']
    auto_request_ggl_cnt = auto_source_cnt['ggl_cnt']

    # Automatic hit rates are relative to each interface's own request count.
    auto_shh_get_cnt = auto_res_cnt['shh_get_cnt']
    auto_shh_get_pct = _pct(auto_shh_get_cnt, auto_request_shh_cnt)
    auto_snv_get_cnt = auto_res_cnt['snv_get_cnt']
    auto_snv_get_pct = _pct(auto_snv_get_cnt, auto_request_snv_cnt)
    auto_ggl_get_cnt = auto_res_cnt['ggl_get_cnt']
    auto_ggl_get_pct = _pct(auto_ggl_get_cnt, auto_request_ggl_cnt)
    auto_all_get_cnt = auto_res_cnt['all_get_cnt']
    auto_all_get_pct = _pct(auto_all_get_cnt, auto_trace_cnt)

    auto_ctc_shh_cnt = auto_ctc_cnt['shh_ctc_cnt']
    auto_ctc_snv_cnt = auto_ctc_cnt['snv_ctc_cnt']
    auto_ctc_ggl_cnt = auto_ctc_cnt['ggl_ctc_cnt']

    # The user count shown in the report comes from the BING request log.
    auto_user_cnt = get_auto_user_cnt(dt)

    msg = f'''【手动/自动更新效果统计】------------------------------------------
统计日期: {format_dt}
1、手动更新
①手动更新请求总人数:{manual_user_cnt}人
②手动更新请求总次数:{manual_trace_cnt}次
③人均请求次数:{manual_trace_avg:.2f}次
④手动请求bing网址总次数:{manual_web_request_cnt}次
⑤bing获取到网址的次数及占比:{manual_web_get_cnt}次,{manual_web_get_pct:.2f}%
⑥单接口获取到联系人次数及占比:{manual_shh_get_cnt}次,{manual_shh_get_pct:.2f}%
⑦单接口获取到联系人去重总数:{manual_ctc_shh_cnt}
⑧snovio接口获取到联系人次数及占比:{manual_snv_get_cnt}次,{manual_snv_get_pct:.2f}%
⑨snovio接口获取到联系人去重总数:{manual_ctc_snv_cnt}
⑩google爬虫获取到联系人次数及占比:{manual_ggl_get_cnt}次,{manual_ggl_get_pct:.2f}%
⑪google爬虫获取到联系人去重总数:{manual_ctc_ggl_cnt}
⑫当日手动更新获得联系方式的总次数:{manual_all_get_cnt}
⑬当日手动更新解决联系人问题的百分比:{manual_all_get_pct:.2f}%
2、自动更新
① 自动更新请求总人数:{auto_user_cnt}人
② 自动更新请求总次数:{auto_web_request_cnt}次
③ 人均请求次数:{auto_trace_avg:.2f}次
④ 自动请求bing网址总次数:{auto_web_request_cnt}次
⑤ bing获取到网址的次数及占比:{auto_web_get_cnt}次,{auto_web_get_pct:.2f}%
⑥ 自动请求单接口的总次数:{auto_request_shh_cnt} 次
⑦ 单接口获取到联系人次数及占比:{auto_shh_get_cnt}次,{auto_shh_get_pct:.2f}%
⑧ 单接口获取到联系人去重总数:{auto_ctc_shh_cnt}
⑨ 自动请求snovio接口的总次数:{auto_request_snv_cnt} 次
⑩ snovio接口获取到联系人次数及占比:{auto_snv_get_cnt}次,{auto_snv_get_pct:.2f}%
⑪ snovio接口获取到联系人去重总数:{auto_ctc_snv_cnt}
⑫ 自动请求google爬虫的总次数:{auto_request_ggl_cnt} 次
⑬ google爬虫获取到联系人次数及占比:{auto_ggl_get_cnt}次,{auto_ggl_get_pct:.2f}%
⑭ google爬虫获取到联系人去重总数:{auto_ctc_ggl_cnt}
⑮ 当日自动更新请求联系方式的总次数:{auto_trace_cnt}
⑯ 当日自动更新获得联系方式的总次数:{auto_all_get_cnt}
⑰ 当日自动更新解决联系人问题的百分比:{auto_all_get_pct:.2f}%
---------------------------------------------------------------
'''
    return msg


def main():
    """Parse the ``dt`` argument and send one report per requested date."""
    CONFIG, _ = parse_args(sys.argv[1:])
    dt_arg = CONFIG.get('dt')
    if not dt_arg:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError("missing required argument 'dt' (comma-separated yyyymmdd dates)")
    # Tolerate whitespace around the commas, e.g. "20240101, 20240102".
    dts = [part.strip() for part in dt_arg.split(',')]
    for dt in dts:
        msg = build_report(dt)
        print(msg)
        send_dingtalk_notification(msg)


if __name__ == '__main__':
    main()