| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- 通用 UDF —— 与业务无关的数据类型 / 格式操作(JSON / Array / String / Numeric / Date / Hash)
- SparkSQL 入口自动 ADD FILE 注册;业务专用 UDF 请放到 dw_base/udf/business/ 下按需加载
- """
- import difflib
- import hashlib
- import html
- import json
- import random
- import re
- import traceback
- from collections import Counter
- from datetime import datetime
- from typing import Dict, List, Union
- from pyspark.sql.functions import udf
- from pyspark.sql.types import (
- ArrayType, BooleanType, FloatType, IntegerType, LongType, MapType,
- StringType, StructField, StructType,
- )
- from dw_base.utils.datetime_utils import parse_datetime
- # ==================== JSON ====================
@udf(returnType=BooleanType())
def is_json(data) -> bool:
    """Return True when *data* parses as JSON, False otherwise (incl. None)."""
    try:
        json.loads(data)
    except (TypeError, ValueError):
        # TypeError: data is None / not str-like; ValueError covers
        # json.JSONDecodeError. Was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit.
        return False
    return True
@udf(returnType=ArrayType(StringType()))
def json_object_keys(json_str: str) -> List[str]:
    """Top-level keys of a JSON object string.

    Returns None when the input is empty, invalid JSON, or not a JSON object
    (the original relied on AttributeError + a bare except for the last case).
    """
    if not json_str:
        return None
    try:
        parsed = json.loads(json_str)
    except (TypeError, ValueError):
        return None
    if not isinstance(parsed, dict):
        return None
    return list(parsed)
def flatten_json(json_str: str, reserve_parent: bool = True) -> str:
    """Flatten a JSON string.

    With reserve_parent=True child keys are prefixed with their parent key as
    ``parent.key`` (list items as ``parent.[i]``); with False the parent key
    is dropped. Returns the input unchanged when empty or not valid JSON.
    """
    def flatten_json_node(parent, json_element) -> Union[float, int, str, Dict, List]:
        if isinstance(json_element, dict):
            result = {}
            if parent and reserve_parent is True:
                # NOTE(review): children under a named parent are NOT recursed
                # into — only one level gets the prefix. Preserved as-is.
                for key, value in json_element.items():
                    result[f'{parent}.{key}'] = value
            else:
                for key, value in json_element.items():
                    result.update(flatten_json_node(key, value))
            return result
        elif isinstance(json_element, list):
            result = []
            if parent and reserve_parent is True:
                for index in range(len(json_element)):
                    result.append(flatten_json_node(f'{parent}.[{index}]', json_element[index]))
            else:
                for index in range(len(json_element)):
                    result.append(flatten_json_node(None, json_element[index]))
            return result
        else:
            return {parent: json_element}

    if not json_str:
        return json_str
    try:
        json_node = json.loads(json_str)
        return json.dumps(flatten_json_node(None, json_node), ensure_ascii=False)
    except Exception:
        # Bug fix: traceback.format_exc(e) passed the exception object as the
        # `limit` argument and itself raised, so invalid input crashed instead
        # of falling back to returning the original string.
        traceback.print_exc()
        return json_str
def remove_empty_key(info):
    """Recursively drop entries whose value is empty ('', {}, [], 'null',
    None) from a JSON string; surviving scalar leaves are stringified.

    A scalar at the top level serializes to 'null', matching the original.
    Fixes: local variable `re` shadowed the imported `re` module, and a broad
    `except Exception: return None` could silently drop whole subtrees.
    """
    json_info = json.loads(info)
    empties = ['', {}, [], 'null', None]

    def internal_remove(node):
        if isinstance(node, dict):
            cleaned = {}
            for key, value in node.items():
                if isinstance(value, (dict, list)):
                    sub = internal_remove(value)
                    if len(sub):
                        cleaned[key] = sub
                elif value not in empties:
                    cleaned[key] = str(value)
            return cleaned
        if isinstance(node, list):
            cleaned = []
            for value in node:
                if isinstance(value, (dict, list)):
                    sub = internal_remove(value)
                    if len(sub):
                        cleaned.append(sub)
                elif value not in empties:
                    cleaned.append(str(value))
            return cleaned
        return None

    return json.dumps(internal_remove(json_info), ensure_ascii=False)
def append_to_json_array(json_array_string: str, new_element, remove_duplicate: bool = False) -> str:
    """Append *new_element* to a JSON array string, optionally de-duplicating
    (first occurrence wins). A falsy element leaves the input untouched; an
    empty input starts a new single-element array."""
    if not new_element:
        return json_array_string
    if not json_array_string:
        return json.dumps([new_element], ensure_ascii=False)
    elements = json.loads(json_array_string)  # type: list
    elements.append(new_element)
    if remove_duplicate is True:
        deduped = []
        for candidate in elements:
            if candidate not in deduped:
                deduped.append(candidate)
        elements = deduped
    return json.dumps(elements, ensure_ascii=False)
def json_array_subset(json_array_string: str,
                      subset_fields: Union[List, str],
                      as_list: bool = False,
                      skip_null: bool = False) -> str:
    """Project a JSON object array onto a subset of fields.

    subset_fields: list of field names, or a comma-separated string.
    as_list: with exactly one field, emit bare values instead of objects.
    skip_null: drop falsy field values.
    Returns None for empty input / empty field list.
    """
    if not json_array_string:
        return None
    if not subset_fields:
        return None
    if isinstance(subset_fields, str):
        subset_field_list = subset_fields.split(',')
    else:
        subset_field_list = subset_fields
    if len(subset_field_list) == 0:
        return None
    try:
        json_array = json.loads(json_array_string)
    except (TypeError, ValueError):
        # SECURITY fix: this fallback used eval(), which executes arbitrary
        # expressions coming from data. ast.literal_eval parses Python
        # literals only, covering the single-quoted pseudo-JSON this branch
        # existed for.
        import ast
        json_array = ast.literal_eval(json_array_string)
    list_subset = []
    if len(subset_field_list) == 1 and as_list:
        only_subset_field = subset_field_list[0]
        for element in json_array:  # type: Dict
            if isinstance(element, dict):
                field_value = element.get(only_subset_field)
                if field_value or not skip_null:
                    list_subset.append(field_value)
    else:
        for element in json_array:  # type: Dict
            subset_of_element = {}
            if isinstance(element, dict):
                for field in subset_field_list:
                    field_value = element.get(field)
                    if field_value or not skip_null:
                        subset_of_element[field] = field_value
            list_subset.append(subset_of_element)
    return json.dumps(list_subset, ensure_ascii=False)
@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_arr(s: str):
    """Explode a JSON array string into (1-based index, element-as-JSON-string)
    pairs. Robustness: returns [] on null/empty input instead of raising and
    failing the Spark task."""
    if not s:
        return []
    return [(i + 1, json.dumps(obj)) for i, obj in enumerate(json.loads(s))]
@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_strarr(s: str):
    """Explode a JSON array string into (1-based index, raw element) pairs.
    Robustness: returns [] on null/empty input instead of raising."""
    if not s:
        return []
    return [(i + 1, obj) for i, obj in enumerate(json.loads(s))]
- # ==================== ARRAY ====================
@udf(returnType=ArrayType(StringType()))
def array_intersect(arr1, arr2):
    """Distinct common elements of two arrays (order unspecified — sets).
    Robustness: a None input now yields [] instead of a TypeError."""
    if not arr1 or not arr2:
        return []
    return list(set(arr1) & set(arr2))
def array_append(array: List, new_element,
                 ignore_null: bool = False,
                 remove_duplicate: bool = False,
                 need_sort: bool = False) -> List:
    """Append *new_element* to *array* in place and return it.

    Empty/None array: returns [new_element], unless the element is falsy AND
    ignore_null is True (then []). A falsy element is never appended to a
    non-empty array. remove_duplicate skips elements already present (and
    skips sorting); need_sort sorts after the append path.
    """
    if not array:
        # De Morgan of the original `new_element or ignore_null is not True`.
        return [] if (not new_element and ignore_null is True) else [new_element]
    if not new_element:
        if ignore_null is True:
            return array
    else:
        if remove_duplicate is True and new_element in array:
            return array
        array.append(new_element)
    if need_sort:
        array.sort()
    return array
@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return input_array[start:end]; [] for a null/empty array."""
    if not input_array:
        return []
    return input_array[start:end]
@udf(returnType=ArrayType(StringType()))
def merge_list(arr_list: List):
    """Union of several arrays, dropping None/'' entries and duplicates.
    NOTE: set-based, so element order is unspecified."""
    merged = {
        item
        for sub in arr_list
        if sub is not None
        for item in sub
        if item is not None and item != ""
    }
    return list(merged)
@udf(returnType=ArrayType(StringType()))
def merge_source(incr_source: List, old_source: List):
    """Merge two source arrays into a distinct list, skipping None/'' values.
    NOTE: set-based, so element order is unspecified.
    (Consistency: the two copy-pasted loops were folded into one.)"""
    merged = set()
    for source in (incr_source, old_source):
        if source is None:
            continue
        merged.update(v for v in source if v is not None and v != "")
    return list(merged)
@udf(returnType=StructType([
    StructField("k", ArrayType(StringType()), False),
    StructField("kv", StringType()),
]))
def parse_arr_and_count(arr, tag: str, return_count: int = -1):
    """Count occurrences of each array element, ranked by count descending.

    Returns (codes, "{code,tag:count},..."). return_count < 0 keeps every
    entry; otherwise at most return_count entries are kept.
    """
    ranked = sorted(
        ({"code": code, "num": num} for code, num in Counter(arr).items()),
        key=lambda entry: entry["num"],
        reverse=True,
    )
    if return_count >= 0:
        ranked = ranked[:return_count]
    codes = [entry["code"] for entry in ranked]
    rendered = ",".join('{' + f'{entry["code"]},{tag}:{entry["num"]}' + '}' for entry in ranked)
    return codes, rendered
@udf(returnType=StructType([
    StructField("sum", FloatType(), False),
    StructField("list", StringType()),
]))
def parse_arr_and_sum(struct_arr, tag: str):
    """Sum struct (key, value) pairs per key, ranked by total descending.

    Returns (grand total rounded to 2 dp, "{key,tag:total},..." string);
    None values count as 0 for their key.
    """
    totals = {}
    for pair in struct_arr:
        key, value = pair[0], pair[1]
        totals.setdefault(key, 0.0)
        if value is not None:
            totals[key] += value
    ranked = sorted(
        ({"code": k, "num": v} for k, v in totals.items()),
        key=lambda entry: entry["num"],
        reverse=True,
    )
    grand_total = sum(entry["num"] for entry in ranked)
    rendered = ",".join('{' + f'{entry["code"]},{tag}:{round(entry["num"], 2)}' + '}' for entry in ranked)
    return round(grand_total, 2), rendered
- # ==================== STRING ====================
@udf(returnType=BooleanType())
def has_chinese(datum: str) -> bool:
    """True when the string contains at least one CJK char in \\u4e00-\\u9fa5;
    False for None/empty input."""
    if not datum:
        return False
    return re.search(u'[\u4e00-\u9fa5]', datum) is not None
@udf(returnType=FloatType())
def similarity(left: str, right: str) -> float:
    """Upper-bound similarity of two strings via difflib quick_ratio().
    Robustness: returns None when either side is None (was a TypeError that
    failed the Spark task on null columns)."""
    if left is None or right is None:
        return None
    return difflib.SequenceMatcher(None, left, right).quick_ratio()
@udf(returnType=ArrayType(StringType()))
def regexp_extract_all(col: str, ptn: str, g: int = 0):
    """All matches of *ptn* in *col*, returning capture group *g* per match;
    [] for None/empty input."""
    text = col or ''
    return [match.group(g) for match in re.finditer(ptn, text)]
def add_random_number_prefix(datum: str, separator: str, floor: int, ceiling: int) -> str:
    """Prefix *datum* with a random int in [floor, ceiling] plus *separator*
    (typical salting trick to break data skew)."""
    prefix = random.randint(floor, ceiling)
    return f'{prefix}{separator}{datum}'
def field_merge(delimiter: str, *fields_values):
    """Merge fields: strip each value, drop None/blank, keep the first
    occurrence of each distinct value, join with *delimiter*.

    Fixes: a list comprehension was used purely for its side effects, and a
    whitespace-only value passed the truthiness check before stripping, so ''
    was appended and produced a stray delimiter in the output.
    """
    if not fields_values:
        return None
    result = []
    for value in fields_values:
        if not value:
            continue
        stripped = value.strip()
        if stripped and stripped not in result:
            result.append(stripped)
    return delimiter.join(result)
def space2null(text):
    """Normalize empty / whitespace-only / None text to None; anything else
    passes through unchanged."""
    if not text or text.isspace():
        return None
    return text
def merge_ws(text: str):
    """Collapse runs of whitespace into single spaces; None/'' -> None."""
    return ' '.join(text.split()) if text else None
def remove_special_char(text, char):
    """Strip one trailing occurrence of *char* from *text*, if present.

    Fixes: only a single character was sliced off (`text[:-1]`) even when
    *char* was longer than one character, and an empty *char* chopped the
    last character because ''.endswith('') is always True.
    """
    if text is not None and char and text.endswith(char):
        return text[:-len(char)]
    return text
@udf(returnType=ArrayType(StringType()))
def explode_str_to_arr(text: str) -> list:
    """For strings longer than 8 chars, emit every prefix from the full string
    down to length 8, longest first (prefix-match lookup helper). Shorter
    strings come back as a single-element list; None -> []."""
    if text is None:
        return []
    if len(text) <= 8:
        return [text]
    prefixes = []
    length = len(text)
    while length >= 8:
        prefixes.append(text[:length])
        length -= 1
    return prefixes
def html_unescape(text):
    """Decode HTML entities (e.g. '&amp;amp;' -> '&').
    Robustness: None now passes through (html.unescape(None) raised)."""
    if text is None:
        return None
    return html.unescape(text)
- # ==================== NUMERIC / DATE / HASH ====================
def max_value(*args):
    """Largest argument, ignoring None and '' (None when nothing valid).

    Fixes: `if not elem` skipped every falsy value, so a legitimate 0 / 0.0
    could never participate; and `if not maxi_value` re-seeded the running
    maximum whenever it was falsy (e.g. 0), corrupting the comparison.
    """
    result = None
    for elem in args:
        if elem is None or elem == '':
            continue
        if result is None or elem > result:
            result = elem
    return result
def min_value(*args):
    """Smallest argument, ignoring None and '' (None when nothing valid).

    Fixes (mirror of max_value): falsy-but-valid values such as 0 were
    skipped, and a falsy running minimum was incorrectly re-seeded.
    """
    result = None
    for elem in args:
        if elem is None or elem == '':
            continue
        if result is None or elem < result:
            result = elem
    return result
def millis_timestamp_to_str(ts: int, str_format: str = None) -> str:
    """Millisecond epoch timestamp -> local-time string.

    Default format is '%Y-%m-%d %H:%M:%S.mmm' (microseconds trimmed to
    milliseconds). Robustness: returns None for a None timestamp.
    NOTE(review): uses the local timezone via fromtimestamp — confirm callers
    expect local time rather than UTC.
    """
    if ts is None:
        return None
    date_time = datetime.fromtimestamp(ts / 1000.0)
    if str_format:
        return date_time.strftime(str_format)
    return date_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
@udf(returnType=LongType())
def parse_datetime_to_timestamp(date_time: str, in_milli_seconds: bool = False, original_format: str = None) -> int:
    """String date -> epoch timestamp, with heuristics for 'YY.MM.DD' and
    'YYYY年M月D日' style inputs.

    Returns seconds (milliseconds when in_milli_seconds is True), or None
    when the value cannot be interpreted. If normal parsing fails for any
    reason, the raw value is retried as a numeric epoch.
    """
    try:
        if date_time:
            d = date_time.split('.')
            # 'YY.MM.DD' (8 chars, two-digit year): assume the 2000s -> '20YY.MM.DD'.
            if len(date_time) == 8 and len(d) == 3 and len(d[0]) == 2:
                date_time = '20' + date_time
            # 'YYYY年M月D日' -> 'YYYY-M-D' (trailing 日 removed).
            ret = re.match(r'(\d+)年(\d+)月(\d+)日', date_time)
            if ret:
                date_time = ret.group().replace('年', '-').replace('月', '-').replace('日', '')
        # Delegate the actual parsing to the project helper.
        # NOTE(review): parse_datetime's behavior for None/empty input is not
        # visible here — presumably returns None or raises; verify.
        parsed_date = parse_datetime(date_time, original_format)
        if not parsed_date:
            return None
        if in_milli_seconds is True:
            return int(parsed_date.timestamp() * 1000)
        return int(parsed_date.timestamp())
    except:
        # Fallback: treat the raw value as a numeric epoch. Heuristic: a value
        # larger than "now" in seconds is assumed to already be milliseconds.
        try:
            date_time = int(date_time)
            if datetime.now().timestamp() < date_time:
                return date_time if in_milli_seconds else int(date_time / 1000)
            return date_time * 1000 if in_milli_seconds else date_time
        except Exception:
            return None
@udf(returnType=StringType())
def get_md5(*cols: str) -> str:
    """md5 of the concatenation of the non-null columns, each prefixed by its
    length so that ('ab','c') and ('a','bc') cannot collide. Returns '' when
    every column is null/empty."""
    parts = []
    for col in cols:
        if col is not None:
            parts.append(f'{len(col)}{col}')
    key = ''.join(parts)
    if not key:
        return ''
    return hashlib.md5(key.encode("utf-8")).hexdigest()
- # ==================== CROSS-TYPE CONVERTERS ====================
def array_to_json(arr: List):
    """Serialize a list to a JSON string, keeping non-ASCII chars verbatim."""
    return json.dumps(arr, ensure_ascii=False)
def map_to_json(map: dict):
    """Serialize a dict to a JSON string, keeping non-ASCII chars verbatim.
    (Parameter name shadows the builtin `map`; kept for caller compatibility.)"""
    return json.dumps(map, ensure_ascii=False)
def struct_to_json(struct):
    """Serialize a Spark Row-like struct (exposes __fields__ and item access)
    to a JSON string."""
    field_names = struct.__dict__["__fields__"]
    return json.dumps({name: struct[name] for name in field_names}, ensure_ascii=False)
def num_to_str(number):
    """Render a number as a string: integral floats lose the '.0'; ints
    (including bools, which become '1'/'0') go through int(); anything else
    is str()'d as-is."""
    if isinstance(number, float) and number.is_integer():
        return '{:.0f}'.format(number)
    if isinstance(number, int):
        return str(int(number))
    return str(number)
@udf(returnType=ArrayType(StringType()))
def str_to_arr(json_str: str) -> list:
    """Parse a JSON array string into a list.

    Robustness/consistency: invalid JSON or a non-list payload now yields []
    (matching str_to_json_arr and the declared ArrayType) instead of raising
    or returning a mismatched type.
    """
    if not json_str:
        return []
    try:
        parsed = json.loads(json_str)
    except (TypeError, ValueError):
        return []
    return parsed if isinstance(parsed, list) else []
@udf(returnType=ArrayType(StringType()))
def str_to_json_arr(json_str):
    """JSON array string -> list of per-element JSON strings (each element is
    re-serialized with json.dumps); [] for empty/invalid/non-list input."""
    if not json_str:
        return []
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, list):
        return []
    return [json.dumps(element) for element in parsed]
@udf(returnType=ArrayType(MapType(StringType(), StringType())))
def str_to_map_arr(json_str: str) -> list:
    """Parse a JSON string of objects into a list of maps.

    Robustness/consistency: invalid JSON or a non-list payload now yields []
    (matching str_to_json_arr) instead of an unhandled parse error.
    """
    if not json_str:
        return []
    try:
        parsed = json.loads(json_str)
    except (TypeError, ValueError):
        return []
    return parsed if isinstance(parsed, list) else []
@udf(returnType=StringType())
def split_str_to_jsonstr(str_list: List):
    """['k:v', ...] -> JSON string '[{"k": "v"}, ...]'.

    Entries that do not split into exactly two parts on ':' are dropped.
    Robustness: a None list or None elements no longer raise (-> '[]').
    """
    res = []
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        parts = kv_str.split(':')
        if len(parts) == 2:
            res.append({parts[0]: parts[1]})
    return json.dumps(res, ensure_ascii=False)
@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def split_str_to_maparr(str_list: List):
    """['k:v', ...] -> {k: [v, ...]}, accumulating values per key.

    Entries that do not split into exactly two parts on ':' are dropped.
    Robustness: a None list or None elements no longer raise (-> {}).
    """
    res = {}
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        parts = kv_str.split(':')
        if len(parts) == 2:
            res.setdefault(parts[0], []).append(parts[1])
    return res
|