# test_spark_common_udf.py

import json
from datetime import datetime

from dw_base.udf.common import spark_common_udf as udf_module
  4. def test_json_object_keys_returns_keys_for_json_object():
  5. assert udf_module.json_object_keys.func('{"a": 1, "b": 2}') == ["a", "b"]
  6. def test_json_array_subset_supports_python_literal_without_eval():
  7. data = "[{'name': 'alice', 'age': 18}, {'name': 'bob', 'age': 20}]"
  8. result = udf_module.json_array_subset(data, "name", as_list=True)
  9. assert json.loads(result) == ["alice", "bob"]
  10. def test_json_array_subset_returns_none_for_invalid_input():
  11. assert udf_module.json_array_subset("not-json", "name") is None
  12. def test_append_to_json_array_returns_original_when_source_is_invalid_json():
  13. assert udf_module.append_to_json_array("not-json", "x") == "not-json"
  14. def test_append_to_json_array_can_remove_duplicates():
  15. result = udf_module.append_to_json_array('["a", "b"]', "a", remove_duplicate=True)
  16. assert json.loads(result) == ["a", "b"]
  17. def test_flatten_json_returns_original_text_for_invalid_json():
  18. assert udf_module.flatten_json("not-json") == "not-json"
  19. def test_remove_empty_key_removes_empty_values_recursively():
  20. source = json.dumps({
  21. "a": "",
  22. "b": None,
  23. "c": {"d": "", "e": 1},
  24. "f": ["", {"g": "x"}],
  25. })
  26. assert json.loads(udf_module.remove_empty_key(source)) == {"c": {"e": "1"}, "f": [{"g": "x"}]}
  27. def test_merge_list_keeps_existing_semantics():
  28. merged_list = sorted(udf_module.merge_list.func([["a", "", None], ["b", "a"], None]))
  29. assert merged_list == ["a", "b"]
  30. def test_array_intersect_returns_common_items():
  31. assert sorted(udf_module.array_intersect.func(["a", "b"], ["b", "c"])) == ["b"]
  32. def test_array_append_respects_existing_semantics():
  33. assert udf_module.array_append(["a"], "a", remove_duplicate=True) == ["a"]
  34. assert udf_module.array_append(["b"], "a", need_sort=True) == ["a", "b"]
  35. def test_array_slice_returns_sub_list():
  36. assert udf_module.array_slice.func(["a", "b", "c"], 1, 3) == ["b", "c"]
  37. def test_has_chinese_detects_chinese_characters():
  38. assert udf_module.has_chinese.func("abc中文") is True
  39. assert udf_module.has_chinese.func("abc") is False
  40. def test_similarity_returns_high_score_for_identical_strings():
  41. assert udf_module.similarity.func("abc", "abc") == 1.0
  42. def test_regexp_extract_all_extracts_all_matches():
  43. assert udf_module.regexp_extract_all.func("a1b22c333", r"\d+") == ["1", "22", "333"]
  44. def test_field_merge_deduplicates_values():
  45. assert udf_module.field_merge(",", " a ", "b", "a", None) == "a,b"
  46. def test_space2null_and_merge_ws_and_remove_special_char():
  47. assert udf_module.space2null(" ") is None
  48. assert udf_module.space2null(" a ") == " a "
  49. assert udf_module.merge_ws("a b\tc") == "a b c"
  50. assert udf_module.remove_special_char("abc,", ",") == "abc"
  51. def test_html_unescape_restores_html_entities():
  52. assert udf_module.html_unescape("&lt;div&gt;Tom &amp; Jerry&lt;/div&gt;") == "<div>Tom & Jerry</div>"
  53. def test_max_value_and_min_value_keep_existing_truthy_semantics():
  54. assert udf_module.max_value(None, 2, 1) == 2
  55. assert udf_module.min_value(None, 2, 1) == 1
  56. def test_millis_timestamp_to_str_formats_milliseconds():
  57. expected = datetime.fromtimestamp(0).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
  58. assert udf_module.millis_timestamp_to_str(0) == expected
  59. def test_parse_datetime_to_timestamp_supports_seconds_and_milliseconds():
  60. expected_seconds = int(datetime(2024, 1, 2, 3, 4, 5).timestamp())
  61. expected_milliseconds = expected_seconds * 1000
  62. assert udf_module.parse_datetime_to_timestamp.func("2024-01-02 03:04:05") == expected_seconds
  63. assert udf_module.parse_datetime_to_timestamp.func(str(expected_milliseconds)) == expected_seconds
  64. assert udf_module.parse_datetime_to_timestamp.func(str(expected_seconds), in_milli_seconds=True) == expected_milliseconds
  65. def test_get_md5_is_stable_for_same_inputs():
  66. assert udf_module.get_md5.func("ab", "cd") == udf_module.get_md5.func("ab", "cd")
  67. assert udf_module.get_md5.func(None) == ""
  68. def test_array_to_json_and_map_to_json_and_num_to_str():
  69. assert json.loads(udf_module.array_to_json(["a", 1])) == ["a", 1]
  70. assert json.loads(udf_module.map_to_json({"a": 1})) == {"a": 1}
  71. assert udf_module.num_to_str(1.0) == "1"
  72. assert udf_module.num_to_str(2) == "2"
  73. def test_str_to_arr_returns_empty_when_json_is_invalid():
  74. assert udf_module.str_to_arr.func("not-json") == []
  75. def test_str_to_json_arr_returns_json_strings():
  76. assert udf_module.str_to_json_arr.func('[{"a": 1}, {"b": 2}]') == ['{"a": 1}', '{"b": 2}']
  77. def test_str_to_map_arr_returns_empty_when_json_is_not_list():
  78. assert udf_module.str_to_map_arr.func('{"a": 1}') == []
  79. def test_is_json_handles_none():
  80. assert udf_module.is_json.func(None) is False