| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- import json
- from datetime import datetime
- from dw_base.udf.common import spark_common_udf as udf_module
def test_json_object_keys_returns_keys_for_json_object():
    """json_object_keys.func returns the keys of a JSON object string as a list."""
    payload = '{"a": 1, "b": 2}'
    keys = udf_module.json_object_keys.func(payload)
    assert keys == ["a", "b"]
def test_json_array_subset_supports_python_literal_without_eval():
    """json_array_subset accepts a single-quoted, Python-literal style array.

    The input uses single quotes, so it is not strict JSON. With
    ``as_list=True`` the result is expected to be a JSON-encoded list holding
    the "name" value of every element.
    """
    python_literal = "[{'name': 'alice', 'age': 18}, {'name': 'bob', 'age': 20}]"
    encoded = udf_module.json_array_subset(python_literal, "name", as_list=True)
    names = json.loads(encoded)
    assert names == ["alice", "bob"]
def test_json_array_subset_returns_none_for_invalid_input():
    """json_array_subset yields None when the source text cannot be parsed."""
    outcome = udf_module.json_array_subset("not-json", "name")
    assert outcome is None
def test_append_to_json_array_returns_original_when_source_is_invalid_json():
    """append_to_json_array hands back the source untouched if it is not JSON."""
    broken_source = "not-json"
    outcome = udf_module.append_to_json_array(broken_source, "x")
    assert outcome == "not-json"
def test_append_to_json_array_can_remove_duplicates():
    """Appending an already-present item with remove_duplicate=True adds nothing."""
    encoded = udf_module.append_to_json_array(
        '["a", "b"]',
        "a",
        remove_duplicate=True,
    )
    decoded = json.loads(encoded)
    assert decoded == ["a", "b"]
def test_flatten_json_returns_original_text_for_invalid_json():
    """flatten_json echoes its input when the text is not valid JSON."""
    flattened = udf_module.flatten_json("not-json")
    assert flattened == "not-json"
def test_remove_empty_key_removes_empty_values_recursively():
    """remove_empty_key drops empty strings and None at every nesting level.

    The fixture mixes empty values at the top level, inside a nested object
    and inside a list.
    """
    document = {
        "a": "",
        "b": None,
        "c": {"d": "", "e": 1},
        "f": ["", {"g": "x"}],
    }
    # Note: the expected output carries "e" as the string "1", although the
    # input stores the integer 1.
    wanted = {"c": {"e": "1"}, "f": [{"g": "x"}]}

    cleaned_text = udf_module.remove_empty_key(json.dumps(document))
    assert json.loads(cleaned_text) == wanted
def test_merge_list_keeps_existing_semantics():
    """merge_list.func merges nested lists, skipping "", None and duplicates.

    The result is sorted before comparison, so the order returned by the
    function itself is not checked here.
    """
    nested = [["a", "", None], ["b", "a"], None]
    merged = udf_module.merge_list.func(nested)
    assert sorted(merged) == ["a", "b"]
def test_array_intersect_returns_common_items():
    """array_intersect.func keeps only the items present in both lists."""
    left = ["a", "b"]
    right = ["b", "c"]
    shared = udf_module.array_intersect.func(left, right)
    assert sorted(shared) == ["b"]
def test_array_append_respects_existing_semantics():
    """array_append honours its remove_duplicate and need_sort switches."""
    # Appending an existing item with de-duplication leaves the list as is.
    deduplicated = udf_module.array_append(["a"], "a", remove_duplicate=True)
    assert deduplicated == ["a"]

    # With sorting requested, the appended "a" ends up ahead of "b".
    ordered = udf_module.array_append(["b"], "a", need_sort=True)
    assert ordered == ["a", "b"]
def test_array_slice_returns_sub_list():
    """array_slice.func(items, 1, 3) returns the second and third items."""
    letters = ["a", "b", "c"]
    window = udf_module.array_slice.func(letters, 1, 3)
    assert window == ["b", "c"]
def test_has_chinese_detects_chinese_characters():
    """has_chinese.func is True for text with Chinese characters, else False."""
    with_chinese = udf_module.has_chinese.func("abc中文")
    assert with_chinese is True

    ascii_only = udf_module.has_chinese.func("abc")
    assert ascii_only is False
def test_similarity_returns_high_score_for_identical_strings():
    """similarity.func scores two identical strings as exactly 1.0."""
    score = udf_module.similarity.func("abc", "abc")
    assert score == 1.0
def test_regexp_extract_all_extracts_all_matches():
    """regexp_extract_all.func returns every digit run found in the text."""
    digit_runs = udf_module.regexp_extract_all.func("a1b22c333", r"\d+")
    assert digit_runs == ["1", "22", "333"]
def test_field_merge_deduplicates_values():
    """field_merge joins its values with the separator, without repeats.

    The padded " a " and the plain "a" are expected to collapse into a single
    "a", and the None argument is expected to be left out.
    """
    joined = udf_module.field_merge(",", " a ", "b", "a", None)
    assert joined == "a,b"
def test_space2null_and_merge_ws_and_remove_special_char():
    """Smoke-checks three small string helpers in one place."""
    # space2null: whitespace-only text becomes None, other text is untouched.
    blank_result = udf_module.space2null(" ")
    assert blank_result is None
    padded_result = udf_module.space2null(" a ")
    assert padded_result == " a "

    # merge_ws: the tab in the sample comes back as a single space.
    normalised = udf_module.merge_ws("a b\tc")
    assert normalised == "a b c"

    # remove_special_char: the listed character is removed from the text.
    stripped = udf_module.remove_special_char("abc,", ",")
    assert stripped == "abc"
def test_html_unescape_restores_html_entities():
    """html_unescape turns HTML entities back into the characters they encode.

    The input must actually contain entities (``&lt;``, ``&gt;``, ``&amp;``);
    feeding already-unescaped text would let the test pass even if the
    function did nothing at all.
    """
    escaped = "&lt;div&gt;Tom &amp; Jerry&lt;/div&gt;"
    restored = udf_module.html_unescape(escaped)
    assert restored == "<div>Tom & Jerry</div>"
def test_max_value_and_min_value_keep_existing_truthy_semantics():
    """max_value / min_value pick the extreme of the inputs when one is None."""
    largest = udf_module.max_value(None, 2, 1)
    assert largest == 2

    smallest = udf_module.min_value(None, 2, 1)
    assert smallest == 1
def test_millis_timestamp_to_str_formats_milliseconds():
    """millis_timestamp_to_str(0) renders the epoch with millisecond precision.

    The expected text is built from ``datetime.fromtimestamp(0)``, i.e. in the
    local timezone of the machine running the test.
    """
    epoch = datetime.fromtimestamp(0)
    with_micros = epoch.strftime("%Y-%m-%d %H:%M:%S.%f")
    # %f prints six digits (microseconds); cutting three leaves milliseconds.
    with_millis = with_micros[:-3]

    assert udf_module.millis_timestamp_to_str(0) == with_millis
def test_parse_datetime_to_timestamp_supports_seconds_and_milliseconds():
    """parse_datetime_to_timestamp.func handles datetime text and epoch strings.

    Three inputs are covered: a "YYYY-MM-DD HH:MM:SS" string, an epoch written
    in milliseconds (expected back in seconds), and an epoch written in
    seconds with ``in_milli_seconds=True`` (expected back in milliseconds).
    """
    reference = datetime(2024, 1, 2, 3, 4, 5)
    epoch_s = int(reference.timestamp())
    epoch_ms = epoch_s * 1000

    from_text = udf_module.parse_datetime_to_timestamp.func("2024-01-02 03:04:05")
    assert from_text == epoch_s

    from_millis = udf_module.parse_datetime_to_timestamp.func(str(epoch_ms))
    assert from_millis == epoch_s

    to_millis = udf_module.parse_datetime_to_timestamp.func(
        str(epoch_s),
        in_milli_seconds=True,
    )
    assert to_millis == epoch_ms
def test_get_md5_is_stable_for_same_inputs():
    """get_md5.func is repeatable, and returns "" for a single None input."""
    first_digest = udf_module.get_md5.func("ab", "cd")
    second_digest = udf_module.get_md5.func("ab", "cd")
    assert first_digest == second_digest

    none_digest = udf_module.get_md5.func(None)
    assert none_digest == ""
def test_array_to_json_and_map_to_json_and_num_to_str():
    """Checks the list/dict to JSON encoders and the number-to-text helper."""
    # The two encoders must round-trip through json.loads.
    array_text = udf_module.array_to_json(["a", 1])
    assert json.loads(array_text) == ["a", 1]

    map_text = udf_module.map_to_json({"a": 1})
    assert json.loads(map_text) == {"a": 1}

    # A whole-number float is rendered without the trailing ".0".
    float_text = udf_module.num_to_str(1.0)
    assert float_text == "1"

    int_text = udf_module.num_to_str(2)
    assert int_text == "2"
def test_str_to_arr_returns_empty_when_json_is_invalid():
    """str_to_arr.func returns an empty list for text that is not JSON."""
    parsed = udf_module.str_to_arr.func("not-json")
    assert parsed == []
def test_str_to_json_arr_returns_json_strings():
    """str_to_json_arr.func splits a JSON array into one JSON string per item."""
    array_text = '[{"a": 1}, {"b": 2}]'
    pieces = udf_module.str_to_json_arr.func(array_text)
    assert pieces == ['{"a": 1}', '{"b": 2}']
def test_str_to_map_arr_returns_empty_when_json_is_not_list():
    """str_to_map_arr.func returns an empty list when given a JSON object."""
    object_text = '{"a": 1}'
    parsed = udf_module.str_to_map_arr.func(object_text)
    assert parsed == []
def test_is_json_handles_none():
    """is_json.func reports False, rather than raising, for a None input."""
    verdict = udf_module.is_json.func(None)
    assert verdict is False
|