#!/usr/bin/env /usr/bin/python3
# -*- coding:utf-8 -*-
"""Helpers for converting between JSON strings and arrays.

``array_to_json`` is a plain Python function; the remaining functions are
wrapped with ``pyspark.sql.functions.udf`` and declare
``ArrayType(StringType())`` as their return type.
"""
import json
from typing import List

from pyspark.sql.functions import udf
from pyspark.sql.types import *


def array_to_json(arr: List):
    """Serialize *arr* to a JSON string.

    Non-ASCII characters are kept verbatim (``ensure_ascii=False``) instead
    of being escaped.
    """
    return json.dumps(arr, ensure_ascii=False)


@udf(returnType=ArrayType(StringType()))
def arr_str_to_arr(json_str: str) -> list:
    """Parse a JSON array string into a list.

    Returns an empty list when *json_str* is empty/``None``, is not valid
    JSON, or decodes to something other than a JSON array.
    """
    if not json_str:
        return []
    try:
        parsed = json.loads(json_str)
    except json.JSONDecodeError:
        # Same policy as str_to_json_arr: a malformed value yields an empty
        # array instead of raising out of the UDF.
        return []
    # A valid JSON document that is not an array (object, number, string...)
    # does not match the declared array return type, so treat it as empty.
    return parsed if isinstance(parsed, list) else []


@udf(ArrayType(StringType()))
def array_slice(input_array, start, end):
    """Return ``input_array[start:end]`` using Python slice semantics.

    Returns an empty list when *input_array* is empty or ``None``.
    """
    if not input_array:
        return []
    return input_array[start:end]


@udf(ArrayType(StringType()))
def str_to_json_arr(json_str):
    """Split a JSON array string into a list of per-element JSON strings.

    Each element of the decoded array is re-serialized with ``json.dumps``
    using its defaults, so non-ASCII characters in the output are escaped.

    Returns an empty list when *json_str* is empty/``None``, is not valid
    JSON, or does not decode to a JSON array.
    """
    if not json_str:
        return []
    try:
        items = json.loads(json_str)
    except json.JSONDecodeError:
        return []
    if not isinstance(items, list):
        return []
    return [json.dumps(item) for item in items]