| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- 通用 UDF —— 与业务无关的数据类型 / 格式操作(JSON / Array / String / Numeric / Date / Hash)
- SparkSQL 入口自动 ADD FILE 注册;业务专用 UDF 请放到 dw_base/udf/business/ 下按需加载
- """
- import difflib
- import hashlib
- import html
- import json
- import random
- import re
- from ast import literal_eval
- from datetime import datetime
- from typing import Dict, List, Union
- from pyspark.sql.functions import udf
- from pyspark.sql.types import (
- ArrayType, BooleanType, FloatType, LongType, MapType, StringType,
- )
- from dw_base.utils.datetime_utils import parse_datetime
- def _load_json_or_default(data, default=None):
- """优先按 JSON 解析,失败时返回默认值。"""
- try:
- return json.loads(data)
- except (TypeError, ValueError):
- return default
def _load_json_or_literal(data, default=None):
    """Parse ``data`` as JSON first, then as a Python literal.

    Args:
        data: Text to decode.
        default: Returned when neither parser yields a value.

    Returns:
        The JSON-decoded value when it is not ``None``; otherwise the result
        of ``ast.literal_eval``; otherwise ``default``.
    """
    from_json = _load_json_or_default(data, default=None)
    if from_json is None:
        # JSON decoding failed (or produced null): fall back to Python literal
        # syntax, e.g. single-quoted strings.
        try:
            from_literal = literal_eval(data)
        except (ValueError, SyntaxError, TypeError):
            return default
        return from_literal
    return from_json
- def _dedupe_keep_order(values: List) -> List:
- """按原始顺序去重。"""
- result = []
- for value in values:
- if value not in result:
- result.append(value)
- return result
- def _merge_non_empty_values(*arrays: List) -> List[str]:
- """合并多个数组,并过滤 None 与空字符串。"""
- result = set()
- for array in arrays:
- if array is None:
- continue
- for item in array:
- if item is not None and item != "":
- result.add(item)
- return list(result)
- # ==================== JSON ====================
- # UDF-01 JSON校验:判断输入是否为合法 JSON 字符串。
- @udf(returnType=BooleanType())
def is_json(data) -> bool:
    """Tell whether ``data`` can be decoded by ``json.loads``.

    Args:
        data: Value to check.

    Returns:
        ``True`` when ``json.loads(data)`` succeeds, ``False`` when it raises
        ``TypeError`` or ``ValueError``.
    """
    try:
        json.loads(data)
        return True
    except (TypeError, ValueError):
        return False
- # UDF-02 JSON取键:提取 JSON object 的 key 列表。
- @udf(returnType=ArrayType(StringType()))
def json_object_keys(json_str: str) -> List[str]:
    """Return the top-level keys of a JSON object string.

    Args:
        json_str: Text expected to hold a JSON object.

    Returns:
        The object's keys as a list, or ``None`` when the input is empty,
        cannot be decoded, or does not decode to a dict.
    """
    if not json_str:
        return None
    decoded = _load_json_or_default(json_str, default=None)
    return list(decoded) if isinstance(decoded, dict) else None
def flatten_json(json_str: str, reserve_parent: bool = True) -> str:
    """Flatten a JSON string; ``reserve_parent`` controls parent-key prefixes.

    Args:
        json_str: JSON text to flatten.
        reserve_parent: When exactly ``True``, children of a keyed dict are
            emitted as ``"<parent>.<key>"`` and list items under a key use
            ``"<parent>.[<index>]"``.

    Returns:
        The flattened structure serialised with ``ensure_ascii=False``. The
        input is returned unchanged when it is empty or when decoding or
        flattening raises ``TypeError`` / ``ValueError``.
    """
    # Prefixing only applies when the flag is literally ``True``.
    keep_parent = reserve_parent is True

    def _flatten(parent, node) -> Union[float, int, str, Dict, List]:
        prefixed = bool(parent) and keep_parent
        if isinstance(node, dict):
            if prefixed:
                # Children are prefixed but NOT flattened any further.
                return {f'{parent}.{key}': value for key, value in node.items()}
            flat = {}
            for key, value in node.items():
                flat.update(_flatten(key, value))
            return flat
        if isinstance(node, list):
            if prefixed:
                return [
                    _flatten(f'{parent}.[{index}]', item)
                    for index, item in enumerate(node)
                ]
            return [_flatten(None, item) for item in node]
        # Scalar leaf: keyed by its parent (``None`` at the top level).
        return {parent: node}

    if not json_str:
        return json_str
    try:
        return json.dumps(_flatten(None, json.loads(json_str)), ensure_ascii=False)
    except (TypeError, ValueError):
        return json_str
def remove_empty_key(info):
    """Recursively drop empty values from a JSON document.

    Scalars equal to ``''``, ``'null'`` or ``None`` are removed, as are nested
    dicts/lists that end up empty. Every scalar that is kept is converted with
    ``str``.

    Args:
        info: JSON text; it is decoded with ``json.loads`` without a guard, so
            invalid input raises.

    Returns:
        The pruned document serialised with ``ensure_ascii=False``. A top-level
        scalar (or an internal error while pruning) serialises as ``null``.
    """
    empty_markers = ['', {}, [], 'null', None]

    def _prune(node):
        try:
            if isinstance(node, dict):
                kept = {}
                for key, value in node.items():
                    if isinstance(value, (dict, list)):
                        pruned = _prune(value)
                        if len(pruned):
                            kept[key] = pruned
                    elif value not in empty_markers:
                        kept[key] = str(value)
                return kept
            if isinstance(node, list):
                kept = []
                for value in node:
                    if isinstance(value, (dict, list)):
                        pruned = _prune(value)
                        if len(pruned):
                            kept.append(pruned)
                    elif value not in empty_markers:
                        kept.append(str(value))
                return kept
            return None
        except Exception:
            # Best effort: any failure at this level collapses it to None.
            return None

    return json.dumps(_prune(json.loads(info)), ensure_ascii=False)
def append_to_json_array(json_array_string: str, new_element, remove_duplicate: bool = False) -> str:
    """Append an element to a JSON array string.

    Args:
        json_array_string: JSON array text; may be empty or ``None``.
        new_element: Element to append. Falsy values are treated as
            "nothing to add".
        remove_duplicate: When exactly ``True``, the resulting array is
            de-duplicated (first occurrence kept).

    Returns:
        The updated array as JSON text (``ensure_ascii=False``). The input is
        returned untouched when ``new_element`` is falsy or when the input
        does not decode to a list.
    """
    if not new_element:
        return json_array_string
    if not json_array_string:
        # No existing array: start a fresh one (nothing to de-duplicate).
        elements = [new_element]
    else:
        elements = _load_json_or_default(json_array_string, default=None)
        if not isinstance(elements, list):
            return json_array_string
        elements.append(new_element)
        if remove_duplicate is True:
            elements = _dedupe_keep_order(elements)
    return json.dumps(elements, ensure_ascii=False)
def json_array_subset(json_array_string: str,
                      subset_fields: Union[List, str],
                      as_list: bool = False,
                      skip_null: bool = False) -> str:
    """Project selected fields out of a JSON array of objects.

    Args:
        json_array_string: Array text, decoded as JSON with a Python-literal
            fallback.
        subset_fields: Field names, as a list or a comma-separated string.
        as_list: With exactly one field, emit a flat list of that field's
            values instead of a list of objects.
        skip_null: Drop falsy field values instead of emitting them.

    Returns:
        The projected array as JSON text (``ensure_ascii=False``), or ``None``
        when an argument is empty or the input does not decode to a list.
    """
    if not json_array_string or not subset_fields:
        return None
    wanted = subset_fields.split(',') if isinstance(subset_fields, str) else subset_fields
    if len(wanted) == 0:
        return None
    records = _load_json_or_literal(json_array_string, default=None)
    if not isinstance(records, list):
        return None

    if len(wanted) == 1 and as_list:
        # Flat mode: non-dict elements are skipped entirely.
        field = wanted[0]
        values = []
        for record in records:
            if not isinstance(record, dict):
                continue
            value = record.get(field)
            if skip_null and not value:
                continue
            values.append(value)
        return json.dumps(values, ensure_ascii=False)

    # Object mode: every element yields one entry; non-dict elements yield {}.
    # NOTE(review): the source indentation was lost, so the placement of this
    # append relative to the isinstance check should be confirmed.
    projected = []
    for record in records:
        picked = {}
        if isinstance(record, dict):
            for field in wanted:
                value = record.get(field)
                if value or not skip_null:
                    picked[field] = value
        projected.append(picked)
    return json.dumps(projected, ensure_ascii=False)
- # ==================== ARRAY ====================
- # UDF-21 数组交集:计算两个数组的交集。
- @udf(returnType=ArrayType(StringType()))
def array_intersect(arr1, arr2):
    """Compute the intersection of two arrays.

    Args:
        arr1: First array of hashable items; may be ``None``.
        arr2: Second array of hashable items; may be ``None``.

    Returns:
        The distinct items present in both arrays, ordered by first
        appearance in ``arr1``. An empty list is returned when either input
        is ``None`` (previously this raised ``TypeError``).
    """
    if arr1 is None or arr2 is None:
        return []
    right = set(arr2)
    # dict.fromkeys de-duplicates arr1 while keeping its order, which makes
    # the output deterministic (a set intersection has arbitrary order).
    return [item for item in dict.fromkeys(arr1) if item in right]
def array_append(array: List, new_element,
                 ignore_null: bool = False,
                 remove_duplicate: bool = False,
                 need_sort: bool = False) -> List:
    """Append an element to an array under the existing null/dedupe/sort rules.

    The input list is modified in place and returned.

    Args:
        array: Target list; ``None`` or empty starts a new list.
        new_element: Element to add. Falsy values count as "null".
        ignore_null: When exactly ``True``, falsy elements are not added.
        remove_duplicate: When exactly ``True``, a truthy element already in
            the list is not added again (the list is returned as is, unsorted).
        need_sort: Sort the list in place after appending.

    Returns:
        The resulting list.
    """
    # NOTE(review): the source indentation was lost; the duplicate check is
    # read as applying to truthy elements - confirm against the original file.
    if not array:
        return [new_element] if (new_element or ignore_null is not True) else []
    if new_element:
        if remove_duplicate is True and new_element in array:
            return array
    elif ignore_null is True:
        return array
    array.append(new_element)
    if need_sort:
        array.sort()
    return array
- # UDF-22 数组切片:按起止下标截取数组。
- @udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Slice an array with Python slice semantics.

    Args:
        input_array: Sequence to slice; may be ``None`` or empty.
        start: Slice start index (as in ``seq[start:end]``).
        end: Slice end index (exclusive).

    Returns:
        ``input_array[start:end]``, or an empty list for falsy input.
    """
    return input_array[start:end] if input_array else []
- # UDF-23 数组合并:合并二维数组,并过滤 None 与空字符串。
- @udf(returnType=ArrayType(StringType()))
def merge_list(arr_list: List):
    """Merge a two-dimensional array, dropping ``None`` and empty strings.

    Args:
        arr_list: List of arrays; ``None`` or empty is treated as no arrays.

    Returns:
        The merged, de-duplicated values produced by
        ``_merge_non_empty_values``.
    """
    nested = arr_list if arr_list else []
    return _merge_non_empty_values(*nested)
- # ==================== STRING ====================
- # UDF-31 中文检测:判断字符串中是否包含中文字符。
- @udf(returnType=BooleanType())
def has_chinese(datum: str) -> bool:
    """Tell whether a string contains a character in U+4E00..U+9FA5.

    Args:
        datum: Text to inspect; may be ``None`` or empty.

    Returns:
        ``True`` when at least one character falls in that range, otherwise
        ``False`` (including for falsy input).
    """
    if not datum:
        return False
    return re.search(u'[\u4e00-\u9fa5]', datum) is not None
- # UDF-32 相似度计算:计算两个字符串的快速相似度。
- @udf(returnType=FloatType())
def similarity(left: str, right: str) -> float:
    """Compute a quick similarity score between two strings.

    Uses ``difflib.SequenceMatcher.quick_ratio``, which is an upper bound on
    ``ratio()``.

    Args:
        left: First string; may be ``None``.
        right: Second string; may be ``None``.

    Returns:
        A float in ``[0, 1]``, or ``None`` when either input is ``None``
        (previously a ``None`` input raised ``TypeError``).
    """
    if left is None or right is None:
        return None
    return difflib.SequenceMatcher(None, left, right).quick_ratio()
- # UDF-33 正则全提取:提取正则表达式的全部匹配结果。
- @udf(returnType=ArrayType(StringType()))
def regexp_extract_all(col: str, ptn: str, g: int = 0):
    """Extract every match of a regular expression from a string.

    Args:
        col: Text to search; falsy input is treated as an empty string.
        ptn: Regular-expression pattern.
        g: Group to return for each match (``0`` is the whole match).

    Returns:
        A list with group ``g`` of every non-overlapping match, in order.
    """
    matcher = re.compile(ptn)
    text = col if col else ''
    extracted = []
    for found in matcher.finditer(text):
        extracted.append(found.group(g))
    return extracted
def add_random_number_prefix(datum: str, separator: str, floor: int, ceiling: int) -> str:
    """Prefix a string with a random integer.

    Args:
        datum: Value to prefix.
        separator: Text placed between the random number and ``datum``.
        floor: Lowest random value (inclusive).
        ceiling: Highest random value (inclusive).

    Returns:
        ``"<random><separator><datum>"``.
    """
    prefix = random.randint(floor, ceiling)
    return f'{prefix}{separator}{datum}'
def field_merge(delimiter: str, *fields_values):
    """Join several field values with a delimiter after trimming and de-duplicating.

    Args:
        delimiter: Text placed between the merged values.
        *fields_values: String values; ``None`` and empty values are skipped.

    Returns:
        The distinct trimmed values joined by ``delimiter`` in first-seen
        order, or ``None`` when no values are passed at all.
    """
    if not fields_values:
        return None
    merged = []
    for value in fields_values:
        if not value:
            continue
        stripped = value.strip()
        # Whitespace-only values strip down to '' and must not contribute an
        # empty segment (which used to yield results like "a,,b").
        if stripped and stripped not in merged:
            merged.append(stripped)
    return delimiter.join(merged)
def space2null(text):
    """Normalise blank strings to ``None``.

    Args:
        text: String to check; may be ``None``.

    Returns:
        ``text`` unchanged when it has at least one non-whitespace character,
        otherwise ``None``.
    """
    if not text or text.isspace():
        return None
    return text
def merge_ws(text: str):
    """Collapse runs of whitespace into single spaces.

    Leading and trailing whitespace is removed as a side effect of
    ``str.split``.

    Args:
        text: String to normalise; may be ``None`` or empty.

    Returns:
        The whitespace-normalised string, or ``None`` for falsy input.
    """
    if not text:
        return None
    words = text.split()
    return ' '.join(words)
def remove_special_char(text, char):
    """Remove a trailing suffix from a string when present.

    Args:
        text: String to trim; ``None`` is returned as is.
        char: Suffix to remove. A tuple of candidate suffixes is accepted, as
            with ``str.endswith``. ``None`` or an empty value leaves ``text``
            unchanged.

    Returns:
        ``text`` without the matching suffix, or ``text`` unchanged when it
        does not end with ``char``.
    """
    if text is None or not char:
        return text
    suffixes = (char,) if isinstance(char, str) else tuple(char)
    for suffix in suffixes:
        # Cut the full suffix length: the earlier fixed one-character cut was
        # wrong for multi-character suffixes and for the empty suffix.
        if suffix and text.endswith(suffix):
            return text[:-len(suffix)]
    return text
def html_unescape(text):
    """Convert HTML character references in a string back to characters.

    Args:
        text: String that may contain entities such as ``&amp;``; may be
            ``None``.

    Returns:
        The unescaped string, or ``None`` when ``text`` is ``None``
        (``html.unescape`` itself raises ``TypeError`` on ``None``).
    """
    if text is None:
        return None
    return html.unescape(text)
- # ==================== NUMERIC / DATE / HASH ====================
def max_value(*args):
    """Return the largest truthy argument.

    Falsy arguments (``None``, ``0``, ``''`` ...) are ignored, following the
    existing truthiness rule.

    Args:
        *args: Mutually comparable values.

    Returns:
        The largest truthy value, or ``None`` when there is none.
    """
    return max((elem for elem in args if elem), default=None)
def min_value(*args):
    """Return the smallest truthy argument.

    Falsy arguments (``None``, ``0``, ``''`` ...) are ignored, following the
    existing truthiness rule.

    Args:
        *args: Mutually comparable values.

    Returns:
        The smallest truthy value, or ``None`` when there is none.
    """
    return min((elem for elem in args if elem), default=None)
def millis_timestamp_to_str(ts: int, str_format: str = None) -> str:
    """Format a millisecond epoch timestamp as text.

    The timestamp is converted with ``datetime.fromtimestamp``, i.e. in the
    local timezone of the running process.

    Args:
        ts: Epoch timestamp in milliseconds.
        str_format: Optional ``strftime`` pattern.

    Returns:
        The formatted time. Without ``str_format`` the layout is
        ``YYYY-MM-DD HH:MM:SS.mmm``.
    """
    moment = datetime.fromtimestamp(ts / 1000.0)
    if not str_format:
        # %f renders microseconds; dropping the last three digits leaves ms.
        return moment.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    return moment.strftime(str_format)
- # UDF-41 时间解析:把日期字符串解析为时间戳。
- @udf(returnType=LongType())
def parse_datetime_to_timestamp(date_time: str, in_milli_seconds: bool = False, original_format: str = None) -> int:
    """Parse a date string into an epoch timestamp.

    Two layouts are normalised before parsing: ``YY.MM.DD`` (prefixed with
    the century ``20``) and ``YYYY年M月D日`` (rewritten to ``YYYY-M-D``). The
    string is then handed to ``parse_datetime``. If anything in that path
    raises, the input is interpreted as an integer timestamp instead.

    Args:
        date_time: Date text, or a numeric timestamp as text.
        in_milli_seconds: When exactly ``True`` the parsed result is returned
            in milliseconds; otherwise in seconds.
        original_format: Format hint forwarded to ``parse_datetime``.

    Returns:
        The timestamp as an int, or ``None`` when the input cannot be
        interpreted.
    """
    # NOTE(review): the source indentation was lost; ``parse_datetime`` is
    # assumed to be called outside the ``if date_time`` guard - confirm.
    try:
        if date_time:
            parts = date_time.split('.')
            # 8 characters in three dot-separated parts with a 2-digit year.
            if len(date_time) == 8 and len(parts) == 3 and len(parts[0]) == 2:
                date_time = '20' + date_time
            matched = re.match(r'(\d+)年(\d+)月(\d+)日', date_time)
            if matched:
                date_time = matched.group().replace('年', '-').replace('月', '-').replace('日', '')
        parsed_date = parse_datetime(date_time, original_format)
        if not parsed_date:
            return None
        if in_milli_seconds is True:
            return int(parsed_date.timestamp() * 1000)
        return int(parsed_date.timestamp())
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit. Parsing errors still fall back to the numeric path.
        try:
            date_time = int(date_time)
            # A value later than "now" in seconds is taken to be milliseconds.
            if datetime.now().timestamp() < date_time:
                return date_time if in_milli_seconds else int(date_time / 1000)
            return date_time * 1000 if in_milli_seconds else date_time
        except Exception:
            return None
- # UDF-42 MD5摘要:把多列值按长度前缀拼接后计算 MD5。
- @udf(returnType=StringType())
def get_md5(*cols: str) -> str:
    """Hash several column values into one MD5 hex digest.

    Each non-``None`` value contributes its length followed by the value
    itself; ``None`` values are skipped. The concatenation is encoded as
    UTF-8 and hashed.

    Args:
        *cols: String values; ``None`` entries are ignored.

    Returns:
        The 32-character hex digest, or ``''`` when nothing was contributed.
    """
    pieces = []
    for col in cols:
        if col is None:
            continue
        # Length prefix keeps ("ab", "c") distinct from ("a", "bc").
        pieces.extend((str(len(col)), col))
    key = ''.join(pieces)
    if key:
        return hashlib.md5(key.encode("utf-8")).hexdigest()
    return ''
- # ==================== CROSS-TYPE CONVERTERS ====================
def array_to_json(arr: List):
    """Serialise an array to a JSON string.

    Args:
        arr: JSON-serialisable list.

    Returns:
        The JSON text, with non-ASCII characters kept as is.
    """
    serialized = json.dumps(arr, ensure_ascii=False)
    return serialized
def map_to_json(map: dict):
    """Serialise a dict to a JSON string.

    Args:
        map: JSON-serialisable dict.

    Returns:
        The JSON text, with non-ASCII characters kept as is.
    """
    serialized = json.dumps(map, ensure_ascii=False)
    return serialized
def struct_to_json(struct):
    """Serialise a struct object to a JSON string.

    The field names are read from ``struct.__dict__["__fields__"]`` and each
    value is fetched with ``struct[name]``.

    Args:
        struct: Object exposing ``__fields__`` and key-based item access
            (presumably a pyspark ``Row`` - verify against the callers).

    Returns:
        A JSON object of field name to value, with non-ASCII characters kept
        as is.
    """
    field_names = struct.__dict__["__fields__"]
    payload = {}
    for name in field_names:
        payload[name] = struct[name]
    return json.dumps(payload, ensure_ascii=False)
def num_to_str(number):
    """Convert a number to text, dropping the fraction of whole floats.

    Args:
        number: Value to convert.

    Returns:
        Whole floats without a decimal part (``3.0`` -> ``"3"``), ints via
        ``int`` (so ``True`` -> ``"1"``), anything else via ``str``.
    """
    if isinstance(number, float) and number.is_integer():
        return '{:.0f}'.format(number)
    if isinstance(number, int):
        return str(int(number))
    return str(number)
- # UDF-51 字符串转数组:把 JSON array 字符串转换为 Python list。
- @udf(returnType=ArrayType(StringType()))
def str_to_arr(json_str: str) -> list:
    """Convert a JSON array string into a Python list.

    Args:
        json_str: JSON array text; may be ``None`` or empty.

    Returns:
        The decoded list, or an empty list when the input is falsy, cannot be
        decoded, or does not decode to a list.
    """
    if not json_str:
        return []
    decoded = _load_json_or_default(json_str, default=[])
    if isinstance(decoded, list):
        return decoded
    return []
- # UDF-52 字符串转JSON字符串数组:把 JSON array 转为 JSON 字符串数组。
- @udf(returnType=ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Split a JSON array string into a list of JSON strings.

    Each element of the decoded array is re-serialised with ``json.dumps``
    (default settings, so non-ASCII characters are escaped).

    Args:
        json_str: JSON array text; may be ``None`` or empty.

    Returns:
        One JSON string per array element, or an empty list when the input is
        falsy, is not valid JSON, or does not decode to a list.
    """
    if not json_str:
        return []
    try:
        decoded = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    if not isinstance(decoded, list):
        return []
    return [json.dumps(item) for item in decoded]
- # UDF-53 字符串转Map数组:把 JSON array 字符串转换为 map 数组。
- @udf(returnType=ArrayType(MapType(StringType(), StringType())))
def str_to_map_arr(json_str: str) -> list:
    """Convert a JSON array string into a list of maps.

    Args:
        json_str: JSON array text; may be ``None`` or empty.

    Returns:
        The decoded list, or an empty list when the input is falsy, cannot be
        decoded, or does not decode to a list. Elements are not checked to be
        dicts.
    """
    if not json_str:
        return []
    decoded = _load_json_or_default(json_str, default=[])
    if isinstance(decoded, list):
        return decoded
    return []
|