# spark_json_array_udf.py — PySpark UDFs for parsing/merging JSON-array and map columns.
import hashlib
import json
from collections import Counter
from typing import List

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    FloatType,
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)
  7. @udf(returnType=ArrayType(StructType([
  8. StructField("idx",IntegerType(),False),
  9. StructField("obj",StringType(),False)
  10. ])))
  11. def parse_jsonarr_to_arr(s:str)->[(int,str)]:
  12. res_arr = [(i+1,json.dumps(obj)) for i,obj in enumerate(json.loads(s))]
  13. return res_arr
  14. @udf(returnType=ArrayType(StructType([
  15. StructField("idx",IntegerType(),False),
  16. StructField("obj",StringType(),False)
  17. ])))
  18. def parse_jsonarr_to_strarr(s:str)->[(int,str)]:
  19. res_arr = [(i+1,obj) for i,obj in enumerate(json.loads(s))]
  20. return res_arr
  21. @udf(returnType=StructType([
  22. StructField("k",ArrayType(StringType()),False),
  23. StructField("kv",StringType())
  24. ]))
  25. def parse_arr_and_count(arr,tag:str,return_count:int=-1):
  26. ele_cnt_dict = Counter(arr)
  27. json_list = sorted([{"code": key, "num": value} for key, value in ele_cnt_dict.items()],key=lambda x:x["num"], reverse=True)
  28. if return_count < 0:
  29. return [obj['code'] for obj in json_list],",".join(['{'+f'{i["code"]},{tag}:{i["num"]}'+'}' for i in json_list])
  30. else:
  31. list_len = len(json_list)
  32. index = list_len
  33. if return_count < list_len:
  34. index = return_count
  35. return [obj['code'] for obj in json_list][:index],",".join(['{'+f'{i["code"]},{tag}:{i["num"]}'+'}' for i in json_list[:index]])
  36. @udf(returnType=StructType([
  37. StructField("sum",FloatType(),False),
  38. StructField("list",StringType())
  39. ]))
  40. def parse_arr_and_sum(struct_arr,tag:str):
  41. sum_dict={}
  42. for s in struct_arr:
  43. key = s[0]
  44. value:float = s[1]
  45. if key not in sum_dict:
  46. sum_dict[key]=0.0
  47. if value is not None:
  48. sum_dict[key] += value
  49. json_list = sorted([{"code": key, "num": value} for key, value in sum_dict.items()],key=lambda x:x["num"], reverse=True)
  50. total = 0.0
  51. for obj in json_list:
  52. total += obj["num"]
  53. return round(total,2),",".join(['{'+f'{i["code"]},{tag}:{round(i["num"],2)}'+'}' for i in json_list])
  54. @udf(returnType=StringType())
  55. def split_str_to_jsonstr(str_list: List):
  56. res = []
  57. for kv_str in str_list:
  58. arr = kv_str.split(':')
  59. if len(arr) == 2:
  60. res.append({arr[0]: arr[1]})
  61. return json.dumps(res,ensure_ascii=False)
  62. @udf(returnType=MapType(StringType(), ArrayType(StringType())))
  63. def split_str_to_maparr(str_list: List):
  64. res = {}
  65. for kv_str in str_list:
  66. arr = kv_str.split(':')
  67. if len(arr) == 2:
  68. if arr[0] not in res:
  69. res[arr[0]]=[arr[1]]
  70. else:
  71. res[arr[0]].append(arr[1])
  72. return res
  73. @udf(returnType=MapType(StringType(), StringType()))
  74. def distinct_arrmap(map_list: List):
  75. res = {}
  76. for kv_map in map_list:
  77. if 'time' in res:
  78. if int(kv_map["time"]) > int(res["time"]):
  79. res = kv_map
  80. else:
  81. res = kv_map
  82. if len(res)==0:
  83. return {}
  84. else:
  85. return {"snovio":res["value"]}
@udf(returnType=MapType(StringType(), StringType()))
def distinct_arrlist(arr_list: List):
    # [inv,similar,field,last_time]
    # Each row of arr_list is positional: arr[0]=inv email, arr[1]=similar
    # email, arr[2]=source field name ('email', 'ep1', ...), arr[3]=timestamp
    # string -- inferred from usage below; TODO confirm against the caller.
    #
    # Reduces the rows to at most two winners kept as [email, field, time]:
    #   res["inv"]     -- preferred "inv" email
    #   res["similar"] -- preferred "similar" email (only seeded/updated by
    #                     'email' rows)
    # and wraps the surviving non-empty emails into the returned map.
    res = {}
    for arr in arr_list:
        if len(res)==0:
            # First row seeds the "inv" slot unconditionally; "similar" is
            # seeded only when the first row is an 'email' row.
            res["inv"] = [arr[0],arr[2],arr[3]]
            if arr[2] == 'email':
                res["similar"] = [arr[1],arr[2],arr[3]]
            continue
        if arr[2] == 'email':
            # Rows without a timestamp cannot win; skip them.
            if arr[3] is None:
                continue
            # Prefer the newest non-empty inv email; an empty stored email
            # may be replaced even by an older row.
            if int(arr[3]) > int(res["inv"][2]):
                if arr[0] != '':
                    res["inv"] = [arr[0],arr[2],arr[3]]
            else:
                if res["inv"][0] == '':
                    res["inv"] = [arr[0], arr[2], arr[3]]
            # NOTE(review): if the FIRST row was not an 'email' row,
            # res["similar"] was never seeded and this raises KeyError --
            # verify callers always order 'email' rows first, or guard this.
            if int(arr[3]) > int(res["similar"][2]):
                if arr[1] != '':
                    res["similar"] = [arr[1], arr[2], arr[3]]
            else:
                if res["similar"][0] == '':
                    res["similar"] = [arr[1], arr[2], arr[3]]
        else:
            # Non-'email' rows only compete for the "inv" slot.
            if res["inv"][1]=='ep1':
                if arr[3] is None:
                    continue
                # Current winner is 'ep1': any newer row replaces it.
                if int(arr[3])>int(res["inv"][2]):
                    res["inv"] = [arr[0],arr[2],arr[3]]
            else:
                # 'ep1' rows outrank a non-'ep1' holder regardless of time.
                if arr[2]=='ep1':
                    res["inv"] = [arr[0],arr[2],arr[3]]
                else:
                    if arr[3] is None:
                        continue
                    if int(arr[3]) > int(res["inv"][2]):
                        res["inv"] = [arr[0], arr[2], arr[3]]
    # Only expose winners whose email is present and non-empty.
    wrap_res = {}
    if "similar" in res and res["similar"][0] is not None and res["similar"][0]!='':
        wrap_res["mail_sou_linkedin_similar"] = res["similar"][0]
    if "inv" in res and res["inv"][0] is not None and res["inv"][0]!='':
        wrap_res["mail_sou_linkedin_inv"] = res["inv"][0]
    return wrap_res
  144. @udf(returnType=StructType([
  145. StructField("status",BooleanType(),True),
  146. StructField("hidden",BooleanType(),True),
  147. StructField("start_date",TimestampType(),True),
  148. StructField("end_date",TimestampType(),True),
  149. StructField("insert_time",TimestampType(),True)
  150. ]))
  151. def merge_status_info(status_info_list: list):
  152. res = {
  153. "status":None,
  154. "hidden":None,
  155. "start_date":None,
  156. "end_date":None,
  157. "insert_time":None
  158. }
  159. if status_info_list is not None and len(status_info_list)>1:
  160. for status_info in status_info_list:
  161. if res["insert_time"] is None:
  162. res = status_info
  163. else:
  164. if res["insert_time"] < status_info["insert_time"]:
  165. res = status_info
  166. return res
  167. @udf(returnType=MapType(StringType(), StringType()))
  168. def merge_email(map_list: List):
  169. res = {}
  170. if map_list is not None:
  171. for kv_map in map_list:
  172. for k in kv_map.keys():
  173. if k not in res:
  174. res[k] = kv_map[k]
  175. return res
  176. @udf(returnType=MapType(StringType(), ArrayType(StringType())))
  177. def merge_source_p_id(map_obj_list: List[dict]):
  178. tmp_res = {}
  179. if map_obj_list is not None:
  180. for map_obj in map_obj_list:
  181. if map_obj is not None:
  182. for k, v in map_obj.items():
  183. if k not in tmp_res:
  184. tmp_res[k] = set(v)
  185. else:
  186. tmp_res[k].update(v)
  187. res = {}
  188. for key,value in tmp_res.items():
  189. res[key] = list(value)
  190. return res
  191. @udf(returnType=ArrayType(StringType()))
  192. def merge_source(incr_source: List,old_source: List):
  193. res = set()
  194. if incr_source is not None:
  195. for i in incr_source:
  196. if i is not None and i != "":
  197. res.add(i)
  198. if old_source is not None:
  199. for i in old_source:
  200. if i is not None and i != "":
  201. res.add(i)
  202. return list(res)
  203. @udf(returnType=ArrayType(StringType()))
  204. def merge_list(arr_list: List):
  205. res = set()
  206. for e in arr_list:
  207. if e is not None:
  208. for i in e:
  209. if i is not None and i != "":
  210. res.add(i)
  211. return list(res)
  212. @udf(returnType=MapType(StringType(),ArrayType(StringType())))
  213. def merge_position_map(left_dict_list :list):
  214. res = {}
  215. if left_dict_list is not None:
  216. for kv_map in left_dict_list:
  217. if kv_map is not None:
  218. for k,v in kv_map.items():
  219. if v is not None:
  220. if k not in res:
  221. res[k] = set(v)
  222. else:
  223. res[k].update(set(v))
  224. for k,v in res.items():
  225. res[k] = list(v)
  226. return res
  227. @udf(returnType=ArrayType(StringType()))
  228. def merge_location(arr_list: List):
  229. # [location,time]
  230. res = []
  231. for arr in arr_list:
  232. if arr is not None and len(arr) > 1:
  233. if len(res) == 0:
  234. res.extend(arr)
  235. else:
  236. if arr[1]>res[1]:
  237. res = arr
  238. return res
  239. @udf(returnType=ArrayType(StructType([
  240. StructField("channel",StringType(),False),
  241. StructField("channel_ids",ArrayType(StringType()),True),
  242. ])))
  243. def split_channel_to_arr(channels:list,channel_ids: dict):
  244. rest = []
  245. if channels is None:
  246. return rest
  247. for channel in channels:
  248. if channel_ids is not None and channel in channel_ids:
  249. rest.append({"channel":channel,"channel_ids":channel_ids[channel]})
  250. else:
  251. rest.append({"channel": channel, "channel_ids": None})
  252. return rest
  253. @udf(returnType=StructType([
  254. StructField("original",MapType(StringType(), StringType()),False),
  255. StructField("zh",MapType(StringType(), StringType()))
  256. ]))
  257. def merge_parse_email(incr_map: dict,old_dict: dict):
  258. if old_dict is None:
  259. res = {}
  260. else:
  261. res = old_dict
  262. parse_res = {}
  263. if incr_map is not None:
  264. for k in incr_map.keys():
  265. res[k] = incr_map[k]
  266. if "mail_sou_linkedin_inv" in res and res["mail_sou_linkedin_inv"]!='' and res["mail_sou_linkedin_inv"] is not None:
  267. parse_res["mail_sou"] = "可能退信"
  268. else:
  269. if "mail_sou_linkedin_similar" in res and res["mail_sou_linkedin_similar"] is not None:
  270. if res["mail_sou_linkedin_similar"]=='2':
  271. parse_res["mail_sou"] = "推测+验证"
  272. elif res["mail_sou_linkedin_similar"]=='-1':
  273. parse_res["mail_sou"] = "推测"
  274. elif res["mail_sou_linkedin_similar"]=='1':
  275. parse_res["mail_sou"] = "匹配"
  276. else:
  277. similar_f = float('0' + res["mail_sou_linkedin_similar"])
  278. if similar_f>=0.8 and similar_f<0.9:
  279. parse_res["mail_sou"] = "匹配度低"
  280. elif similar_f>=0.9 and similar_f<1:
  281. parse_res["mail_sou"] = "可能匹配"
  282. if "snovio" in res:
  283. if res["snovio"]=='valid':
  284. parse_res["snovio"]="匹配"
  285. elif res["snovio"]=="unknown":
  286. parse_res["snovio"]="匹配度低"
  287. elif res["snovio"]=="not valid":
  288. parse_res["snovio"]="可能退信"
  289. elif res["snovio"]=="greylisted":
  290. parse_res["snovio"]="推测"
  291. return res,parse_res
  292. @udf(returnType=StringType())
  293. def get_email_status(status_map: dict):
  294. if status_map is None or len(status_map) == 0:
  295. return None
  296. if 'snovio' in status_map:
  297. status_zh = status_map['snovio']
  298. elif 'mail_sou' in status_map:
  299. status_zh = status_map['mail_sou']
  300. else:
  301. status_zh = None
  302. if status_zh == '推测':
  303. return 'SPECULATION'
  304. elif status_zh == '推测+验证':
  305. return 'SPECULATION_VERIFICATION'
  306. elif status_zh == '匹配':
  307. return 'PERFECT_MATCH'
  308. elif status_zh == '可能匹配':
  309. return 'POSSIBLE_MATCH'
  310. elif status_zh == '匹配度低':
  311. return 'LOW_MATCH'
  312. elif status_zh == '可能退信':
  313. return 'POSSIBLE_REFUND'
  314. else:
  315. return None
  316. @udf(returnType=StringType())
  317. def get_media_type(social_media:str):
  318. # 根据本表[social_media]进行判断:
  319. # 1,带关键词"linkedin"-->linkedin
  320. # 2,带关键词"twitter"-->twitter
  321. # 3,带关键词"facebook"-->facebook
  322. if social_media is None:
  323. return None
  324. if 'linkedin' in social_media:
  325. return 'linkedin'
  326. elif 'twitter' in social_media:
  327. return 'twitter'
  328. elif 'facebook' in social_media:
  329. return 'facebook'
  330. @udf(returnType=StringType())
  331. def get_black_white_grey_status(action_status:str,action_status_code:str):
  332. def judge_by_action_status(action_status:str):
  333. if action_status=='CLICK' or action_status=='OPEN' or action_status=='SENT_SUCCESS':
  334. return "WHITE"
  335. elif action_status=='MISSING' or action_status=='INIT':
  336. return "GREY"
  337. elif action_status=='HARD_BOUNCE' or action_status=='SYSTEM_HARD' or action_status=='SYSTEM_SOFT' or action_status=='SYSTEM_BOUNCE' \
  338. or action_status=='UNSUBSCRIBE' or action_status=='SOFT_BOUNCE' or action_status=='SPAM_COMPLAINT' or action_status=='SYSTEM_UNSUBSCRIBE':
  339. return "BLACK"
  340. else:
  341. return "GREY"
  342. if action_status_code is None:
  343. return judge_by_action_status(action_status)
  344. else:
  345. if action_status_code == '503' or action_status_code == '506' or action_status_code == '507' or action_status_code == '508' or action_status_code == '401' \
  346. or action_status_code == '402' or action_status_code == '403' or action_status_code == '404' or action_status_code == '406' or action_status_code == '407' or action_status_code == '408' \
  347. or action_status_code == '509' or action_status_code == '409':
  348. return "BLACK"
  349. elif action_status_code == '505' or action_status_code == '405':
  350. return "GREY"
  351. else:
  352. return judge_by_action_status(action_status)
  353. @udf(returnType=IntegerType())
  354. def get_mail_status_priority(action_status:str,action_status_code:str):
  355. status_dict = {
  356. 'MISSING':0,
  357. 'SENT_SUCCESS':2,
  358. 'OPEN':4,
  359. 'CLICK':5,
  360. 'SOFT_BOUNCE':6,
  361. 'SYSTEM_SOFT':6,
  362. 'UNSUBSCRIBE':7,
  363. 'SYSTEM_UNSUBSCRIBE':7,
  364. 'SPAM_COMPLAINT':8,
  365. 'SYSTEM_BOUNCE':9,
  366. 'SYSTEM_HARD':9,
  367. 'HARD_BOUNCE':10
  368. }
  369. code_dict = {
  370. '506':10,
  371. '406':10,
  372. '404':10,
  373. '503':9,
  374. '401':9,
  375. '403':9,
  376. '402':8,
  377. '507':7,
  378. '407':7,
  379. '508':7,
  380. '408':7,
  381. '509':6,
  382. '409':6,
  383. '505':1,
  384. '405':1
  385. }
  386. if action_status_code is not None:
  387. if action_status_code in code_dict:
  388. return code_dict[action_status_code]
  389. else:
  390. if action_status in status_dict:
  391. return status_dict[action_status]
  392. else:
  393. return -1
  394. else:
  395. if action_status in status_dict:
  396. return status_dict[action_status]
  397. else:
  398. return -1
  399. @udf(returnType=StringType())
  400. def get_md5(*cols:str) -> str:
  401. col_and_len_list = []
  402. for col in cols:
  403. if col is not None:
  404. l = len(col)
  405. col_and_len_list.append(str(l))
  406. col_and_len_list.append(col)
  407. key = ''.join(col_and_len_list)
  408. if key is None or len(key) == 0:
  409. return ''
  410. md5 = hashlib.md5()
  411. md5.update(key.encode("utf-8"))
  412. return md5.hexdigest()
  413. @udf(returnType=StringType())
  414. def get_mail_usable(black_white_grey_status:str):
  415. if black_white_grey_status == 'WHITE':
  416. return 'Usable'
  417. elif black_white_grey_status == 'BLACK':
  418. return 'Disable'
  419. elif black_white_grey_status == 'GREY':
  420. return 'Uncertain'
  421. else:
  422. return None
  423. @udf(returnType=StructType(
  424. [
  425. StructField("info",
  426. ArrayType(StructType(
  427. [
  428. StructField("same",StringType(),False),
  429. StructField("name",StringType(),False),
  430. StructField("staff_count",IntegerType(),False)
  431. ]
  432. ),True)),
  433. StructField("num",StringType(),False)
  434. ])
  435. )
  436. def get_similar_comanynames(linkedin_related_companies:str):
  437. res_dict = {}
  438. similar_companies = []
  439. total_num = 0
  440. if linkedin_related_companies is None:
  441. res_dict["info"] = None
  442. res_dict["num"] = total_num
  443. return res_dict
  444. for company in json.loads(linkedin_related_companies, encoding="utf-8"):
  445. if company is None:
  446. continue
  447. if 'same' in company and 'name' in company and 'staffCount' in company:
  448. similar_companies.append({'same':company['same'],'name':company['name'],'staff_count':company['staffCount']})
  449. total_num += company['staffCount']
  450. res_dict["info"] = similar_companies
  451. res_dict["num"] = total_num
  452. return res_dict
  453. # 注册UDF并指定返回类型
  454. # get_json_arr = udf(parse_jsonarr_to_arr)
  455. # get_json_strarr = udf(parse_jsonarr_to_strarr)
  456. if __name__ == '__main__':
  457. a= parse_jsonarr_to_arr('[{"aaa":"bbb"},{"aaa":"bbb"}]')
  458. for i in a:
  459. print(i)