#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
import difflib
import json
import random
import re
import traceback
from datetime import datetime
from typing import Dict, List, Optional, Union

import pygeohash
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, BooleanType, FloatType, LongType, MapType

from dw_base.utils.datetime_utils import parse_datetime
def add_random_number_prefix(datum: str, separator: str, floor: int, ceiling: int) -> str:
    """
    Prepend a random integer prefix to a value.

    Args:
        datum: the original value
        separator: separator placed between the random prefix and the original value
        floor: lower bound of the random prefix (inclusive)
        ceiling: upper bound of the random prefix (inclusive)
    Returns:
        the random integer, the separator and the original value concatenated in that order
    """
    prefix = random.randint(floor, ceiling)
    return f'{prefix}{separator}{datum}'
def append_to_json_array(json_array_string: str, new_element, remove_duplicate: bool = False) -> str:
    """
    Append an element to a JSON array string.

    Args:
        json_array_string: the JSON array, serialized as a string
        new_element: element to append; any falsy value (None, '', 0, ...) is
            treated as "nothing to add" and the input is returned untouched
        remove_duplicate: when exactly True, drop repeated elements (first
            occurrence wins) from the resulting array
    Returns:
        the resulting JSON array string (non-ASCII characters are kept as-is)
    """
    if not new_element:
        return json_array_string
    if not json_array_string:
        return json.dumps([new_element], ensure_ascii=False)
    json_array = json.loads(json_array_string)  # type: list
    json_array.append(new_element)
    if remove_duplicate is True:
        # Elements may be unhashable (dict / list), so de-duplicate by
        # equality while preserving order instead of going through a set.
        unique = []
        for elem in json_array:
            if elem not in unique:
                unique.append(elem)
        json_array = unique
    return json.dumps(json_array, ensure_ascii=False)
def array_append(array: List, new_element,
                 ignore_null: bool = False,
                 remove_duplicate: bool = False,
                 need_sort: bool = False) -> List:
    """
    Append an element to a list and return the list.

    A non-empty input list is modified in place and returned.

    Args:
        array: the list to append to; None or empty yields a new list
        new_element: the element to append
        ignore_null: when exactly True, a falsy new_element is not appended
        remove_duplicate: when exactly True, a truthy new_element that is
            already present is not appended again
        need_sort: when truthy, sort the list after appending
    Returns:
        the resulting list
    """
    if not array:
        if new_element or ignore_null is not True:
            return [new_element]
        return []
    if not new_element:
        if ignore_null is True:
            return array
    elif remove_duplicate is True and new_element in array:
        return array
    array.append(new_element)
    if need_sort:
        array.sort()
    return array
def field_merge(delimiter: str, *fields_values):
    """
    Merge several field values into one delimited string.

    Values are stripped; empty / None / whitespace-only values are skipped and
    repeated values are kept only once (first occurrence wins).

    Args:
        delimiter: separator placed between distinct values
        *fields_values: the values to merge
    Returns:
        the merged string, or None when no values were passed at all
    """
    if not fields_values:
        return None
    merged = []
    for value in fields_values:
        if not value:
            continue
        stripped = value.strip()
        # Skip whitespace-only values so they do not produce an empty segment.
        if stripped and stripped not in merged:
            merged.append(stripped)
    return delimiter.join(merged)
def flatten_json(json_str: str, reserve_parent: bool = True) -> str:
    """
    Flatten a JSON document.

    Args:
        json_str: the JSON string to flatten
        reserve_parent: whether flattened keys keep their parent key as a
            'parent.child' prefix (default: keep)
    Returns:
        the flattened JSON string; the input is returned unchanged when it is
        empty or when parsing / flattening fails
    """

    def flatten_json_node(parent, json_element) -> Union[float, int, str, Dict, List]:
        # The parent prefix is only applied when there is a parent key and the
        # caller asked (with exactly True) to keep it.
        keep_prefix = bool(parent) and reserve_parent is True
        if isinstance(json_element, dict):
            result = {}
            if keep_prefix:
                # NOTE(review): values are stored as-is here, so nesting below
                # this level is not flattened further when the prefix is kept.
                for key, value in json_element.items():
                    result[f'{parent}.{key}'] = value
            else:
                for key, value in json_element.items():
                    result.update(flatten_json_node(key, value))
            return result
        if isinstance(json_element, list):
            if keep_prefix:
                return [flatten_json_node(f'{parent}.[{index}]', item)
                        for index, item in enumerate(json_element)]
            return [flatten_json_node(None, item) for item in json_element]
        return {parent: json_element}

    if not json_str:
        return json_str
    try:
        json_node = json.loads(json_str)
        flattened_json = flatten_json_node(None, json_node)
        return json.dumps(flattened_json, ensure_ascii=False)
    except Exception:
        # Best effort: report the failure and hand the input back unchanged.
        # (format_exc(e) used to pass the exception as `limit`, which raised
        # inside this handler instead of falling back.)
        traceback.print_exc()
        return json_str
def geo_hash(latitude: float, longitude: float, precision: int) -> Optional[str]:
    """
    Encode a coordinate with pygeohash.

    Args:
        latitude: latitude of the point
        longitude: longitude of the point
        precision: precision passed through to pygeohash.encode
    Returns:
        the value returned by pygeohash.encode, or None when either
        coordinate is None
    """
    if latitude is None or longitude is None:
        return None
    return pygeohash.encode(latitude, longitude, precision)
@udf(returnType=BooleanType())
def has_chinese(datum: str) -> bool:
    """
    Tell whether a string contains at least one character in the range
    U+4E00 - U+9FA5.

    Args:
        datum: the string to inspect
    Returns:
        True when such a character is found; False otherwise, including for
        None or empty input
    """
    if not datum:
        return False
    return re.search(u'[\u4e00-\u9fa5]', datum) is not None
@udf(returnType=BooleanType())
def is_json(data) -> bool:
    """
    Tell whether the input can be parsed by json.loads.

    Args:
        data: the value to check
    Returns:
        True when json.loads accepts the input, False otherwise
    """
    try:
        json.loads(data)
    except (TypeError, ValueError, RecursionError):
        # TypeError: not str/bytes (e.g. None); ValueError: malformed JSON;
        # RecursionError: nesting too deep for the parser.
        return False
    return True
def json_array_subset(json_array_string: str,
                      subset_fields: Union[List, str],
                      as_list: bool = False,
                      skip_null: bool = False) -> Optional[str]:
    """
    Project every object of a JSON object-array string onto a subset of fields.

    Args:
        json_array_string: the JSON array of objects, serialized as a string
        subset_fields: fields to keep, as a list or a comma separated string
        as_list: when only one field is requested, return a flat array of its
            values instead of an array of single-key objects
        skip_null: when truthy, leave out fields whose value is falsy
            (None, '', 0, ...)
    Returns:
        the subset array as a JSON string, or None when either input is empty
    """
    if not json_array_string or not subset_fields:
        return None
    if isinstance(subset_fields, str):
        subset_field_list = subset_fields.split(',')
    else:
        subset_field_list = subset_fields
    if len(subset_field_list) == 0:
        return None
    try:
        json_array = json.loads(json_array_string)
    except (TypeError, ValueError):
        # SECURITY(review): eval() executes arbitrary expressions. It is kept
        # as the fallback for Python-literal style input (e.g. single-quoted
        # dicts); if the data can come from an untrusted source, replace it
        # with ast.literal_eval.
        json_array = eval(json_array_string)

    list_subset = []
    if len(subset_field_list) == 1 and as_list:
        only_subset_field = subset_field_list[0]
        for element in json_array:
            # Non-object elements are skipped in flat-list mode.
            if not isinstance(element, dict):
                continue
            field_value = element.get(only_subset_field)
            if field_value or not skip_null:
                list_subset.append(field_value)
    else:
        for element in json_array:
            subset_of_element = {}
            if isinstance(element, dict):
                for field in subset_field_list:
                    field_value = element.get(field)
                    if field_value or not skip_null:
                        subset_of_element[field] = field_value
            list_subset.append(subset_of_element)
    return json.dumps(list_subset, ensure_ascii=False)
@udf(returnType=ArrayType(StringType()))
def json_object_keys(json_str: str) -> Optional[List[str]]:
    """
    List the top-level keys of a JSON object string.

    Args:
        json_str: the JSON object, serialized as a string
    Returns:
        the list of keys, or None when the input is empty, cannot be parsed,
        or does not parse to a JSON object
    """
    if not json_str:
        return None
    try:
        json_dict = json.loads(json_str)
    except (TypeError, ValueError, RecursionError):
        return None
    if not isinstance(json_dict, dict):
        return None
    return list(json_dict)
def max_value(*args):
    """
    Return the largest of the given values.

    Falsy values (None, 0, '', ...) are treated as missing and skipped.

    Args:
        *args: the candidate values
    Returns:
        the largest truthy value, or None when there is none
    """
    maxi_value = None
    for elem in args:
        if not elem:
            continue
        if maxi_value is None or elem > maxi_value:
            maxi_value = elem
    return maxi_value
def min_value(*args):
    """
    Return the smallest of the given values.

    Falsy values (None, 0, '', ...) are treated as missing and skipped.

    Args:
        *args: the candidate values
    Returns:
        the smallest truthy value, or None when there is none
    """
    mini_value = None
    for elem in args:
        if not elem:
            continue
        if mini_value is None or elem < mini_value:
            mini_value = elem
    return mini_value
def millis_timestamp_to_str(ts: int, str_format: str = None) -> Optional[str]:
    """
    Format a millisecond epoch timestamp as a string.

    The timestamp is converted with datetime.fromtimestamp, i.e. into the
    local timezone of the running process.

    Args:
        ts: epoch timestamp in milliseconds
        str_format: strftime format; when omitted the result looks like
            'YYYY-mm-dd HH:MM:SS.mmm' (millisecond precision)
    Returns:
        the formatted date-time, or None when ts is None
    """
    if ts is None:
        return None
    date_time = datetime.fromtimestamp(ts / 1000.0)
    if str_format:
        return date_time.strftime(str_format)
    # %f yields microseconds (6 digits); cut the last 3 to keep milliseconds.
    return date_time.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
@udf(returnType=LongType())
def parse_datetime_to_timestamp(date_time: str, in_milli_seconds: bool = False, original_format: str = None) -> int:
    """
    Convert a date-time string into an epoch timestamp.

    Args:
        date_time: the date-time string
        in_milli_seconds: when exactly True return milliseconds, else seconds
        original_format: format of the input, passed through to
            parse_datetime; when omitted parse_datetime decides the format
    Returns:
        the epoch timestamp, or None when the input cannot be converted
    """
    try:
        if date_time:
            parts = date_time.split('.')
            # An 8-character dotted value with a 2-character first part
            # (e.g. '21.03.05') gets '20' prepended.
            if len(date_time) == 8 and len(parts) == 3 and len(parts[0]) == 2:
                date_time = '20' + date_time
            # Raw string: '\d' in a plain literal is an invalid escape sequence.
            matched = re.match(r'(\d+)年(\d+)月(\d+)日', date_time)
            if matched:
                date_time = matched.group().replace('年', '-').replace('月', '-').replace('日', '')
        parsed_date = parse_datetime(date_time, original_format)
        if not parsed_date:
            return None
        if in_milli_seconds is True:
            return int(parsed_date.timestamp() * 1000)
        return int(parsed_date.timestamp())
    except Exception:
        # Fallback: the input may already be a numeric epoch timestamp.
        try:
            epoch = int(date_time)
        except (TypeError, ValueError, OverflowError):
            return None
        # A value greater than the current epoch seconds is treated as
        # already being in milliseconds.
        is_millis = datetime.now().timestamp() < epoch
        if in_milli_seconds is True:
            return epoch if is_millis else epoch * 1000
        # Integer division keeps large values exact (no float round trip).
        return epoch // 1000 if is_millis else epoch
@udf(returnType=FloatType())
def similarity(left: str, right: str) -> Optional[float]:
    """
    Compute the similarity of two strings.

    Uses difflib.SequenceMatcher.quick_ratio, which is an upper bound on
    SequenceMatcher.ratio rather than the exact ratio.

    Args:
        left: first string
        right: second string
    Returns:
        the quick_ratio of the two strings, or None when either input is None
    """
    if left is None or right is None:
        # SequenceMatcher cannot iterate over None; yield a null instead.
        return None
    return difflib.SequenceMatcher(None, left, right).quick_ratio()
def remove_empty_key(info):
    """
    Remove entries with empty values from a JSON document.

    A value counts as empty when it is '', {}, [], the string 'null' or None,
    or a nested object / array that is empty after its own cleaning. All
    remaining scalar values are converted with str().

    Args:
        info: the JSON document, serialized as a string
    Returns:
        the cleaned document as a JSON string; 'null' when the top level is
        neither an object nor an array
    """
    empty_values = ['', {}, [], 'null', None]

    def internal_remove(node):
        try:
            if isinstance(node, dict):
                kept = {}
                for key, value in node.items():
                    if isinstance(value, (dict, list)):
                        cleaned = internal_remove(value)
                        # len() (not truthiness) on purpose: a failed nested
                        # clean returns None, which must fail this level too.
                        if len(cleaned):
                            kept[key] = cleaned
                    elif value not in empty_values:
                        kept[key] = str(value)
                return kept
            if isinstance(node, list):
                kept = []
                for value in node:
                    if isinstance(value, (dict, list)):
                        cleaned = internal_remove(value)
                        if len(cleaned):
                            kept.append(cleaned)
                    elif value not in empty_values:
                        kept.append(str(value))
                return kept
            return None
        except Exception:
            return None

    return json.dumps(internal_remove(json.loads(info)), ensure_ascii=False)
@udf(returnType=ArrayType(StringType()))
def regexp_extract_all(col: str, ptn: str, g: int = 0):
    """
    Extract every match of a regular expression from a string.

    Args:
        col: the string to search; None / empty is searched as ''
        ptn: the regular expression
        g: the group to take from each match (0 = the whole match)
    Returns:
        the list of group values, one per match, in order of appearance
    """
    text = col if col else ''
    return [match.group(g) for match in re.finditer(ptn, text)]
@udf(returnType=ArrayType(StringType()))
def array_intersect(arr1, arr2):
    """
    Compute the intersection of two arrays.

    The result contains each common element once; its order is not defined
    because it is computed through sets.

    Args:
        arr1: first array
        arr2: second array
    Returns:
        the list of common elements, or None when either array is None
    """
    if arr1 is None or arr2 is None:
        # set(None) would raise; yield a null instead.
        return None
    return list(set(arr1) & set(arr2))
def array_to_json(arr: List):
    """
    Serialize an array as a JSON string.

    Args:
        arr: the array to serialize
    Returns:
        the JSON string (non-ASCII characters are kept as-is)
    """
    return json.dumps(arr, ensure_ascii=False)
def map_to_json(map: dict):
    """
    Serialize a map as a JSON string.

    Args:
        map: the dict to serialize
    Returns:
        the JSON string (non-ASCII characters are kept as-is)
    """
    return json.dumps(map, ensure_ascii=False)
def struct_to_json(struct):
    """
    Serialize a struct as a JSON object string.

    Args:
        struct: an object exposing its field names in a `__fields__`
            attribute and supporting lookup by field name
            (presumably a pyspark Row - confirm against callers)
    Returns:
        the JSON object string (non-ASCII characters are kept as-is)
    """
    json_dict = {key: struct[key] for key in struct.__fields__}
    return json.dumps(json_dict, ensure_ascii=False)
@udf(returnType=ArrayType(MapType(StringType(), StringType())))
def str_to_map_arr(json_str: str) -> list:
    """
    Parse a JSON string into an array of maps.

    Args:
        json_str: the JSON string
    Returns:
        whatever json.loads yields for the input, or an empty list when the
        input is None / empty
    """
    if not json_str:
        return []
    return json.loads(json_str)
def num_to_str(number):
    """
    Render a number as a string.

    Integer-valued floats are rendered without a fractional part and without
    scientific notation, ints (including bools) through int(), everything
    else through str().

    Args:
        number: the value to render
    Returns:
        the string form, or None when the input is None
    """
    if number is None:
        # Keep nulls as nulls instead of producing the literal string 'None'.
        return None
    if isinstance(number, float) and number.is_integer():
        return '{:.0f}'.format(number)
    if isinstance(number, int):
        return str(int(number))
    return str(number)
def space2null(text):
    """
    Turn empty or whitespace-only text into None.

    Args:
        text: the text to check
    Returns:
        the text unchanged when it has non-whitespace content, else None
    """
    if not text or text.isspace():
        return None
    return text
if __name__ == '__main__':
    # Smoke check: print the result of space2null for empty, None, blank and
    # non-blank inputs.
    cases = [
        '',
        None,
        ' ',
        ' ',
        'hello'
    ]
    for case in cases:
        print(space2null(case))