| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- import hashlib
- import json
- from collections import Counter
- from typing import List
- from pyspark.sql.functions import udf
- from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType, FloatType, MapType,BooleanType,TimestampType
@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_arr(s: str) -> List[tuple]:
    """Split a JSON array string into numbered, re-serialised elements.

    Args:
        s: JSON text to decode. ``None`` or an empty string is treated as
            "no data" instead of being handed to ``json.loads``.

    Returns:
        A list of ``(idx, obj)`` tuples, where ``idx`` starts at 1 and
        ``obj`` is the element dumped back to a JSON string; ``None`` when
        ``s`` is ``None`` or empty.

    Raises:
        json.JSONDecodeError: If ``s`` is non-empty but not valid JSON.
    """
    if not s:
        # json.loads(None) raises TypeError and json.loads("") raises
        # JSONDecodeError; return a null result for both instead.
        return None
    return [
        (idx, json.dumps(obj))
        for idx, obj in enumerate(json.loads(s), start=1)
    ]
@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_strarr(s: str) -> List[tuple]:
    """Split a JSON array string into numbered elements, kept as decoded.

    Unlike a re-serialising variant, the decoded elements are returned
    unchanged (no ``json.dumps``).

    Args:
        s: JSON text to decode. ``None`` or an empty string is treated as
            "no data" instead of being handed to ``json.loads``.

    Returns:
        A list of ``(idx, obj)`` tuples with ``idx`` starting at 1, or
        ``None`` when ``s`` is ``None`` or empty.

    Raises:
        json.JSONDecodeError: If ``s`` is non-empty but not valid JSON.
    """
    if not s:
        # json.loads(None) raises TypeError and json.loads("") raises
        # JSONDecodeError; return a null result for both instead.
        return None
    # NOTE(review): "obj" is declared StringType; presumably the array holds
    # only strings -- confirm against the callers.
    return list(enumerate(json.loads(s), start=1))
@udf(returnType=StructType([
    StructField("k", ArrayType(StringType()), False),
    StructField("kv", StringType()),
]))
def parse_arr_and_count(arr, tag: str, return_count: int = -1):
    """Count element occurrences and render them, most frequent first.

    Args:
        arr: Elements to count.
        tag: Label placed before each count in the rendered text.
        return_count: Maximum number of distinct elements to keep; a
            negative value keeps all of them.

    Returns:
        A pair ``(codes, text)``: the distinct elements ordered by
        descending count, and a comma-joined string of
        ``{code,tag:count}`` items in the same order.
    """
    # most_common() sorts by count, descending, keeping first-seen order
    # among equal counts.
    ranked = Counter(arr).most_common()
    if return_count >= 0:
        # Slicing past the end simply returns everything available.
        ranked = ranked[:return_count]
    codes = [code for code, _ in ranked]
    rendered = ",".join(
        '{' + f'{code},{tag}:{num}' + '}' for code, num in ranked
    )
    return codes, rendered
@udf(returnType=StructType([
    StructField("sum", FloatType(), False),
    StructField("list", StringType()),
]))
def parse_arr_and_sum(struct_arr, tag: str):
    """Sum amounts per key and render the per-key totals, largest first.

    Args:
        struct_arr: Iterable of pairs; index 0 is the grouping key and
            index 1 the amount. ``None`` (or a ``None`` entry) is ignored.
            A ``None`` amount still registers its key with a total of 0.0.
        tag: Label placed before each total in the rendered text.

    Returns:
        A pair ``(total, text)``: the grand total rounded to 2 decimals and
        a comma-joined string of ``{key,tag:total}`` items ordered by
        descending total.
    """
    totals = {}
    for entry in struct_arr or []:
        if entry is None:
            continue
        key, value = entry[0], entry[1]
        totals.setdefault(key, 0.0)
        if value is not None:
            # float() keeps the accumulator a float even when the amount
            # arrives as another numeric type (e.g. decimal.Decimal), which
            # cannot be added to a float directly.
            totals[key] += float(value)

    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)

    total = 0.0
    for _, amount in ranked:
        total += amount

    rendered = ",".join(
        '{' + f'{key},{tag}:{round(amount, 2)}' + '}' for key, amount in ranked
    )
    return round(total, 2), rendered
@udf(returnType=StringType())
def split_str_to_jsonstr(str_list: List):
    """Turn ``"key:value"`` strings into a JSON array of one-entry objects.

    Args:
        str_list: Strings to split on ``':'``. ``None`` is treated as an
            empty list and ``None`` elements are skipped.

    Returns:
        A JSON string (non-ASCII characters kept as-is) holding one
        ``{key: value}`` object per input string that splits into exactly
        two parts; other strings are dropped.
    """
    pairs = []
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        parts = kv_str.split(':')
        if len(parts) == 2:
            pairs.append({parts[0]: parts[1]})
    return json.dumps(pairs, ensure_ascii=False)
@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def split_str_to_maparr(str_list: List):
    """Group ``"key:value"`` strings into a map of key -> list of values.

    Args:
        str_list: Strings to split on ``':'``. ``None`` is treated as an
            empty list and ``None`` elements are skipped.

    Returns:
        A dict mapping each key to its values in input order. Strings that
        do not split into exactly two parts are dropped.
    """
    grouped = {}
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        parts = kv_str.split(':')
        if len(parts) == 2:
            grouped.setdefault(parts[0], []).append(parts[1])
    return grouped
@udf(returnType=MapType(StringType(), StringType()))
def distinct_arrmap(map_list: List):
    """Keep the ``value`` of the entry with the greatest ``time``.

    Args:
        map_list: Maps read through their ``"time"`` and ``"value"`` keys.
            ``None`` is treated as an empty list; ``None`` or empty entries
            are skipped.

    Returns:
        ``{"snovio": value}`` for the selected entry, or ``{}`` when no
        entry is usable or the selected entry has no ``"value"`` key.
    """
    latest = None
    for kv_map in map_list or []:
        if not kv_map:
            continue
        if latest is None or latest.get("time") is None:
            # Nothing comparable held yet: take the current entry as-is.
            latest = kv_map
        elif (kv_map.get("time") is not None
              and int(kv_map["time"]) > int(latest["time"])):
            latest = kv_map

    if latest is None or "value" not in latest:
        return {}
    return {"snovio": latest["value"]}
# UDF: reduce a list of rows to a map holding at most two keys,
# "mail_sou_linkedin_similar" and "mail_sou_linkedin_inv".
# Each row is read by position: arr[0] is stored under "inv", arr[1] under
# "similar", arr[2] is compared with 'email' / 'ep1', and arr[3] is passed
# through int() and compared as an ordering value.
# NOTE(review): the indentation of this listing has been lost, so the nesting
# of several else/continue lines below cannot be determined from the text
# alone -- confirm against the original file before changing any logic here.
- @udf(returnType=MapType(StringType(), StringType()))
- def distinct_arrlist(arr_list: List):
- # [inv,similar,field,last_time]
# res maps "inv" / "similar" to a 3-item list [value, field, last_time].
- res = {}
- for arr in arr_list:
# First row seen: seed "inv" unconditionally; "similar" is seeded only when
# the row's field is 'email'.
- if len(res)==0:
- res["inv"] = [arr[0],arr[2],arr[3]]
- if arr[2] == 'email':
- res["similar"] = [arr[1],arr[2],arr[3]]
- continue
# Later rows whose field is 'email': rows without a time are skipped, then
# "inv" and "similar" are each updated by comparing times and checking for
# empty-string values.
# NOTE(review): res["similar"] is only created above when the first row is an
# 'email' row, so the res["similar"][2] read below appears able to raise
# KeyError otherwise; likewise int(res["inv"][2]) appears able to raise
# TypeError if the first row had no time -- TODO confirm.
- if arr[2] == 'email':
- if arr[3] is None:
- continue
- if int(arr[3]) > int(res["inv"][2]):
- if arr[0] != '':
- res["inv"] = [arr[0],arr[2],arr[3]]
- else:
- if res["inv"][0] == '':
- res["inv"] = [arr[0], arr[2], arr[3]]
- if int(arr[3]) > int(res["similar"][2]):
- if arr[1] != '':
- res["similar"] = [arr[1], arr[2], arr[3]]
- else:
- if res["similar"][0] == '':
- res["similar"] = [arr[1], arr[2], arr[3]]
# Rows whose field is not 'email' only ever update "inv". The branches test
# whether the stored "inv" field is 'ep1', whether the incoming field is
# 'ep1', and whether the incoming time is greater than the stored one.
- else:
- if res["inv"][1]=='ep1':
- if arr[3] is None:
- continue
- if int(arr[3])>int(res["inv"][2]):
- res["inv"] = [arr[0],arr[2],arr[3]]
- else:
- if arr[2]=='ep1':
- res["inv"] = [arr[0],arr[2],arr[3]]
- else:
- if arr[3] is None:
- continue
- if int(arr[3]) > int(res["inv"][2]):
- res["inv"] = [arr[0], arr[2], arr[3]]
# The commented-out lines below mirror the "inv" logic above for "similar";
# they are disabled, so non-'email' rows never change "similar".
- # if res["similar"][1] == 'ep1':
- # if arr[3] is None:
- # continue
- # if int(arr[3]) > int(res["similar"][2]):
- # res["similar"] = [arr[1], arr[2], arr[3]]
- # else:
- # if arr[2] == 'ep1':
- # res["similar"] = [arr[1], arr[2], arr[3]]
- # else:
- # if arr[3] is None:
- # continue
- # if int(arr[3]) > int(res["similar"][2]):
- # res["similar"] = [arr[1], arr[2], arr[3]]
# Emit only the selected values that are neither None nor the empty string.
- wrap_res = {}
- if "similar" in res and res["similar"][0] is not None and res["similar"][0]!='':
- wrap_res["mail_sou_linkedin_similar"] = res["similar"][0]
- if "inv" in res and res["inv"][0] is not None and res["inv"][0]!='':
- wrap_res["mail_sou_linkedin_inv"] = res["inv"][0]
- return wrap_res
@udf(returnType=StructType([
    StructField("status", BooleanType(), True),
    StructField("hidden", BooleanType(), True),
    StructField("start_date", TimestampType(), True),
    StructField("end_date", TimestampType(), True),
    StructField("insert_time", TimestampType(), True),
]))
def merge_status_info(status_info_list: list):
    """Return the status record with the greatest ``insert_time``.

    Args:
        status_info_list: Records indexable by ``"insert_time"``. ``None``
            is treated as an empty list and ``None`` records are skipped.

    Returns:
        The selected record unchanged, or a dict with every field set to
        ``None`` when there is no record to choose from.
    """
    latest = None
    # A single-element list is a valid input and yields that element (the
    # previous ``len(...) > 1`` guard discarded it and returned all-None).
    for status_info in status_info_list or []:
        if status_info is None:
            continue
        if latest is None or latest["insert_time"] is None:
            # Nothing comparable held yet: take the current record as-is.
            latest = status_info
        elif (status_info["insert_time"] is not None
              and latest["insert_time"] < status_info["insert_time"]):
            # The None check avoids a TypeError from ordering None against
            # a timestamp.
            latest = status_info

    if latest is None:
        return {
            "status": None,
            "hidden": None,
            "start_date": None,
            "end_date": None,
            "insert_time": None,
        }
    return latest
@udf(returnType=MapType(StringType(), StringType()))
def merge_email(map_list: List):
    """Merge several maps into one; the first occurrence of a key wins.

    Args:
        map_list: Maps to merge, in priority order. ``None`` yields ``{}``.

    Returns:
        A dict holding, for every key, the value from the earliest map that
        contains it.
    """
    if map_list is None:
        return {}
    merged = {}
    for kv_map in map_list:
        for key in kv_map.keys():
            # setdefault leaves an already-present key untouched.
            merged.setdefault(key, kv_map[key])
    return merged
@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def merge_source_p_id(map_obj_list: List[dict]):
    """Union the value collections of several maps, key by key.

    Args:
        map_obj_list: Maps whose values are iterables. ``None`` (the list
            itself or an individual map) is ignored.

    Returns:
        A dict mapping each key to a list of the distinct values seen for
        it across all maps. List order follows set iteration order.
    """
    if map_obj_list is None:
        return {}
    collected = {}
    for map_obj in map_obj_list:
        if map_obj is None:
            continue
        for source, ids in map_obj.items():
            if source in collected:
                collected[source].update(ids)
            else:
                collected[source] = set(ids)
    return {source: list(ids) for source, ids in collected.items()}
@udf(returnType=ArrayType(StringType()))
def merge_source(incr_source: List, old_source: List):
    """Union two lists, dropping ``None`` and empty-string items.

    Args:
        incr_source: First list to merge; ``None`` is ignored.
        old_source: Second list to merge; ``None`` is ignored.

    Returns:
        A list of the distinct remaining items. List order follows set
        iteration order.
    """
    merged = set()
    for source_list in (incr_source, old_source):
        if source_list is None:
            continue
        merged.update(
            item for item in source_list if item is not None and item != ""
        )
    return list(merged)
@udf(returnType=ArrayType(StringType()))
def merge_list(arr_list: List):
    """Flatten a list of lists into the distinct non-empty items.

    Args:
        arr_list: Lists to merge. ``None`` (the outer list or any inner
            list) is ignored, matching how ``merge_source`` treats ``None``.

    Returns:
        A list of the distinct items that are neither ``None`` nor the
        empty string. List order follows set iteration order.
    """
    merged = set()
    for sub_list in arr_list or []:
        if sub_list is None:
            continue
        merged.update(
            item for item in sub_list if item is not None and item != ""
        )
    return list(merged)
@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def merge_position_map(left_dict_list: list):
    """Union the value lists of several maps, key by key.

    Args:
        left_dict_list: Maps whose values are iterables. ``None`` (the
            list, a map, or a value) is ignored.

    Returns:
        A dict mapping each key to a list of the distinct values seen for
        it. List order follows set iteration order.
    """
    if left_dict_list is None:
        return {}
    merged = {}
    for kv_map in left_dict_list:
        if kv_map is None:
            continue
        for key, values in kv_map.items():
            if values is None:
                continue
            if key in merged:
                merged[key].update(set(values))
            else:
                merged[key] = set(values)
    return {key: list(values) for key, values in merged.items()}
@udf(returnType=ArrayType(StringType()))
def merge_location(arr_list: List):
    """Return the ``[location, time]`` entry with the greatest time.

    Args:
        arr_list: Entries laid out as ``[location, time]``. ``None`` is
            treated as an empty list; ``None`` entries and entries with
            fewer than two items are skipped.

    Returns:
        A copy of the selected entry, or ``[]`` when no entry qualifies.
    """
    latest = []
    for arr in arr_list or []:
        if arr is None or len(arr) < 2:
            continue
        if not latest:
            latest = list(arr)
        elif arr[1] is not None and (latest[1] is None or arr[1] > latest[1]):
            # The None checks avoid a TypeError from ordering None against
            # a value; an entry with a time beats one without.
            # NOTE(review): times are compared as given (no int()
            # conversion) -- confirm their format orders correctly.
            latest = list(arr)
    return latest
@udf(returnType=ArrayType(StructType([
    StructField("channel", StringType(), False),
    StructField("channel_ids", ArrayType(StringType()), True),
])))
def split_channel_to_arr(channels: list, channel_ids: dict):
    """Pair every channel with its ids from a lookup map.

    Args:
        channels: Channel names; ``None`` yields an empty list.
        channel_ids: Map from channel name to its ids; may be ``None``.

    Returns:
        One ``{"channel": ..., "channel_ids": ...}`` dict per channel, in
        input order. ``channel_ids`` is ``None`` for channels missing from
        the map.
    """
    if channels is None:
        return []
    lookup = channel_ids if channel_ids is not None else {}
    return [
        {"channel": channel, "channel_ids": lookup.get(channel)}
        for channel in channels
    ]
@udf(returnType=StructType([
    StructField("original", MapType(StringType(), StringType()), False),
    StructField("zh", MapType(StringType(), StringType())),
]))
def merge_parse_email(incr_map: dict, old_dict: dict):
    """Overlay new e-mail attributes on old ones and derive status labels.

    Args:
        incr_map: New attributes; its values override ``old_dict``. May be
            ``None``.
        old_dict: Existing attributes. May be ``None``. It is copied, not
            modified.

    Returns:
        A pair ``(merged, labels)``. ``merged`` is the overlaid attribute
        map. ``labels`` may contain ``"mail_sou"`` (derived from
        ``mail_sou_linkedin_inv`` / ``mail_sou_linkedin_similar``) and
        ``"snovio"`` (derived from the ``snovio`` value).
    """
    # Copy so the caller's map is not mutated in place.
    res = dict(old_dict) if old_dict is not None else {}
    if incr_map is not None:
        res.update(incr_map)

    parse_res = {}

    inv = res.get("mail_sou_linkedin_inv")
    similar = res.get("mail_sou_linkedin_similar")
    if inv is not None and inv != '':
        # Any non-empty "inv" value takes precedence over "similar".
        parse_res["mail_sou"] = "可能退信"
    elif similar is not None:
        exact_labels = {'2': "推测+验证", '-1': "推测", '1': "匹配"}
        if similar in exact_labels:
            parse_res["mail_sou"] = exact_labels[similar]
        else:
            try:
                # The '0' prefix lets values written like ".85" parse.
                similar_f = float('0' + similar)
            except (TypeError, ValueError):
                # An unparseable score yields no label instead of an error.
                similar_f = None
            if similar_f is not None:
                if 0.8 <= similar_f < 0.9:
                    parse_res["mail_sou"] = "匹配度低"
                elif 0.9 <= similar_f < 1:
                    parse_res["mail_sou"] = "可能匹配"

    # NOTE(review): the snovio label is derived independently of the
    # mail_sou branch above -- confirm this matches the intended nesting.
    snovio_labels = {
        'valid': "匹配",
        "unknown": "匹配度低",
        "not valid": "可能退信",
        "greylisted": "推测",
    }
    snovio = res.get("snovio")
    if snovio in snovio_labels:
        parse_res["snovio"] = snovio_labels[snovio]

    return res, parse_res
@udf(returnType=StringType())
def get_email_status(status_map: dict):
    """Translate a status label into its status code.

    The label is read from the ``'snovio'`` key when present, otherwise
    from ``'mail_sou'``.

    Args:
        status_map: Map holding the labels; may be ``None`` or empty.

    Returns:
        The matching status code, or ``None`` when the map is missing or
        empty, neither key is present, or the label is not recognised.
    """
    if status_map is None or len(status_map) == 0:
        return None

    for source in ('snovio', 'mail_sou'):
        if source in status_map:
            status_zh = status_map[source]
            break
    else:
        return None

    codes = {
        '推测': 'SPECULATION',
        '推测+验证': 'SPECULATION_VERIFICATION',
        '匹配': 'PERFECT_MATCH',
        '可能匹配': 'POSSIBLE_MATCH',
        '匹配度低': 'LOW_MATCH',
        '可能退信': 'POSSIBLE_REFUND',
    }
    return codes.get(status_zh)
@udf(returnType=StringType())
def get_media_type(social_media: str):
    """Classify a social-media string by the platform keyword it contains.

    The keywords are tried in this order and the first hit wins:

    1. contains "linkedin" -> ``'linkedin'``
    2. contains "twitter"  -> ``'twitter'``
    3. contains "facebook" -> ``'facebook'``

    Matching ignores letter case, so e.g. "LinkedIn" is recognised too.

    Args:
        social_media: Text to inspect; may be ``None``.

    Returns:
        The platform name, or ``None`` when the input is ``None`` or holds
        none of the keywords.
    """
    if social_media is None:
        return None
    lowered = social_media.lower()
    for platform in ('linkedin', 'twitter', 'facebook'):
        if platform in lowered:
            return platform
    return None
@udf(returnType=StringType())
def get_black_white_grey_status(action_status: str, action_status_code: str):
    """Classify a mail action as ``"WHITE"``, ``"GREY"`` or ``"BLACK"``.

    A recognised status code decides the result on its own; otherwise the
    action status decides, and anything unrecognised is ``"GREY"``.

    Args:
        action_status: Action status name.
        action_status_code: Status code as a string; may be ``None``.

    Returns:
        One of ``"WHITE"``, ``"GREY"``, ``"BLACK"``.
    """
    black_codes = (
        '503', '506', '507', '508', '401', '402', '403', '404', '406',
        '407', '408', '509', '409',
    )
    grey_codes = ('505', '405')
    white_statuses = ('CLICK', 'OPEN', 'SENT_SUCCESS')
    black_statuses = (
        'HARD_BOUNCE', 'SYSTEM_HARD', 'SYSTEM_SOFT', 'SYSTEM_BOUNCE',
        'UNSUBSCRIBE', 'SOFT_BOUNCE', 'SPAM_COMPLAINT', 'SYSTEM_UNSUBSCRIBE',
    )

    if action_status_code is not None:
        if action_status_code in black_codes:
            return "BLACK"
        if action_status_code in grey_codes:
            return "GREY"

    # No code, or a code that is not listed: fall back to the status name.
    if action_status in white_statuses:
        return "WHITE"
    if action_status in black_statuses:
        return "BLACK"
    # 'MISSING', 'INIT' and every unknown status are grey.
    return "GREY"
@udf(returnType=IntegerType())
def get_mail_status_priority(action_status: str, action_status_code: str):
    """Return the numeric priority of a mail action.

    A recognised status code takes precedence over the action status.

    Args:
        action_status: Action status name.
        action_status_code: Status code as a string; may be ``None``.

    Returns:
        The priority from the code table when the code is listed, otherwise
        the priority from the status table, otherwise ``-1``.
    """
    priority_by_status = {
        'MISSING': 0,
        'SENT_SUCCESS': 2,
        'OPEN': 4,
        'CLICK': 5,
        'SOFT_BOUNCE': 6,
        'SYSTEM_SOFT': 6,
        'UNSUBSCRIBE': 7,
        'SYSTEM_UNSUBSCRIBE': 7,
        'SPAM_COMPLAINT': 8,
        'SYSTEM_BOUNCE': 9,
        'SYSTEM_HARD': 9,
        'HARD_BOUNCE': 10,
    }
    priority_by_code = {
        '506': 10,
        '406': 10,
        '404': 10,
        '503': 9,
        '401': 9,
        '403': 9,
        '402': 8,
        '507': 7,
        '407': 7,
        '508': 7,
        '408': 7,
        '509': 6,
        '409': 6,
        '505': 1,
        '405': 1,
    }
    if action_status_code is not None and action_status_code in priority_by_code:
        return priority_by_code[action_status_code]
    return priority_by_status.get(action_status, -1)
@udf(returnType=StringType())
def get_md5(*cols: str) -> str:
    """Hash the non-``None`` arguments into one MD5 hex digest.

    Each kept argument contributes its length (as decimal text) followed by
    the argument itself; the pieces are concatenated and UTF-8 encoded
    before hashing.

    Args:
        *cols: Strings to hash; ``None`` values are skipped.

    Returns:
        The hex digest, or ``''`` when nothing remains to hash.
    """
    pieces = []
    for col in cols:
        if col is None:
            continue
        pieces.extend((str(len(col)), col))
    key = ''.join(pieces)
    if not key:
        return ''
    return hashlib.md5(key.encode("utf-8")).hexdigest()
@udf(returnType=StringType())
def get_mail_usable(black_white_grey_status: str):
    """Map a WHITE/BLACK/GREY status to its usability label.

    Args:
        black_white_grey_status: ``'WHITE'``, ``'BLACK'`` or ``'GREY'``.

    Returns:
        ``'Usable'``, ``'Disable'`` or ``'Uncertain'`` respectively, and
        ``None`` for any other value.
    """
    usability = {
        'WHITE': 'Usable',
        'BLACK': 'Disable',
        'GREY': 'Uncertain',
    }
    return usability.get(black_white_grey_status)
@udf(returnType=StructType(
    [
        StructField("info",
                    ArrayType(StructType(
                        [
                            StructField("same", StringType(), False),
                            StructField("name", StringType(), False),
                            StructField("staff_count", IntegerType(), False)
                        ]
                    ), True)),
        StructField("num", StringType(), False)
    ])
)
def get_similar_comanynames(linkedin_related_companies: str):
    """Extract related companies and their combined staff count from JSON.

    Args:
        linkedin_related_companies: JSON text expected to decode to a list
            of objects carrying ``same``, ``name`` and ``staffCount``.
            ``None`` or an empty string means "no data".

    Returns:
        A dict with ``"info"`` (a list of ``{"same", "name",
        "staff_count"}`` dicts, or ``None`` when there is no input) and
        ``"num"`` (the summed staff count rendered as a string, matching
        the declared ``StringType``).

    Raises:
        json.JSONDecodeError: If the input is non-empty but not valid JSON.
    """
    if not linkedin_related_companies:
        return {"info": None, "num": "0"}

    # json.loads() no longer accepts an ``encoding`` argument (removed in
    # Python 3.9); a str input needs no decoding hint anyway.
    companies = json.loads(linkedin_related_companies)
    if not isinstance(companies, list):
        companies = []

    similar_companies = []
    total_num = 0
    for company in companies:
        if not isinstance(company, dict):
            continue
        if 'same' in company and 'name' in company and 'staffCount' in company:
            try:
                staff_count = int(company['staffCount'])
            except (TypeError, ValueError):
                # "staff_count" is a non-nullable integer field, so an
                # entry without a usable count cannot be represented.
                continue
            similar_companies.append({
                'same': company['same'],
                'name': company['name'],
                'staff_count': staff_count,
            })
            total_num += staff_count

    return {"info": similar_companies, "num": str(total_num)}
# Register the UDFs and specify their return types
# get_json_arr = udf(parse_jsonarr_to_arr)
# get_json_strarr = udf(parse_jsonarr_to_strarr)
if __name__ == '__main__':
    # The decorated name is a Spark UDF wrapper: calling it builds a Column
    # expression (and needs an active SparkContext) instead of running the
    # Python code, and a Column cannot be iterated. ``.func`` is the
    # undecorated function kept on the wrapper by PySpark, so use it for
    # this local smoke test.
    parsed = parse_jsonarr_to_arr.func('[{"aaa":"bbb"},{"aaa":"bbb"}]')
    for item in parsed:
        print(item)
|