#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""
Generic UDFs: business-agnostic data type / format operations
(JSON / Array / String / Numeric / Date / Hash).

The SparkSQL entry point registers this file automatically via ADD FILE.
Business-specific UDFs belong under dw_base/udf/business/ and are loaded on
demand.
"""
import difflib
import hashlib
import html
import json
import random
import re
from ast import literal_eval
from datetime import datetime
from typing import Dict, List, Union

from pyspark.sql.functions import udf
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    FloatType,
    LongType,
    MapType,
    StringType,
)

from dw_base.utils.datetime_utils import parse_datetime

# Compiled once at import time instead of on every UDF invocation.
_CHINESE_CHAR_PATTERN = re.compile(u'[\u4e00-\u9fa5]')


def _load_json_or_default(data, default=None):
    """Parse ``data`` as JSON; return ``default`` when parsing fails.

    ``TypeError`` (e.g. ``data`` is None) and ``ValueError`` (malformed JSON)
    are both treated as a parse failure.
    """
    try:
        return json.loads(data)
    except (TypeError, ValueError):
        return default


def _load_json_or_literal(data, default=None):
    """Parse ``data`` as JSON, falling back to a Python literal.

    Returns ``default`` when neither parser accepts the input. A JSON
    ``null`` parses to None and therefore also goes through the literal
    fallback.
    """
    parsed = _load_json_or_default(data, default=None)
    if parsed is not None:
        return parsed
    try:
        return literal_eval(data)
    except (ValueError, SyntaxError, TypeError):
        return default


def _dedupe_keep_order(values: List) -> List:
    """Return ``values`` without duplicates, keeping first occurrences.

    Uses equality comparison rather than hashing so that unhashable
    elements (dicts / lists decoded from JSON) are supported.
    """
    unique = []
    for value in values:
        if value not in unique:
            unique.append(value)
    return unique


def _merge_non_empty_values(*arrays: List) -> List[str]:
    """Merge several arrays, dropping None, empty strings and duplicates.

    None arrays are skipped. The result keeps first-seen order so that the
    output is deterministic across runs.
    """
    merged = {}  # dict used as an insertion-ordered set
    for array in arrays:
        if array is None:
            continue
        for item in array:
            if item is not None and item != "":
                merged[item] = None
    return list(merged)


# ==================== JSON ====================
# UDF-01 JSON validation: check whether the input is a valid JSON string.
@udf(returnType=BooleanType())
def is_json(data) -> bool:
    """Return True when ``data`` parses as JSON, False otherwise."""
    try:
        json.loads(data)
    except (TypeError, ValueError):
        return False
    return True


# UDF-02 JSON keys: extract the key list of a JSON object.
@udf(returnType=ArrayType(StringType()))
def json_object_keys(json_str: str) -> List[str]:
    """Return the keys of a JSON object string.

    Returns None when the input is empty, is not valid JSON, or does not
    decode to an object.
    """
    if not json_str:
        return None
    json_dict = _load_json_or_default(json_str, default=None)
    if not isinstance(json_dict, dict):
        return None
    return list(json_dict)


def flatten_json(json_str: str, reserve_parent: bool = True) -> str:
    """Flatten a JSON string; ``reserve_parent`` controls key prefixing.

    Empty input is returned unchanged, and so is any input for which
    parsing, flattening or serialisation raises ``TypeError`` or
    ``ValueError``.

    NOTE(review): when a dict node is visited with a parent key and
    ``reserve_parent`` is True, its values are stored under
    ``"<parent>.<key>"`` without further recursion. This mirrors the
    existing behaviour; confirm against callers before changing it.
    """
    keep_parent = reserve_parent is True

    def flatten_json_node(parent, json_element) -> Union[float, int, str, Dict, List]:
        use_prefix = bool(parent) and keep_parent
        if isinstance(json_element, dict):
            result = {}
            if use_prefix:
                for key, value in json_element.items():
                    result[f'{parent}.{key}'] = value
            else:
                for key, value in json_element.items():
                    result.update(flatten_json_node(key, value))
            return result
        if isinstance(json_element, list):
            if use_prefix:
                return [
                    flatten_json_node(f'{parent}.[{index}]', item)
                    for index, item in enumerate(json_element)
                ]
            return [flatten_json_node(None, item) for item in json_element]
        # Scalar leaf: wrap it under its parent key.
        return {parent: json_element}

    if not json_str:
        return json_str
    try:
        json_node = json.loads(json_str)
        flattened_json = flatten_json_node(None, json_node)
        return json.dumps(flattened_json, ensure_ascii=False)
    except (TypeError, ValueError):
        return json_str


def remove_empty_key(info):
    """Recursively drop empty values from a JSON document.

    A value is considered empty when it equals ``''``, ``{}``, ``[]``,
    ``'null'`` or None. Surviving scalar values are converted with
    ``str()``. Nested containers that end up empty are dropped as well.

    Returns the cleaned document as a JSON string. A top-level scalar
    yields ``"null"``. Invalid JSON input raises from ``json.loads``.
    """
    json_info = json.loads(info)

    def internal_remove(node):
        try:
            if isinstance(node, dict):
                kept = {}
                for key, value in node.items():
                    if isinstance(value, (dict, list)):
                        cleaned = internal_remove(value)
                        if len(cleaned):
                            kept[key] = cleaned
                    elif value not in ['', {}, [], 'null', None]:
                        kept[key] = str(value)
                return kept
            if isinstance(node, list):
                kept = []
                for value in node:
                    if isinstance(value, (dict, list)):
                        cleaned = internal_remove(value)
                        if len(cleaned):
                            kept.append(cleaned)
                    elif value not in ['', {}, [], 'null', None]:
                        kept.append(str(value))
                return kept
            return None
        except Exception:
            # Best effort: a failure while cleaning a subtree yields None.
            return None

    return json.dumps(internal_remove(json_info), ensure_ascii=False)


def append_to_json_array(json_array_string: str, new_element, remove_duplicate: bool = False) -> str:
    """Append ``new_element`` to a JSON array string.

    - A falsy ``new_element`` leaves the input unchanged.
    - A falsy ``json_array_string`` yields a one-element array.
    - Input that does not decode to a list is returned unchanged.
    - With ``remove_duplicate is True`` duplicates are removed, keeping
      first occurrences.
    """
    if not new_element:
        return json_array_string
    if not json_array_string:
        return json.dumps([new_element], ensure_ascii=False)
    json_array = _load_json_or_default(json_array_string, default=None)
    if not isinstance(json_array, list):
        return json_array_string
    json_array.append(new_element)
    if remove_duplicate is True:
        json_array = _dedupe_keep_order(json_array)
    return json.dumps(json_array, ensure_ascii=False)


def json_array_subset(json_array_string: str,
                      subset_fields: Union[List, str],
                      as_list: bool = False,
                      skip_null: bool = False) -> str:
    """Project an array of JSON objects onto a subset of fields.

    Args:
        json_array_string: JSON (or Python literal) text of an array.
        subset_fields: field names, as a list or a comma-separated string.
        as_list: when exactly one field is requested, return a flat array
            of its values instead of an array of objects.
        skip_null: drop falsy field values instead of emitting them.

    Returns:
        The projected array as a JSON string, or None when an argument is
        empty or the input does not decode to a list. In object mode a
        non-dict element contributes an empty object; in flat mode it is
        skipped.
    """
    if not json_array_string or not subset_fields:
        return None
    if isinstance(subset_fields, str):
        subset_field_list = subset_fields.split(',')
    else:
        subset_field_list = subset_fields
    if len(subset_field_list) == 0:
        return None
    json_array = _load_json_or_literal(json_array_string, default=None)
    if not isinstance(json_array, list):
        return None

    list_subset = []
    if len(subset_field_list) == 1 and as_list:
        only_subset_field = subset_field_list[0]
        for element in json_array:
            if not isinstance(element, dict):
                continue
            field_value = element.get(only_subset_field)
            if field_value or not skip_null:
                list_subset.append(field_value)
    else:
        for element in json_array:
            subset_of_element = {}
            if isinstance(element, dict):
                for field in subset_field_list:
                    field_value = element.get(field)
                    if field_value or not skip_null:
                        subset_of_element[field] = field_value
            list_subset.append(subset_of_element)
    return json.dumps(list_subset, ensure_ascii=False)


# ==================== ARRAY ====================
# UDF-21 Array intersection: compute the intersection of two arrays.
@udf(returnType=ArrayType(StringType()))
def array_intersect(arr1, arr2):
    """Return the distinct elements present in both arrays.

    Returns None when either input is None (a NULL column value) instead
    of raising. Elements are emitted in the order they first appear in
    ``arr1`` so the result is deterministic.
    """
    if arr1 is None or arr2 is None:
        return None
    right = set(arr2)
    return [item for item in dict.fromkeys(arr1) if item in right]


def array_append(array: List,
                 new_element,
                 ignore_null: bool = False,
                 remove_duplicate: bool = False,
                 need_sort: bool = False) -> List:
    """Append ``new_element`` to ``array`` following the existing rules.

    - Empty / None ``array``: returns ``[new_element]``, or ``[]`` when the
      element is falsy and ``ignore_null is True``.
    - Falsy element with ``ignore_null is True``: ``array`` is returned
      unchanged.
    - Truthy element already present with ``remove_duplicate is True``:
      ``array`` is returned unchanged.
    - Otherwise the element is appended, and the array is sorted when
      ``need_sort`` is truthy.

    Note: a non-empty ``array`` is modified in place and returned.
    """
    if not array:
        if new_element or ignore_null is not True:
            return [new_element]
        return []
    if not new_element:
        if ignore_null is True:
            return array
    elif new_element in array and remove_duplicate is True:
        return array
    array.append(new_element)
    if need_sort:
        array.sort()
    return array


# UDF-22 Array slice: cut an array by start / end index.
@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return ``input_array[start:end]`` (Python slice semantics).

    An empty or None array yields ``[]``.
    """
    if input_array:
        return input_array[start:end]
    return []


# UDF-23 Array merge: merge a 2-D array, dropping None and empty strings.
@udf(returnType=ArrayType(StringType()))
def merge_list(arr_list: List):
    """Merge a list of arrays into one, dropping None, '' and duplicates."""
    return _merge_non_empty_values(*(arr_list or []))


# ==================== STRING ====================
# UDF-31 Chinese detection: check whether a string contains Chinese characters.
@udf(returnType=BooleanType())
def has_chinese(datum: str) -> bool:
    """Return True when ``datum`` contains a character in U+4E00..U+9FA5."""
    if datum and _CHINESE_CHAR_PATTERN.search(datum):
        return True
    return False


# UDF-32 Similarity: quick similarity ratio of two strings.
@udf(returnType=FloatType())
def similarity(left: str, right: str) -> float:
    """Return ``difflib.SequenceMatcher.quick_ratio()`` of the two strings.

    Returns None when either input is None (a NULL column value) instead
    of raising.
    """
    if left is None or right is None:
        return None
    return difflib.SequenceMatcher(None, left, right).quick_ratio()


# UDF-33 Regex extract-all: return every match of a regular expression.
@udf(returnType=ArrayType(StringType()))
def regexp_extract_all(col: str, ptn: str, g: int = 0):
    """Return group ``g`` of every match of ``ptn`` in ``col``.

    A falsy ``col`` is treated as the empty string.
    """
    text = col if col else ''
    return [match.group(g) for match in re.compile(ptn).finditer(text)]


def add_random_number_prefix(datum: str, separator: str, floor: int, ceiling: int) -> str:
    """Prefix ``datum`` with a random integer in ``[floor, ceiling]``.

    The result has the form ``<random><separator><datum>``.
    """
    return f'{random.randint(floor, ceiling)}{separator}{datum}'


def field_merge(delimiter: str, *fields_values):
    """Join distinct, stripped field values with ``delimiter``.

    Falsy values are skipped; first-seen order is kept. Returns None when
    no field values are passed at all.
    """
    if not fields_values:
        return None
    result = []
    for value in fields_values:
        if not value:
            continue
        stripped = value.strip()
        if stripped not in result:
            result.append(stripped)
    return delimiter.join(result)


def space2null(text):
    """Return ``text``, or None when it is empty or whitespace-only."""
    if text and not text.isspace():
        return text
    return None


def merge_ws(text: str):
    """Collapse whitespace runs to single spaces and trim both ends.

    Returns None for falsy input.
    """
    if text:
        return ' '.join(text.split())
    return None


def remove_special_char(text, char):
    """Drop the last character of ``text`` when it ends with ``char``.

    Exactly one character is removed regardless of the length of ``char``.
    None is returned unchanged.
    """
    if text is not None and text.endswith(char):
        return text[:-1]
    return text


def html_unescape(text):
    """Unescape HTML entities in ``text``; None is passed through as None."""
    if text is None:
        return None
    return html.unescape(text)


# ==================== NUMERIC / DATE / HASH ====================
def max_value(*args):
    """Return the largest truthy argument, or None when there is none.

    Falsy arguments (None, 0, '') are ignored, per the existing rules.
    """
    maxi_value = None
    for elem in args:
        if not elem:
            continue
        if not maxi_value or elem > maxi_value:
            maxi_value = elem
    return maxi_value


def min_value(*args):
    """Return the smallest truthy argument, or None when there is none.

    Falsy arguments (None, 0, '') are ignored, per the existing rules.
    """
    mini_value = None
    for elem in args:
        if not elem:
            continue
        if not mini_value or elem < mini_value:
            mini_value = elem
    return mini_value


def millis_timestamp_to_str(ts: int, str_format: str = None) -> str:
    """Format a millisecond timestamp as a local-time string.

    Uses ``str_format`` when given; otherwise the format is
    ``%Y-%m-%d %H:%M:%S.%f`` truncated to millisecond precision.
    """
    date_time = datetime.fromtimestamp(ts / 1000.0)
    if str_format:
        return date_time.strftime(str_format)
    # %f yields microseconds; strip the last three digits to get millis.
    return date_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]


# UDF-41 Datetime parsing: parse a date string into a timestamp.
@udf(returnType=LongType())
def parse_datetime_to_timestamp(date_time: str,
                                in_milli_seconds: bool = False,
                                original_format: str = None) -> int:
    """Convert a date string to an epoch timestamp.

    Heuristics applied before delegating to ``parse_datetime``:
    - an 8-character ``YY.MM.DD`` value gets a ``20`` century prefix;
    - a leading ``YYYY年M月D日`` value is rewritten to ``YYYY-M-D``.

    When parsing raises, the input is interpreted as a numeric timestamp:
    a value greater than the current epoch seconds is assumed to be in
    milliseconds, otherwise in seconds.

    Returns seconds, or milliseconds when ``in_milli_seconds`` is set, or
    None when the value cannot be interpreted.
    """
    try:
        if date_time:
            parts = date_time.split('.')
            if len(date_time) == 8 and len(parts) == 3 and len(parts[0]) == 2:
                date_time = '20' + date_time
            ret = re.match(r'(\d+)年(\d+)月(\d+)日', date_time)
            if ret:
                date_time = ret.group().replace('年', '-').replace('月', '-').replace('日', '')
        parsed_date = parse_datetime(date_time, original_format)
        if not parsed_date:
            return None
        if in_milli_seconds is True:
            return int(parsed_date.timestamp() * 1000)
        return int(parsed_date.timestamp())
    except Exception:
        # Not a bare ``except`` so KeyboardInterrupt / SystemExit propagate.
        try:
            date_time = int(date_time)
            if datetime.now().timestamp() < date_time:
                # Larger than "now" in seconds: treat as milliseconds.
                return date_time if in_milli_seconds else int(date_time / 1000)
            return date_time * 1000 if in_milli_seconds else date_time
        except Exception:
            return None


# UDF-42 MD5 digest: length-prefix and concatenate columns, then hash.
@udf(returnType=StringType())
def get_md5(*cols: str) -> str:
    """Return the MD5 hex digest of the length-prefixed, joined columns.

    Each non-None column contributes ``str(len(col)) + col``; the length
    prefix keeps differently split values from colliding. None columns are
    skipped. Returns ``''`` when the resulting key is empty.
    """
    col_and_len_list = []
    for col in cols:
        if col is not None:
            col_and_len_list.append(str(len(col)))
            col_and_len_list.append(col)
    key = ''.join(col_and_len_list)
    if not key:
        return ''
    md5 = hashlib.md5()
    md5.update(key.encode("utf-8"))
    return md5.hexdigest()


# ==================== CROSS-TYPE CONVERTERS ====================
def array_to_json(arr: List):
    """Serialise an array to a JSON string (non-ASCII kept as-is)."""
    return json.dumps(arr, ensure_ascii=False)


def map_to_json(map: dict):
    """Serialise a dict to a JSON string (non-ASCII kept as-is)."""
    return json.dumps(map, ensure_ascii=False)


def struct_to_json(struct):
    """Serialise a struct-like object to a JSON string.

    Field names are read from ``struct.__dict__["__fields__"]`` and values
    via ``struct[name]``.
    """
    json_dict = {key: struct[key] for key in struct.__dict__["__fields__"]}
    return json.dumps(json_dict, ensure_ascii=False)


def num_to_str(number):
    """Convert a number to a string, dropping ``.0`` from whole floats.

    Integers go through ``int()`` first; everything else uses ``str()``.
    """
    if isinstance(number, float) and number.is_integer():
        return '{:.0f}'.format(number)
    return str(int(number)) if isinstance(number, int) else str(number)


# UDF-51 String to array: convert a JSON array string to a Python list.
@udf(returnType=ArrayType(StringType()))
def str_to_arr(json_str: str) -> list:
    """Decode a JSON array string to a list.

    Returns ``[]`` for empty input, invalid JSON, or non-array JSON.
    """
    if json_str:
        parsed = _load_json_or_default(json_str, default=[])
        return parsed if isinstance(parsed, list) else []
    return []


# UDF-52 String to JSON-string array: each element re-serialised as JSON.
@udf(returnType=ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Decode a JSON array string to a list of JSON strings.

    Each element is re-serialised with ``json.dumps``. Returns ``[]`` for
    empty input, invalid JSON, or non-array JSON.
    """
    if json_str:
        try:
            str_arr = json.loads(json_str)
            if isinstance(str_arr, list):
                return [json.dumps(sm) for sm in str_arr]
        except (TypeError, ValueError):
            # Covers JSONDecodeError (a ValueError) and non-string input.
            return []
    return []


# UDF-53 String to map array: convert a JSON array string to a list of maps.
@udf(returnType=ArrayType(MapType(StringType(), StringType())))
def str_to_map_arr(json_str: str) -> list:
    """Decode a JSON array string to a list (declared as array of maps).

    Returns ``[]`` for empty input, invalid JSON, or non-array JSON.
    """
    if json_str:
        parsed = _load_json_or_default(json_str, default=[])
        return parsed if isinstance(parsed, list) else []
    return []