"""PySpark UDFs for parsing, merging and classifying contact / e-mail records.

Every public function below is wrapped with ``pyspark.sql.functions.udf`` and is
meant to be used inside DataFrame expressions.  The undecorated Python function
is available as ``<udf>.func`` (the ``__main__`` demo at the bottom uses it).

UDF bodies deliberately use nested helpers and module-level *constants* only,
never module-level helper *functions*, so the pickled UDFs keep depending only
on standard-library modules.
"""
import hashlib
import json
from collections import Counter
from typing import Dict, List, Optional, Tuple

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    FloatType,
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)


# ---------------------------------------------------------------------------
# Lookup tables
# ---------------------------------------------------------------------------

# Fixed values of ``mail_sou_linkedin_similar`` -> Chinese label
# ("speculated + verified", "speculated", "match").
_SIMILAR_CODE_ZH = {
    "2": "推测+验证",
    "-1": "推测",
    "1": "匹配",
}

# snovio verification result -> Chinese label
# ("match", "low match", "possible bounce", "speculated").
_SNOVIO_ZH = {
    "valid": "匹配",
    "unknown": "匹配度低",
    "not valid": "可能退信",
    "greylisted": "推测",
}

# Chinese label -> status code returned by ``get_email_status``.
_EMAIL_STATUS_CODES = {
    "推测": "SPECULATION",
    "推测+验证": "SPECULATION_VERIFICATION",
    "匹配": "PERFECT_MATCH",
    "可能匹配": "POSSIBLE_MATCH",
    "匹配度低": "LOW_MATCH",
    "可能退信": "POSSIBLE_REFUND",
}

_WHITE_ACTION_STATUSES = frozenset({"CLICK", "OPEN", "SENT_SUCCESS"})
_BLACK_ACTION_STATUSES = frozenset({
    "HARD_BOUNCE", "SYSTEM_HARD", "SYSTEM_SOFT", "SYSTEM_BOUNCE",
    "UNSUBSCRIBE", "SOFT_BOUNCE", "SPAM_COMPLAINT", "SYSTEM_UNSUBSCRIBE",
})
_BLACK_STATUS_CODES = frozenset({
    "401", "402", "403", "404", "406", "407", "408", "409",
    "503", "506", "507", "508", "509",
})
_GREY_STATUS_CODES = frozenset({"405", "505"})

# Priority of a mail event by action status (higher = more significant).
_MAIL_STATUS_PRIORITY = {
    "MISSING": 0,
    "SENT_SUCCESS": 2,
    "OPEN": 4,
    "CLICK": 5,
    "SOFT_BOUNCE": 6,
    "SYSTEM_SOFT": 6,
    "UNSUBSCRIBE": 7,
    "SYSTEM_UNSUBSCRIBE": 7,
    "SPAM_COMPLAINT": 8,
    "SYSTEM_BOUNCE": 9,
    "SYSTEM_HARD": 9,
    "HARD_BOUNCE": 10,
}

# Priority of a mail event by status code; takes precedence over the status.
_MAIL_CODE_PRIORITY = {
    "506": 10, "406": 10, "404": 10,
    "503": 9, "401": 9, "403": 9,
    "402": 8,
    "507": 7, "407": 7, "508": 7, "408": 7,
    "509": 6, "409": 6,
    "505": 1, "405": 1,
}

_MAIL_USABILITY = {"WHITE": "Usable", "BLACK": "Disable", "GREY": "Uncertain"}


# ---------------------------------------------------------------------------
# JSON / array parsing
# ---------------------------------------------------------------------------

@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_arr(s: str) -> Optional[List[Tuple[int, str]]]:
    """Split a JSON array into ``(idx, obj)`` pairs, re-serializing each element.

    :param s: JSON text whose top level is an array.
    :return: ``[(1, json.dumps(e0)), (2, json.dumps(e1)), ...]`` (1-based
        ``idx``), or ``None`` when ``s`` is null.
    """
    if s is None:
        # json.loads(None) raises TypeError, which would fail the Spark task.
        return None
    return [(idx, json.dumps(obj)) for idx, obj in enumerate(json.loads(s), start=1)]


@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_strarr(s: str) -> Optional[List[Tuple[int, str]]]:
    """Split a JSON array of strings into ``(idx, element)`` pairs.

    Unlike ``parse_jsonarr_to_arr`` the elements are returned as decoded, not
    re-serialized.

    :param s: JSON text whose top level is an array (of strings, presumably).
    :return: ``[(1, e0), (2, e1), ...]`` (1-based ``idx``), or ``None`` when
        ``s`` is null.
    """
    if s is None:
        return None
    return list(enumerate(json.loads(s), start=1))


@udf(returnType=StructType([
    StructField("k", ArrayType(StringType()), False),
    StructField("kv", StringType()),
]))
def parse_arr_and_count(arr, tag: str, return_count: int = -1):
    """Count occurrences in ``arr`` and rank the distinct values by frequency.

    Ties keep first-occurrence order (guaranteed by ``Counter.most_common``).

    :param arr: values to count; ``None`` counts as empty.
    :param tag: label written into every ``kv`` entry.
    :param return_count: keep only the top N values; negative keeps all.
    :return: ``(codes, kv)`` — ``codes`` lists the distinct values by descending
        count, ``kv`` is ``"{code,tag:count},{code,tag:count},..."``.
    """
    ranked = Counter(arr).most_common(None if return_count < 0 else return_count)
    kv = ",".join("{" + f"{code},{tag}:{num}" + "}" for code, num in ranked)
    return [code for code, _ in ranked], kv


@udf(returnType=StructType([
    StructField("sum", FloatType(), False),
    StructField("list", StringType()),
]))
def parse_arr_and_sum(struct_arr, tag: str):
    """Sum values per key and rank the keys by their totals (descending).

    :param struct_arr: ``(key, value)`` structs. A null ``value`` adds nothing
        but still registers the key; a null array or null struct is skipped.
    :param tag: label written into every ``list`` entry.
    :return: ``(sum, list)`` — the grand total rounded to 2 decimals and
        ``"{key,tag:total},..."`` with each total rounded to 2 decimals.
    """
    sums = {}
    for item in struct_arr or []:
        if item is None:
            continue
        key, value = item[0], item[1]
        sums.setdefault(key, 0.0)
        if value is not None:
            sums[key] += value
    ranked = sorted(sums.items(), key=lambda kv: kv[1], reverse=True)
    # Plain float accumulation in ranked order (sum() uses compensated
    # summation on Python 3.12+), so rounded totals don't shift with versions.
    total = 0.0
    for _, num in ranked:
        total += num
    listing = ",".join("{" + f"{code},{tag}:{round(num, 2)}" + "}" for code, num in ranked)
    return round(total, 2), listing


# ---------------------------------------------------------------------------
# "key:value" string lists
# ---------------------------------------------------------------------------

@udf(returnType=StringType())
def split_str_to_jsonstr(str_list: List) -> str:
    """Turn ``["k:v", ...]`` into the JSON text ``[{"k": "v"}, ...]``.

    Strings that do not contain exactly one ``:`` (and null strings) are
    skipped; a null list yields ``"[]"``. Non-ASCII text is kept unescaped.
    """
    pairs = []
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        parts = kv_str.split(':')
        if len(parts) == 2:
            pairs.append({parts[0]: parts[1]})
    return json.dumps(pairs, ensure_ascii=False)


@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def split_str_to_maparr(str_list: List) -> Dict[str, List[str]]:
    """Group ``["k:v", ...]`` into ``{k: [v, ...]}`` preserving value order.

    Strings that do not contain exactly one ``:`` (and null strings) are
    skipped; a null list yields ``{}``.
    """
    grouped = {}
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        parts = kv_str.split(':')
        if len(parts) == 2:
            grouped.setdefault(parts[0], []).append(parts[1])
    return grouped


# ---------------------------------------------------------------------------
# Picking the "best" record
# ---------------------------------------------------------------------------

@udf(returnType=MapType(StringType(), StringType()))
def distinct_arrmap(map_list: List) -> Dict[str, str]:
    """Return ``{"snovio": value}`` taken from the map with the latest ``time``.

    Maps are expected to carry ``time`` (int-convertible) and ``value`` keys —
    TODO confirm against the caller. While the current pick has no ``time`` key
    any later map replaces it; once it has one, only a map whose ``int(time)``
    is strictly larger replaces it (a missing current time counts as older).

    :return: ``{}`` when nothing was picked or the pick has no ``value``.
    """
    best = {}
    for kv_map in map_list or []:
        if kv_map is None:
            continue
        if "time" not in best:
            best = kv_map
            continue
        candidate_time, best_time = kv_map.get("time"), best["time"]
        if candidate_time is not None and (best_time is None or int(candidate_time) > int(best_time)):
            best = kv_map
    if not best or "value" not in best:
        return {}
    return {"snovio": best["value"]}


@udf(returnType=MapType(StringType(), StringType()))
def distinct_arrlist(arr_list: List) -> Dict[str, str]:
    """Pick the best ``inv`` and ``similar`` values from ``[inv, similar, field, last_time]`` rows.

    - The first row seeds ``inv`` (and ``similar`` when its ``field`` is ``'email'``).
    - ``'email'`` rows that have a ``last_time`` update both picks: a strictly
      newer row wins only when its value is non-blank; an older/equal row wins
      only when the current value is blank.
    - Other rows update only ``inv``: an ``'ep1'`` row replaces a non-``'ep1'``
      pick regardless of time; otherwise a strictly newer row wins.

    Times are compared via ``int()``; a missing current time counts as older than
    any real time and a missing candidate time never wins. "Blank" means ``None``
    or ``''``.

    :return: map holding ``mail_sou_linkedin_similar`` and/or
        ``mail_sou_linkedin_inv`` for whichever picks are non-blank.
    """

    def is_blank(value):
        return value is None or value == ''

    def is_newer(candidate_time, current_time):
        if candidate_time is None:
            return False
        return current_time is None or int(candidate_time) > int(current_time)

    def pick_email(current, candidate):
        # NOTE(review): this if/else nesting was recovered from a
        # whitespace-collapsed copy of the file; confirm that an older row may
        # fill a blank value (vs. only newer rows being considered).
        if is_newer(candidate[2], current[2]):
            return current if is_blank(candidate[0]) else candidate
        return candidate if is_blank(current[0]) else current

    picks = {}  # "inv" / "similar" -> [value, field, last_time]
    for arr in arr_list or []:
        if arr is None:
            continue
        field, last_time = arr[2], arr[3]
        inv_row = [arr[0], field, last_time]
        similar_row = [arr[1], field, last_time]

        if "inv" not in picks:
            picks["inv"] = inv_row
            if field == 'email':
                picks["similar"] = similar_row
            continue

        if field == 'email':
            if last_time is None:
                continue
            picks["inv"] = pick_email(picks["inv"], inv_row)
            # "similar" is still absent when the first row was not an email row.
            if "similar" in picks:
                picks["similar"] = pick_email(picks["similar"], similar_row)
            else:
                picks["similar"] = similar_row
        elif picks["inv"][1] != 'ep1' and field == 'ep1':
            # An 'ep1' row displaces a non-'ep1' pick regardless of time.
            picks["inv"] = inv_row
        elif is_newer(last_time, picks["inv"][2]):
            picks["inv"] = inv_row
        # Non-email rows never update "similar".

    result = {}
    if "similar" in picks and not is_blank(picks["similar"][0]):
        result["mail_sou_linkedin_similar"] = picks["similar"][0]
    if "inv" in picks and not is_blank(picks["inv"][0]):
        result["mail_sou_linkedin_inv"] = picks["inv"][0]
    return result


@udf(returnType=StructType([
    StructField("status", BooleanType(), True),
    StructField("hidden", BooleanType(), True),
    StructField("start_date", TimestampType(), True),
    StructField("end_date", TimestampType(), True),
    StructField("insert_time", TimestampType(), True),
]))
def merge_status_info(status_info_list: list):
    """Return the status struct with the latest ``insert_time``.

    A current pick with a null ``insert_time`` is replaced by any later struct;
    a struct with a null ``insert_time`` never replaces a timestamped pick.
    A single-element list yields that element; a null/empty list (or one made
    only of nulls) yields an all-null struct.
    """
    latest = {
        "status": None,
        "hidden": None,
        "start_date": None,
        "end_date": None,
        "insert_time": None,
    }
    for status_info in status_info_list or []:
        if status_info is None:
            continue
        current_time = latest["insert_time"]
        candidate_time = status_info["insert_time"]
        if current_time is None or (candidate_time is not None and current_time < candidate_time):
            latest = status_info
    return latest


# ---------------------------------------------------------------------------
# Merging
# ---------------------------------------------------------------------------

@udf(returnType=MapType(StringType(), StringType()))
def merge_email(map_list: List) -> Dict[str, str]:
    """Merge maps into one; for a key present in several maps the first wins.

    Null maps and a null list are skipped.
    """
    merged = {}
    for kv_map in map_list or []:
        if kv_map is None:
            continue
        for key, value in kv_map.items():
            merged.setdefault(key, value)
    return merged


@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def merge_source_p_id(map_obj_list: List[dict]) -> Dict[str, List[str]]:
    """Union the value lists of ``{key: [ids]}`` maps per key (duplicates removed).

    Null maps and null value lists are skipped. The order of each resulting
    list is unspecified (it comes from a ``set``).
    """
    union = {}
    for map_obj in map_obj_list or []:
        if map_obj is None:
            continue
        for key, values in map_obj.items():
            if values is not None:
                union.setdefault(key, set()).update(values)
    return {key: list(values) for key, values in union.items()}


@udf(returnType=ArrayType(StringType()))
def merge_source(incr_source: List, old_source: List) -> List[str]:
    """Return the distinct non-blank elements of both lists (order unspecified).

    Null lists, null elements and empty strings are ignored.
    """
    distinct = set()
    for source in (incr_source, old_source):
        if source is None:
            continue
        for item in source:
            if item is not None and item != "":
                distinct.add(item)
    return list(distinct)


@udf(returnType=ArrayType(StringType()))
def merge_list(arr_list: List) -> List[str]:
    """Flatten a list of lists into its distinct non-blank elements (order unspecified).

    A null outer list, null inner lists, null elements and empty strings are
    ignored.
    """
    distinct = set()
    for inner in arr_list or []:
        if inner is None:
            continue
        for item in inner:
            if item is not None and item != "":
                distinct.add(item)
    return list(distinct)


@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def merge_position_map(left_dict_list: list) -> Dict[str, List[str]]:
    """Union the value lists of ``{key: [values]}`` maps per key (duplicates removed).

    Null maps and null value lists are skipped. The order of each resulting
    list is unspecified (it comes from a ``set``).
    """
    union = {}
    for kv_map in left_dict_list or []:
        if kv_map is None:
            continue
        for key, values in kv_map.items():
            if values is not None:
                union.setdefault(key, set()).update(values)
    return {key: list(values) for key, values in union.items()}


@udf(returnType=ArrayType(StringType()))
def merge_location(arr_list: List) -> List[str]:
    """Return the ``[location, time, ...]`` entry with the greatest ``time``.

    ``time`` (index 1) is compared with ``>`` on its raw value, i.e.
    lexicographically for strings. Null entries and entries with fewer than two
    elements are ignored; an entry with a null ``time`` never replaces the
    current pick, while a current pick with a null ``time`` is replaced by any
    entry that has one. Returns ``[]`` when nothing qualifies.
    """
    latest = []
    for arr in arr_list or []:
        if arr is None or len(arr) <= 1:
            continue
        if not latest:
            latest = list(arr)
        elif arr[1] is not None and (latest[1] is None or arr[1] > latest[1]):
            latest = arr
    return latest


@udf(returnType=ArrayType(StructType([
    StructField("channel", StringType(), False),
    StructField("channel_ids", ArrayType(StringType()), True),
])))
def split_channel_to_arr(channels: list, channel_ids: dict):
    """Pair every channel with its ids from ``channel_ids`` (``None`` if absent).

    :return: ``[{"channel": c, "channel_ids": channel_ids.get(c)}, ...]`` in
        ``channels`` order; ``[]`` when ``channels`` is null.
    """
    if channels is None:
        return []
    ids_by_channel = channel_ids if channel_ids is not None else {}
    return [{"channel": channel, "channel_ids": ids_by_channel.get(channel)} for channel in channels]


@udf(returnType=StructType([
    StructField("original", MapType(StringType(), StringType()), False),
    StructField("zh", MapType(StringType(), StringType())),
]))
def merge_parse_email(incr_map: dict, old_dict: dict):
    """Overlay ``incr_map`` on ``old_dict`` and derive Chinese status labels.

    ``zh["mail_sou"]``:
      * non-blank ``mail_sou_linkedin_inv`` -> "可能退信" (possible bounce);
      * else ``mail_sou_linkedin_similar`` of "2" / "-1" / "1" -> fixed labels
        (``_SIMILAR_CODE_ZH``); any other value is parsed as a score and
        0.8 <= s < 0.9 -> "匹配度低", 0.9 <= s < 1 -> "可能匹配".
    ``zh["snovio"]``: the ``snovio`` value mapped through ``_SNOVIO_ZH``.
    Keys are omitted when no rule applies.

    :return: ``(merged, zh)``; ``old_dict`` itself is not modified.
    """
    merged = dict(old_dict) if old_dict is not None else {}
    if incr_map is not None:
        merged.update(incr_map)

    parse_res = {}
    inv = merged.get("mail_sou_linkedin_inv")
    similar = merged.get("mail_sou_linkedin_similar")
    if inv is not None and inv != '':
        parse_res["mail_sou"] = "可能退信"
    elif similar is not None:
        label = _SIMILAR_CODE_ZH.get(similar)
        if label is None:
            try:
                # The '0' prefix makes an empty string parse as 0.0.
                score = float('0' + similar)
            except ValueError:
                score = None  # unparseable score: no mail_sou label
            if score is not None:
                if 0.8 <= score < 0.9:
                    label = "匹配度低"
                elif 0.9 <= score < 1:
                    label = "可能匹配"
        if label is not None:
            parse_res["mail_sou"] = label

    snovio_label = _SNOVIO_ZH.get(merged.get("snovio"))
    if snovio_label is not None:
        parse_res["snovio"] = snovio_label
    return merged, parse_res


# ---------------------------------------------------------------------------
# Classification
# ---------------------------------------------------------------------------

@udf(returnType=StringType())
def get_email_status(status_map: dict) -> Optional[str]:
    """Map the Chinese e-mail label in ``status_map`` to a status code.

    ``snovio`` takes precedence over ``mail_sou`` whenever the key is present
    (even if its label is unknown). Returns ``None`` for a null/empty map or an
    unknown label.
    """
    if not status_map:
        return None
    if 'snovio' in status_map:
        status_zh = status_map['snovio']
    else:
        status_zh = status_map.get('mail_sou')
    return _EMAIL_STATUS_CODES.get(status_zh)


@udf(returnType=StringType())
def get_media_type(social_media: str) -> Optional[str]:
    """Classify a social-media string by the keyword it contains.

    Checked in order: "linkedin" -> linkedin, "twitter" -> twitter,
    "facebook" -> facebook (case-sensitive). Returns ``None`` otherwise.
    """
    if social_media is None:
        return None
    for keyword in ('linkedin', 'twitter', 'facebook'):
        if keyword in social_media:
            return keyword
    return None


@udf(returnType=StringType())
def get_black_white_grey_status(action_status: str, action_status_code: str) -> str:
    """Classify a mail event as "WHITE", "BLACK" or "GREY".

    A listed ``action_status_code`` decides first (``_BLACK_STATUS_CODES`` ->
    BLACK, ``_GREY_STATUS_CODES`` -> GREY). Otherwise ``action_status`` decides:
    CLICK/OPEN/SENT_SUCCESS -> WHITE, bounce/unsubscribe/spam statuses -> BLACK,
    anything else (including MISSING and INIT) -> GREY.
    """
    if action_status_code in _BLACK_STATUS_CODES:
        return "BLACK"
    if action_status_code in _GREY_STATUS_CODES:
        return "GREY"
    if action_status in _WHITE_ACTION_STATUSES:
        return "WHITE"
    if action_status in _BLACK_ACTION_STATUSES:
        return "BLACK"
    return "GREY"


@udf(returnType=IntegerType())
def get_mail_status_priority(action_status: str, action_status_code: str) -> int:
    """Return the priority of a mail event; a listed status code wins over the status.

    :return: value from ``_MAIL_CODE_PRIORITY`` or ``_MAIL_STATUS_PRIORITY``,
        or -1 when neither knows the event.
    """
    if action_status_code in _MAIL_CODE_PRIORITY:
        return _MAIL_CODE_PRIORITY[action_status_code]
    return _MAIL_STATUS_PRIORITY.get(action_status, -1)


@udf(returnType=StringType())
def get_md5(*cols: str) -> str:
    """Return the MD5 hex digest of the length-prefixed concatenation of ``cols``.

    Each non-null column contributes ``str(len(col)) + col``; null columns are
    skipped entirely, so e.g. ``("a", None)`` and ``(None, "a")`` hash the same.
    Changing that format would change every previously generated hash, so it is
    kept as-is. Returns ``''`` when every column is null.
    """
    parts = []
    for col in cols:
        if col is not None:
            parts.append(str(len(col)))
            parts.append(col)
    key = ''.join(parts)
    if not key:
        return ''
    return hashlib.md5(key.encode("utf-8")).hexdigest()


@udf(returnType=StringType())
def get_mail_usable(black_white_grey_status: str) -> Optional[str]:
    """Map WHITE/BLACK/GREY to 'Usable'/'Disable'/'Uncertain' (``None`` otherwise)."""
    return _MAIL_USABILITY.get(black_white_grey_status)


@udf(returnType=StructType([
    StructField("info", ArrayType(StructType([
        StructField("same", StringType(), False),
        StructField("name", StringType(), False),
        StructField("staff_count", IntegerType(), False),
    ]), True)),
    StructField("num", StringType(), False),
]))
def get_similar_comanynames(linkedin_related_companies: str):
    """Extract related companies from a JSON array and total their staff counts.

    Only entries that have ``same``, ``name`` and a non-null ``staffCount`` are
    kept. ``num`` is the staff-count total rendered as a string (the declared
    field type).

    :return: ``{"info": [...], "num": "<total>"}``; ``info`` is ``None`` and
        ``num`` is ``"0"`` when the input is null.
    """
    if linkedin_related_companies is None:
        return {"info": None, "num": "0"}
    similar_companies = []
    total_num = 0
    # json.loads lost its ``encoding`` argument in Python 3.9; str input needs none.
    for company in json.loads(linkedin_related_companies) or []:
        if company is None:
            continue
        if 'same' in company and 'name' in company and company.get('staffCount') is not None:
            similar_companies.append({
                'same': company['same'],
                'name': company['name'],
                'staff_count': company['staffCount'],
            })
            total_num += company['staffCount']
    return {"info": similar_companies, "num": str(total_num)}


if __name__ == '__main__':
    # Calling the UDF wrapper with a plain string would build a Spark Column
    # expression (and needs an active SparkContext); ``.func`` is the raw function.
    for item in parse_jsonarr_to_arr.func('[{"aaa":"bbb"},{"aaa":"bbb"}]'):
        print(item)