#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
Generic UDFs: business-agnostic data type / format helpers
(JSON / Array / String / Numeric / Date / Hash).

The SparkSQL entry point registers this file automatically via ADD FILE.
Business-specific UDFs belong under dw_base/udf/business/ and are loaded on demand.
"""
import difflib
import hashlib
import html
import json
import random
import re
import traceback
from collections import Counter
from datetime import datetime
from typing import Dict, List, Union

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    FloatType,
    IntegerType,
    LongType,
    MapType,
    StringType,
    StructField,
    StructType,
)

from dw_base.utils.datetime_utils import parse_datetime


# ==================== JSON ====================

@udf(returnType=BooleanType())
def is_json(data) -> bool:
    """Return True when ``data`` can be decoded by ``json.loads``, else False."""
    try:
        json.loads(data)
    except (TypeError, ValueError, RecursionError):
        # TypeError: not str/bytes (e.g. None); ValueError: malformed JSON.
        return False
    return True


@udf(returnType=ArrayType(StringType()))
def json_object_keys(json_str: str) -> List[str]:
    """Return the top-level keys of a JSON object string.

    Returns None for empty input, undecodable input, or JSON whose top-level
    value is not an object.
    """
    if not json_str:
        return None
    try:
        parsed = json.loads(json_str)
    except (TypeError, ValueError, RecursionError):
        return None
    if not isinstance(parsed, dict):
        return None
    return list(parsed)


def flatten_json(json_str: str, reserve_parent: bool = True) -> str:
    """Flatten a JSON string; ``reserve_parent`` controls whether parent keys are kept.

    Best effort: empty input is returned as-is, and when decoding or flattening
    fails the traceback is printed to stderr and the original string is returned.
    """

    def flatten_json_node(parent, json_element) -> Union[float, int, str, Dict, List]:
        # Parent prefixes are only applied when a parent exists and the flag is exactly True.
        keep_parent = bool(parent) and reserve_parent is True
        if isinstance(json_element, dict):
            result = {}
            if keep_parent:
                # Children are prefixed with the parent key; their values are not recursed into.
                for key, value in json_element.items():
                    result[f'{parent}.{key}'] = value
            else:
                for key, value in json_element.items():
                    result.update(flatten_json_node(key, value))
            return result
        if isinstance(json_element, list):
            if keep_parent:
                return [flatten_json_node(f'{parent}.[{index}]', item)
                        for index, item in enumerate(json_element)]
            return [flatten_json_node(None, item) for item in json_element]
        return {parent: json_element}

    if not json_str:
        return json_str
    try:
        json_node = json.loads(json_str)
        flattened_json = flatten_json_node(None, json_node)
        return json.dumps(flattened_json, ensure_ascii=False)
    except Exception:
        # The previous traceback.format_exc(e) passed the exception as ``limit``,
        # which raises TypeError inside this handler and skipped the fallback below.
        traceback.print_exc()
        return json_str


def remove_empty_key(info):
    """Recursively drop empty values from a JSON string and re-serialize it.

    Values equal to '', {}, [], 'null' or None are removed, as are nested
    containers that end up empty. Remaining scalar values are converted with
    ``str``. A top-level scalar serializes to ``null``.
    """
    empty_values = ('', {}, [], 'null', None)

    def internal_remove(node):
        if isinstance(node, dict):
            kept = {}
            for key, value in node.items():
                if isinstance(value, (dict, list)):
                    cleaned = internal_remove(value)
                    if len(cleaned):
                        kept[key] = cleaned
                elif value not in empty_values:
                    kept[key] = str(value)
            return kept
        if isinstance(node, list):
            kept = []
            for value in node:
                if isinstance(value, (dict, list)):
                    cleaned = internal_remove(value)
                    if len(cleaned):
                        kept.append(cleaned)
                elif value not in empty_values:
                    kept.append(str(value))
            return kept
        return None

    return json.dumps(internal_remove(json.loads(info)), ensure_ascii=False)


def append_to_json_array(json_array_string: str, new_element, remove_duplicate: bool = False) -> str:
    """Append an element to a JSON array string, optionally de-duplicating.

    A falsy ``new_element`` leaves the input untouched; a falsy
    ``json_array_string`` yields a one-element array. De-duplication keeps the
    first occurrence of each element.
    """
    if not new_element:
        return json_array_string
    if not json_array_string:
        return json.dumps([new_element], ensure_ascii=False)
    json_array = json.loads(json_array_string)  # type: list
    json_array.append(new_element)
    if remove_duplicate is True:
        unique = []
        for elem in json_array:
            # Linear membership test: elements may be unhashable (dict / list).
            if elem not in unique:
                unique.append(elem)
        return json.dumps(unique, ensure_ascii=False)
    return json.dumps(json_array, ensure_ascii=False)


def json_array_subset(json_array_string: str, subset_fields: Union[List, str], as_list: bool = False,
                      skip_null: bool = False) -> str:
    """Extract a subset of fields from every object of a JSON array string.

    :param json_array_string: JSON array of objects
    :param subset_fields: field names, as a list or a comma-separated string
    :param as_list: with exactly one field, return a flat list of its values
    :param skip_null: drop falsy field values instead of emitting them
    :return: JSON string of the subset, or None when either input is empty
    """
    if not json_array_string or not subset_fields:
        return None
    if isinstance(subset_fields, str):
        subset_field_list = subset_fields.split(',')
    else:
        subset_field_list = subset_fields
    try:
        json_array = json.loads(json_array_string)
    except (TypeError, ValueError):
        # SECURITY NOTE(review): eval() executes arbitrary expressions found in the
        # data. It is kept for backward compatibility with inputs that are not valid
        # JSON; if only Python literals are expected, switch to ast.literal_eval.
        json_array = eval(json_array_string)
    list_subset = []
    if len(subset_field_list) == 1 and as_list:
        only_subset_field = subset_field_list[0]
        for element in json_array:  # type:Dict
            if isinstance(element, dict):
                field_value = element.get(only_subset_field)
                if field_value or not skip_null:
                    list_subset.append(field_value)
    else:
        for element in json_array:  # type:Dict
            subset_of_element = {}
            if isinstance(element, dict):
                for field in subset_field_list:
                    field_value = element.get(field)
                    if field_value or not skip_null:
                        subset_of_element[field] = field_value
            list_subset.append(subset_of_element)
    return json.dumps(list_subset, ensure_ascii=False)


@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_arr(s: str):
    """Split a JSON array string into (1-based index, element re-serialized as JSON) pairs.

    Empty or null input yields an empty array.
    """
    if not s:
        return []
    return [(i + 1, json.dumps(obj)) for i, obj in enumerate(json.loads(s))]


@udf(returnType=ArrayType(StructType([
    StructField("idx", IntegerType(), False),
    StructField("obj", StringType(), False),
])))
def parse_jsonarr_to_strarr(s: str):
    """Split a JSON array string into (1-based index, element) pairs.

    Elements are passed through as decoded, without re-serialization.
    Empty or null input yields an empty array.
    """
    if not s:
        return []
    return [(i + 1, obj) for i, obj in enumerate(json.loads(s))]


# ==================== ARRAY ====================

@udf(returnType=ArrayType(StringType()))
def array_intersect(arr1, arr2):
    """Return the distinct elements present in both arrays (order not guaranteed).

    A null array on either side yields an empty array.
    """
    if arr1 is None or arr2 is None:
        return []
    return list(set(arr1) & set(arr2))


def array_append(array: List, new_element, ignore_null: bool = False, remove_duplicate: bool = False,
                 need_sort: bool = False) -> List:
    """Append ``new_element`` to ``array`` in place and return the array.

    :param ignore_null: when True, a falsy element is not appended
    :param remove_duplicate: when True, a truthy element already present is not appended
    :param need_sort: sort the array in place after appending
    """
    if not array:
        if new_element or ignore_null is not True:
            return [new_element]
        return []
    if not new_element:
        if ignore_null is True:
            return array
    elif remove_duplicate is True and new_element in array:
        return array
    array.append(new_element)
    if need_sort:
        array.sort()
    return array


@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return ``input_array[start:end]``, or an empty array for empty/null input."""
    if input_array:
        return input_array[start:end]
    return []


@udf(returnType=ArrayType(StringType()))
def merge_list(arr_list: List):
    """Flatten a list of arrays into distinct non-null, non-empty elements (order not guaranteed).

    Null input yields an empty array; null inner arrays are skipped.
    """
    res = set()
    for inner in arr_list or []:
        if inner is not None:
            res.update(item for item in inner if item is not None and item != "")
    return list(res)
@udf(returnType=ArrayType(StringType()))
def merge_source(incr_source: List, old_source: List):
    """Union two arrays into distinct non-null, non-empty elements (order not guaranteed)."""
    res = set()
    for source in (incr_source, old_source):
        if source is not None:
            res.update(item for item in source if item is not None and item != "")
    return list(res)


@udf(returnType=StructType([
    StructField("k", ArrayType(StringType()), False),
    StructField("kv", StringType()),
]))
def parse_arr_and_count(arr, tag: str, return_count: int = -1):
    """Count element occurrences and return them ranked by frequency (descending).

    :param arr: elements to count
    :param tag: label placed before each count in the summary string
    :param return_count: keep only the top N entries; a negative value keeps all
    :return: (ranked elements, "{elem,tag:count},{...}" summary string)
    """
    # most_common() sorts by count descending and keeps first-seen order on ties.
    ranked = Counter(arr).most_common()
    if return_count >= 0:
        ranked = ranked[:return_count]
    codes = [code for code, _ in ranked]
    summary = ",".join('{' + f'{code},{tag}:{num}' + '}' for code, num in ranked)
    return codes, summary


@udf(returnType=StructType([
    StructField("sum", FloatType(), False),
    StructField("list", StringType()),
]))
def parse_arr_and_sum(struct_arr, tag: str):
    """Sum values per key and return the grand total plus a ranked summary.

    Each entry is read as (key, value) by position; null values count as 0.
    Null input is treated as an empty array.

    :return: (total rounded to 2 places, "{key,tag:sum},{...}" ordered by sum descending)
    """
    sum_dict = {}
    for entry in struct_arr or []:
        key = entry[0]
        value = entry[1]
        sum_dict.setdefault(key, 0.0)
        if value is not None:
            sum_dict[key] += value
    ranked = sorted(sum_dict.items(), key=lambda kv: kv[1], reverse=True)
    # Explicit left-to-right accumulation keeps the float result independent of
    # how the built-in sum() handles floats on a given Python version.
    total = 0.0
    for _, num in ranked:
        total += num
    summary = ",".join('{' + f'{code},{tag}:{round(num, 2)}' + '}' for code, num in ranked)
    return round(total, 2), summary


# ==================== STRING ====================

# Compiled once at import time instead of on every row.
_CHINESE_CHAR_PATTERN = re.compile('[\u4e00-\u9fa5]')


@udf(returnType=BooleanType())
def has_chinese(datum: str) -> bool:
    """Return True when ``datum`` contains a character in the range U+4E00..U+9FA5."""
    if not datum:
        return False
    return _CHINESE_CHAR_PATTERN.search(datum) is not None


@udf(returnType=FloatType())
def similarity(left: str, right: str) -> float:
    """Return ``difflib.SequenceMatcher.quick_ratio()`` of the two strings.

    Returns None when either side is null.
    """
    if left is None or right is None:
        return None
    return difflib.SequenceMatcher(None, left, right).quick_ratio()


@udf(returnType=ArrayType(StringType()))
def regexp_extract_all(col: str, ptn: str, g: int = 0):
    """Return group ``g`` of every match of ``ptn`` in ``col`` (null/empty ``col`` gives [])."""
    return [match.group(g) for match in re.finditer(ptn, col or '')]


def add_random_number_prefix(datum: str, separator: str, floor: int, ceiling: int) -> str:
    """Prefix ``datum`` with a random integer in [floor, ceiling] followed by ``separator``."""
    return f'{random.randint(floor, ceiling)}{separator}{datum}'


def field_merge(delimiter: str, *fields_values):
    """Merge several fields: identical values are kept once, distinct ones joined by ``delimiter``.

    Values are stripped before comparison; null, empty and whitespace-only
    values are skipped. Returns None when no field is passed.
    """
    if not fields_values:
        return None
    result = []
    for value in fields_values:
        if not value:
            continue
        stripped = value.strip()
        # Whitespace-only values used to contribute an empty token (dangling delimiter).
        if stripped and stripped not in result:
            result.append(stripped)
    return delimiter.join(result)


def space2null(text):
    """Return ``text`` unless it is empty, null or whitespace-only, in which case return None."""
    if text and not text.isspace():
        return text
    return None


def merge_ws(text: str):
    """Collapse runs of whitespace into single spaces and trim; empty/null input gives None."""
    if text:
        return ' '.join(text.split())
    return None


def remove_special_char(text, char):
    """Drop the last character of ``text`` when ``text`` ends with ``char``.

    Exactly one trailing character is removed, whatever the length of ``char``.
    """
    if text is not None and text.endswith(char):
        return text[:-1]
    return text


@udf(returnType=ArrayType(StringType()))
def explode_str_to_arr(text: str) -> list:
    """Expand a string into its prefixes, for prefix-matching use cases.

    Text longer than 8 characters yields every prefix from the full string
    down to length 8; shorter text yields itself; null yields [].
    """
    if text is None:
        return []
    if len(text) <= 8:
        return [text]
    return [text[:i] for i in range(len(text), 7, -1)]


def html_unescape(text):
    """Convert HTML character references in ``text`` to characters; null stays null."""
    if text is None:
        return None
    return html.unescape(text)


# ==================== NUMERIC / DATE / HASH ====================

def max_value(*args):
    """Return the largest truthy argument, or None when there is none.

    Falsy arguments (None, 0, '') are skipped, so 0 is never returned.
    """
    maxi_value = None
    for elem in args:
        if not elem:
            continue
        if maxi_value is None or elem > maxi_value:
            maxi_value = elem
    return maxi_value


def min_value(*args):
    """Return the smallest truthy argument, or None when there is none.

    Falsy arguments (None, 0, '') are skipped, so 0 is never returned.
    """
    mini_value = None
    for elem in args:
        if not elem:
            continue
        if mini_value is None or elem < mini_value:
            mini_value = elem
    return mini_value


def millis_timestamp_to_str(ts: int, str_format: str = None) -> str:
    """Format a millisecond epoch timestamp via ``datetime.fromtimestamp``.

    Without ``str_format`` the result is 'YYYY-MM-DD HH:MM:SS.mmm'.
    """
    date_time = datetime.fromtimestamp(ts / 1000.0)
    if str_format:
        return date_time.strftime(str_format)
    # %f gives microseconds; trimming three digits leaves milliseconds.
    return date_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]


@udf(returnType=LongType())
def parse_datetime_to_timestamp(date_time: str, in_milli_seconds: bool = False,
                                original_format: str = None) -> int:
    """Convert a date string to an epoch timestamp (seconds, or milliseconds on request).

    Heuristics applied before parsing: an 8-character 'YY.MM.DD' value gets a
    '20' century prefix, and a leading 'YYYY年M月D日' is rewritten as 'YYYY-M-D'.
    When parsing raises, the value is retried as a number: one larger than the
    current epoch seconds is taken to be milliseconds, otherwise seconds.
    Returns None when nothing can be derived.
    """
    try:
        if date_time:
            parts = date_time.split('.')
            if len(date_time) == 8 and len(parts) == 3 and len(parts[0]) == 2:
                date_time = '20' + date_time
            ret = re.match(r'(\d+)年(\d+)月(\d+)日', date_time)
            if ret:
                date_time = ret.group().replace('年', '-').replace('月', '-').replace('日', '')
        parsed_date = parse_datetime(date_time, original_format)
        if not parsed_date:
            return None
        if in_milli_seconds is True:
            return int(parsed_date.timestamp() * 1000)
        return int(parsed_date.timestamp())
    except Exception:
        try:
            date_time = int(date_time)
            if datetime.now().timestamp() < date_time:
                return date_time if in_milli_seconds else int(date_time / 1000)
            return date_time * 1000 if in_milli_seconds else date_time
        except Exception:
            return None


@udf(returnType=StringType())
def get_md5(*cols: str) -> str:
    """MD5 hex digest of the concatenated columns, each prefixed with its length.

    The length prefix prevents collisions between different column splits.
    Null columns are skipped; when nothing remains the result is ''.
    """
    col_and_len_list = []
    for col in cols:
        if col is not None:
            col_and_len_list.append(str(len(col)))
            col_and_len_list.append(col)
    key = ''.join(col_and_len_list)
    if not key:
        return ''
    return hashlib.md5(key.encode("utf-8")).hexdigest()


# ==================== CROSS-TYPE CONVERTERS ====================

def array_to_json(arr: List):
    """Serialize an array to a JSON string without ASCII escaping."""
    return json.dumps(arr, ensure_ascii=False)


def map_to_json(map: dict):
    """Serialize a map to a JSON string without ASCII escaping."""
    return json.dumps(map, ensure_ascii=False)


def struct_to_json(struct):
    """Serialize a struct to a JSON object string.

    Field names are read from ``struct.__dict__["__fields__"]``
    (presumably a pyspark Row; verify against callers).
    """
    json_dict = {key: struct[key] for key in struct.__dict__["__fields__"]}
    return json.dumps(json_dict, ensure_ascii=False)


def num_to_str(number):
    """Convert a number to a string; floats with an integral value drop the fraction."""
    if isinstance(number, float) and number.is_integer():
        return '{:.0f}'.format(number)
    if isinstance(number, int):
        return str(int(number))
    return str(number)


@udf(returnType=ArrayType(StringType()))
def str_to_arr(json_str: str) -> list:
    """Decode a JSON string into an array; empty/null input gives []."""
    if json_str:
        return json.loads(json_str)
    return []


@udf(returnType=ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Decode a JSON array string into a list of JSON strings (each element re-serialized).

    Empty input, malformed JSON, or a non-array top-level value gives [].
    """
    if not json_str:
        return []
    try:
        str_arr = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    if isinstance(str_arr, list):
        return [json.dumps(sm) for sm in str_arr]
    return []


@udf(returnType=ArrayType(MapType(StringType(), StringType())))
def str_to_map_arr(json_str: str) -> list:
    """Decode a JSON string into an array of maps; empty/null input gives []."""
    if json_str:
        return json.loads(json_str)
    return []


@udf(returnType=StringType())
def split_str_to_jsonstr(str_list: List):
    """Split each element on ':' into one {k: v} object and return them as a JSON array string.

    Elements that do not split into exactly two parts are skipped, as are
    null elements; null input is treated as empty.
    """
    res = []
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        arr = kv_str.split(':')
        if len(arr) == 2:
            res.append({arr[0]: arr[1]})
    return json.dumps(res, ensure_ascii=False)


@udf(returnType=MapType(StringType(), ArrayType(StringType())))
def split_str_to_maparr(str_list: List):
    """Split each element on ':' into k:v and group the values by key.

    Elements that do not split into exactly two parts are skipped, as are
    null elements; null input is treated as empty.
    """
    res = {}
    for kv_str in str_list or []:
        if kv_str is None:
            continue
        arr = kv_str.split(':')
        if len(arr) == 2:
            res.setdefault(arr[0], []).append(arr[1])
    return res