test_partition.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. # -*- coding:utf-8 -*-
  2. import textwrap
  3. from pathlib import Path
  4. from unittest.mock import patch
  5. from dw_base.datax.partition import compute_partition_dt, execute_ddls, parse_ini_partition
  6. def _write_ini(tmp_path: Path, content: str) -> str:
  7. p = tmp_path / 'x.ini'
  8. p.write_text(textwrap.dedent(content))
  9. return str(p)
  10. def test_compute_partition_dt_stop_minus_1():
  11. assert compute_partition_dt('20260423') == '20260422'
  12. def test_compute_partition_dt_month_boundary():
  13. assert compute_partition_dt('20260401') == '20260331'
  14. def test_parse_partitioned_writer(tmp_path):
  15. ini = _write_ini(tmp_path, '''\
  16. [writer]
  17. dataSource = hdfs/prd-ha
  18. path = /user/hive/warehouse/test.db/raw_usr_app_user_cert_info_inc_d/dt=${dt}/
  19. ''')
  20. ddl = parse_ini_partition(ini, stop_date='20260423')
  21. assert ddl == ('ALTER TABLE test.raw_usr_app_user_cert_info_inc_d '
  22. 'ADD IF NOT EXISTS PARTITION(dt=20260422);')
  23. def test_parse_non_partitioned(tmp_path):
  24. ini = _write_ini(tmp_path, '''\
  25. [writer]
  26. dataSource = hdfs/xx
  27. path = /user/hive/warehouse/test.db/non_partitioned/
  28. ''')
  29. assert parse_ini_partition(ini, stop_date='20260423') is None
  30. def test_parse_no_writer_path(tmp_path):
  31. ini = _write_ini(tmp_path, '''\
  32. [writer]
  33. dataSource = mongo/xx
  34. ''')
  35. assert parse_ini_partition(ini, stop_date='20260423') is None
  36. def test_parse_dt_aligns_with_stop_minus_1_for_multiday(tmp_path):
  37. # 多日范围 start=20260401 stop=20260410:dt 应 = stop-1 = 20260409
  38. # 避免老脚本 START_DATE=dt 假设引入的多日范围分区错位
  39. ini = _write_ini(tmp_path, '''\
  40. [writer]
  41. path = /user/hive/warehouse/db1.db/t1/dt=${dt}/
  42. ''')
  43. ddl = parse_ini_partition(ini, stop_date='20260410')
  44. assert 'PARTITION(dt=20260409)' in ddl
  45. @patch('dw_base.datax.partition.subprocess.run')
  46. def test_execute_ddls_empty_is_noop(mock_run):
  47. execute_ddls([])
  48. mock_run.assert_not_called()