#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- import difflib import json import random import re import traceback from datetime import datetime from typing import Union, List, Dict import pygeohash from pyspark.sql.functions import udf from pyspark.sql.types import StringType, ArrayType, BooleanType, FloatType, LongType, MapType from dw_base.utils.datetime_utils import parse_datetime def add_random_number_prefix(datum: str, separator: str, floor: int, ceiling: int) -> str: """ 为字段添加随机数字前缀 Args: datum: separator: 原数据与随机前缀的分隔符 floor: 随机数字前缀下限 ceiling: 随机数字前缀上限 Returns: """ return f'{random.randint(floor, ceiling)}{separator}{datum}' def append_to_json_array(json_array_string: str, new_element, remove_duplicate: bool = False) -> str: """ 向JSON array添加元素 Args: json_array_string: JSON array字符串 new_element: 要添加的元素 remove_duplicate: 是否去重 Returns: """ if not new_element: return json_array_string if not json_array_string: return json.dumps([new_element], ensure_ascii=False) json_array = json.loads(json_array_string) # type: list json_array.append(new_element) if remove_duplicate is True: result = [] for elem in json_array: if result.__contains__(elem): continue result.append(elem) return json.dumps(result, ensure_ascii=False) return json.dumps(json_array, ensure_ascii=False) def array_append(array: List, new_element, ignore_null: bool = False, remove_duplicate: bool = False, need_sort: bool = False) -> List: if not array or len(array) == 0: if new_element or ignore_null is not True: return [new_element] return [] if not new_element: if ignore_null is True: return array else: if array.__contains__(new_element) and remove_duplicate is True: return array array.append(new_element) if need_sort: array.sort() return array def field_merge(delimiter: str, *fields_values): """ 两个字段合并,如果相同只取一个,不同用delimiter分隔 Args: delimiter: *fields_values: Returns: """ if not fields_values: return None result = [] [result.append(value.strip()) for value in fields_values if value and value.strip() not in result] return delimiter.join(result) def flatten_json(json_str: str, reserve_parent: bool = True) -> str: """ 展平json Args: json_str: 待展平的json reserve_parent: 是否保留父key,默认保留 Returns: """ def flatten_json_node(parent, json_element) -> Union[float, int, str, Dict, List]: if isinstance(json_element, dict): result = {} if parent and reserve_parent and reserve_parent is True: for key, value in json_element.items(): result[f'{parent}.{key}'] = value else: for key, value in json_element.items(): result.update(flatten_json_node(key, value)) return result elif isinstance(json_element, list): result = [] if parent and reserve_parent and reserve_parent is True: for index in range(len(json_element)): result.append(flatten_json_node(f'{parent}.[{index}]', json_element[index])) else: for index in range(len(json_element)): result.append(flatten_json_node(None, json_element[index])) return result else: return {parent: json_element} if not json_str: return json_str try: json_node = json.loads(json_str) flattened_json = flatten_json_node(None, json_node) return json.dumps(flattened_json, ensure_ascii=False) except Exception as e: traceback.format_exc(e) return json_str def geo_hash(latitude: float, longitude: float, precision: int) -> str: return pygeohash.encode(latitude, longitude, precision) @udf(returnType=BooleanType()) def has_chinese(datum: str) -> bool: if datum: pattern = re.compile(u'[\u4e00-\u9fa5]') match = pattern.search(datum) if match: return True return False @udf(returnType=BooleanType()) def is_json(data) -> bool: try: json.loads(data) except: return False return True def json_array_subset(json_array_string: str, subset_fields: Union[List, str], as_list: bool = False, skip_null: bool = False) -> str: """ 获取json object array string的子集 Args: json_array_string: subset_fields: 子集字段 as_list: 如果子集字段只有1个,是否以list返回 skip_null: 字段的值是None,是否添加在返回的数据中 Returns: 子集数组的字符串 """ if not json_array_string: return None if not subset_fields: return None if isinstance(subset_fields, str): subset_field_list = subset_fields.split(',') else: subset_field_list = subset_fields if len(subset_field_list) == 0: return None try: json_array = json.loads(json_array_string) except: json_array = eval(json_array_string) list_subset = [] if len(subset_field_list) == 1 and as_list: only_subset_field = subset_field_list[0] for element in json_array: # type:Dict if isinstance(element, dict): field_value = element.get(only_subset_field) if field_value or not skip_null: list_subset.append(field_value) else: for element in json_array: # type:Dict subset_of_element = {} if isinstance(element, dict): for field in subset_field_list: field_value = element.get(field) if field_value or not skip_null: subset_of_element[field] = field_value list_subset.append(subset_of_element) return json.dumps(list_subset, ensure_ascii=False) @udf(returnType=ArrayType(StringType())) def json_object_keys(json_str: str) -> List[str]: if not json_str: return None try: json_dict = json.loads(json_str) # type:dict return [k for k in json_dict.keys()] except: return None def max_value(*args): maxi_value = None for elem in args: if not elem: continue if not maxi_value or elem > maxi_value: maxi_value = elem return maxi_value def min_value(*args): mini_value = None for elem in args: if not elem: continue if not mini_value or elem < mini_value: mini_value = elem return mini_value def millis_timestamp_to_str(ts: int, str_format: str = None) -> str: date_time = datetime.fromtimestamp(ts / 1000.0) if str_format: return date_time.strftime(str_format) return date_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] @udf(returnType=LongType()) def parse_datetime_to_timestamp(date_time: str, in_milli_seconds: bool = False, original_format: str = None) -> int: """ 把字符串表达的日期转为时间戳 Args: date_time: 日期 original_format: 原日期格式,不传则智能识别 in_milli_seconds: 是否返回毫秒 Returns: 转换后的日期 """ try: if date_time: d = date_time.split('.') if len(date_time) == 8 and len(d) == 3 and len(d[0]) == 2: date_time = '20' + date_time ret = re.match('(\d+)年(\d+)月(\d+)日', date_time) if ret: date_time = ret.group().replace('年', '-').replace('月', '-').replace('日', '') parsed_date = parse_datetime(date_time, original_format) if not parsed_date: return None if in_milli_seconds is True: return int(parsed_date.timestamp() * 1000) return int(parsed_date.timestamp()) except: try: date_time = int(date_time) # 当前时间小于传入的时间戳,认为是毫秒 if datetime.now().timestamp() < date_time: if in_milli_seconds is True: return date_time else: return int(date_time / 1000) else: if in_milli_seconds is True: return date_time * 1000 else: return date_time except Exception as e: return None @udf(returnType=FloatType()) def similarity(left: str, right: str) -> float: """ 计算两个字符串的相似度 Args: left: right: Returns: """ return difflib.SequenceMatcher(None, left, right).quick_ratio() def remove_empty_key(info): """ 删除json中value为空的key Returns: json """ json_info = json.loads(info) def internal_remove(json_info): try: if isinstance(json_info, dict): info_re = dict() for key, value in json_info.items(): if isinstance(value, dict) or isinstance(value, list): re = internal_remove(value) if len(re): info_re[key] = re elif value not in ['', {}, [], 'null', None]: info_re[key] = str(value) return info_re elif isinstance(json_info, list): info_re = list() for value in json_info: if isinstance(value, dict) or isinstance(value, list): re = internal_remove(value) if len(re): info_re.append(re) elif value not in ['', {}, [], 'null', None]: info_re.append(str(value)) return info_re else: return None except Exception as e: return None return json.dumps(internal_remove(json_info), ensure_ascii=False) @udf(returnType=ArrayType(StringType())) def regexp_extract_all(col: str, ptn: str, g: int = 0): return [e.group(g) for e in re.compile(ptn).finditer(col if col else '')] @udf(returnType=ArrayType(StringType())) def array_intersect(arr1, arr2): """ 计算两个数组的交集 :param arr1: :param arr2: :return: """ return list(set(arr1) & set(arr2)) def array_to_json(arr: List): """ 数组转为jsonstring :param arr: :return: """ return json.dumps(arr, ensure_ascii=False) def map_to_json(map: dict): """ map转为jsonstring """ return json.dumps(map, ensure_ascii=False) def struct_to_json(struct): json_dict = {key: struct[key] for key in struct.__dict__["__fields__"]} return json.dumps(json_dict, ensure_ascii=False) @udf(returnType=ArrayType(MapType(StringType(), StringType()))) def str_to_map_arr(json_str: str) -> list: if json_str: return json.loads(json_str) return [] def num_to_str(number): # 确保 number 是 float 类型 if isinstance(number, float) and number.is_integer(): return '{:.0f}'.format(number) else: return str(int(number)) if isinstance(number, int) else str(number) def space2null(text): if text and not text.isspace(): return text return None if __name__ == '__main__': cases = [ '', None, ' ', ' ', 'hello' ] for case in cases: print(space2null(case))