# -*- coding:utf-8 -*- import json from dw_base.ds.builders import ( SHELL_TASK_DEFAULTS, build_linear_relations, build_schedule_body, build_shell_task, build_workflow_body, ) # --- build_shell_task --- def test_shell_task_minimal(): """name + raw_script 最小调用,默认值齐全。""" t = build_shell_task('first', 'echo hi') assert t['name'] == 'first' assert t['taskParams']['rawScript'] == 'echo hi' assert t['taskParams']['localParams'] == [] assert t['taskParams']['resourceList'] == [] # 项目惯例默认值 assert t['taskType'] == 'SHELL' assert t['workerGroup'] == 'd1~d3' assert t['failRetryTimes'] == 0 assert t['timeoutFlag'] == 'CLOSE' assert t['environmentCode'] == -1 assert t['taskExecuteType'] == 'BATCH' # 不传 code / version 则不写入字典 assert 'code' not in t assert 'version' not in t def test_shell_task_with_file_param(): """file_param 注入 localParams。""" t = build_shell_task('foo', 'echo ${file}', file_param='jobs/raw/x.ini') assert t['taskParams']['localParams'] == [ {'prop': 'file', 'direct': 'IN', 'type': 'VARCHAR', 'value': 'jobs/raw/x.ini'} ] def test_shell_task_with_code_and_version(): """code 和 version 是 PUT 场景必传。""" t = build_shell_task('foo', 'echo', code=171234567890, version=2) assert t['code'] == 171234567890 assert t['version'] == 2 def test_shell_task_overrides_default(): """overrides 覆盖默认值(如改 workerGroup 或 timeout)。""" t = build_shell_task('foo', 'echo', workerGroup='custom-group', timeout=600, failRetryTimes=3) assert t['workerGroup'] == 'custom-group' assert t['timeout'] == 600 assert t['failRetryTimes'] == 3 def test_shell_task_overrides_does_not_mutate_defaults(): """重复调用不应共享 dict 引用污染 SHELL_TASK_DEFAULTS。""" t1 = build_shell_task('a', 'echo a', workerGroup='g1') t2 = build_shell_task('b', 'echo b') assert t1['workerGroup'] == 'g1' assert t2['workerGroup'] == 'd1~d3' assert SHELL_TASK_DEFAULTS['workerGroup'] == 'd1~d3' # --- build_linear_relations --- def test_relations_empty(): assert build_linear_relations([]) == [] def test_relations_single_task(): """单 task = 1 条入口 relation(preTaskCode=0)。""" rels = build_linear_relations([(100, 1)]) assert len(rels) == 1 assert rels[0]['preTaskCode'] == 0 assert rels[0]['preTaskVersion'] == 0 assert rels[0]['postTaskCode'] == 100 assert rels[0]['postTaskVersion'] == 1 assert rels[0]['conditionType'] == 'NONE' def test_relations_chain_two(): """双 task = 入口 + 串接 = 2 条 relation。""" rels = build_linear_relations([(100, 1), (200, 1)]) assert len(rels) == 2 assert rels[0]['postTaskCode'] == 100 assert rels[1]['preTaskCode'] == 100 assert rels[1]['preTaskVersion'] == 1 assert rels[1]['postTaskCode'] == 200 assert rels[1]['postTaskVersion'] == 1 def test_relations_chain_three_with_real_versions(): """三 task 链 + 不同 version 透传。""" rels = build_linear_relations([(100, 3), (200, 2), (300, 1)]) assert [r['preTaskCode'] for r in rels] == [0, 100, 200] assert [r['postTaskCode'] for r in rels] == [100, 200, 300] assert [r['preTaskVersion'] for r in rels] == [0, 3, 2] assert [r['postTaskVersion'] for r in rels] == [3, 2, 1] # --- build_workflow_body --- def test_workflow_body_basic(): tasks = [{'code': 100, 'name': 'first'}] rels = [{'postTaskCode': 100}] body = build_workflow_body('wf1', 'desc', tasks, rels) assert body['name'] == 'wf1' assert body['description'] == 'desc' assert body['globalParams'] == '[]' assert body['locations'] == '[]' assert body['timeout'] == 0 assert body['executionType'] == 'SERIAL_WAIT' # tasks / relations 序列化为 JSON 字符串 assert json.loads(body['taskDefinitionJson']) == tasks assert json.loads(body['taskRelationJson']) == rels def test_workflow_body_with_locations(): """locations 非空 list 序列化进 body。""" locs = [{'taskCode': 100, 'x': 380, 'y': 205}, {'taskCode': 200, 'x': 680, 'y': 205}] body = build_workflow_body('wf', '', [], [], locations=locs) assert json.loads(body['locations']) == locs def test_workflow_body_none_description(): """description=None 兜底为空串(DS 不接受 null)。""" body = build_workflow_body('wf', None, [], []) assert body['description'] == '' def test_workflow_body_overrides(): body = build_workflow_body('wf', 'd', [], [], execution_type='PARALLEL', timeout=3600, global_params='[{"prop":"x"}]') assert body['executionType'] == 'PARALLEL' assert body['timeout'] == 3600 assert body['globalParams'] == '[{"prop":"x"}]' def test_workflow_body_chinese_no_ascii_escape(): """中文 task 不应被 ensure_ascii 编码成 \\uXXXX。""" tasks = [{'name': '订单同步', 'rawScript': 'echo 你好'}] body = build_workflow_body('wf', '描述', tasks, []) assert '订单同步' in body['taskDefinitionJson'] assert '你好' in body['taskDefinitionJson'] # --- build_schedule_body --- def test_schedule_body_minimal(): body = build_schedule_body(123456, '0 0 6 * * ? *') assert body['workflowDefinitionCode'] == 123456 inner = json.loads(body['schedule']) assert inner['crontab'] == '0 0 6 * * ? *' assert inner['startTime'] == '2026-01-01 00:00:00' assert inner['endTime'] == '2126-01-01 00:00:00' assert inner['timezoneId'] == 'Asia/Shanghai' # 项目惯例默认值 assert body['workerGroup'] == 'd1~d3' assert body['tenantCode'] == 'bigdata' assert body['failureStrategy'] == 'END' assert body['warningType'] == 'NONE' assert body['warningGroupId'] == 0 assert body['workflowInstancePriority'] == 'MEDIUM' assert body['environmentCode'] == -1 def test_schedule_body_overrides(): body = build_schedule_body( 123, '0 0 8 * * ? *', start_time='2026-05-01 00:00:00', end_time='2027-01-01 00:00:00', timezone_id='UTC', worker_group='wg2', tenant='analytics', failure_strategy='CONTINUE', warning_type='EMAIL', warning_group_id=5, priority='HIGH', environment_code=10, ) inner = json.loads(body['schedule']) assert inner['startTime'] == '2026-05-01 00:00:00' assert inner['endTime'] == '2027-01-01 00:00:00' assert inner['timezoneId'] == 'UTC' assert body['workerGroup'] == 'wg2' assert body['tenantCode'] == 'analytics' assert body['failureStrategy'] == 'CONTINUE' assert body['warningType'] == 'EMAIL' assert body['warningGroupId'] == 5 assert body['workflowInstancePriority'] == 'HIGH' assert body['environmentCode'] == 10