| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 |
- # -*- coding:utf-8 -*-
- import os
- from dw_base.datax.path_utils import job_name_from_ini, json_output_path, log_path
- def test_job_name_from_ini_absolute():
- assert job_name_from_ini(
- '/home/bigdata/release/poyee-data-warehouse/tests/integration/datax/hive_import/app_user_cert_info.ini'
- ) == 'app_user_cert_info'
- def test_job_name_from_ini_relative():
- assert job_name_from_ini('jobs/raw/usr/xxx.ini') == 'xxx'
- def test_job_name_from_ini_no_ext():
- assert job_name_from_ini('xxx') == 'xxx'
- def test_json_output_path_flat():
- out = json_output_path(
- '/opt/release/poyee-data-warehouse',
- 'tests/integration/datax/hive_import/app_user_cert_info.ini',
- )
- assert os.path.normpath(out) == os.path.normpath(
- '/opt/release/poyee-data-warehouse/conf/datax-json/app_user_cert_info.json'
- )
- def test_json_output_path_with_dates():
- out = json_output_path(
- '/opt/release/poyee-data-warehouse',
- 'tests/integration/datax/hive_import/app_user_cert_info.ini',
- '20260311', '20260312',
- )
- assert os.path.normpath(out) == os.path.normpath(
- '/opt/release/poyee-data-warehouse/conf/datax-json/app_user_cert_info_20260311_20260312.json'
- )
- def test_json_output_path_only_start_falls_back():
- # 只传 start 不传 stop(半填),按文档约定退回无后缀格式(任一为空都视作缺省)
- out = json_output_path(
- '/opt/release/poyee-data-warehouse',
- 'tests/integration/datax/hive_import/app_user_cert_info.ini',
- '20260311', None,
- )
- assert os.path.normpath(out) == os.path.normpath(
- '/opt/release/poyee-data-warehouse/conf/datax-json/app_user_cert_info.json'
- )
- def test_log_path_template():
- out = log_path('/home/bigdata/log', 'datax', '20260422', 'app_user_cert_info')
- assert os.path.normpath(out) == os.path.normpath(
- '/home/bigdata/log/datax/20260422/app_user_cert_info.log'
- )
|