spark_mmq_udf.py 885 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. #!/usr/bin/env /usr/bin/python3
  2. # -*- coding:utf-8 -*-
  3. import json
  4. from typing import List
  5. from pyspark.sql.functions import udf
  6. from pyspark.sql.types import *
  7. def array_to_json(arr: List):
  8. return json.dumps(arr, ensure_ascii=False)
  9. @udf(returnType=ArrayType(StringType()))
  10. def arr_str_to_arr(json_str: str) -> list:
  11. if json_str:
  12. return json.loads(json_str)
  13. return []
  14. @udf(ArrayType(StringType()))
  15. def array_slice(input_array, start, end):
  16. if input_array:
  17. result_array = input_array[start:end]
  18. return result_array
  19. return []
  20. @udf(ArrayType(StringType()))
  21. def str_to_json_arr(json_str):
  22. if json_str:
  23. try:
  24. str_arr = json.loads(json_str)
  25. if isinstance(str_arr, list):
  26. return [json.dumps(sm) for sm in str_arr]
  27. except json.JSONDecodeError:
  28. return []
  29. return []