| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- # -*- coding:utf-8 -*-
- import json
- from dw_base.ds.builders import (
- SHELL_TASK_DEFAULTS,
- build_linear_relations,
- build_schedule_body,
- build_shell_task,
- build_workflow_body,
- )
- # --- build_shell_task ---
def test_shell_task_minimal():
    """Minimal call (name + raw_script only) yields a fully defaulted task."""
    task = build_shell_task('first', 'echo hi')
    assert task['name'] == 'first'

    params = task['taskParams']
    assert params['rawScript'] == 'echo hi'
    assert params['localParams'] == []
    assert params['resourceList'] == []

    # Project-convention defaults.
    expected_defaults = {
        'taskType': 'SHELL',
        'workerGroup': 'd1~d3',
        'failRetryTimes': 0,
        'timeoutFlag': 'CLOSE',
        'environmentCode': -1,
        'taskExecuteType': 'BATCH',
    }
    for key, expected in expected_defaults.items():
        assert task[key] == expected

    # code / version are not written into the dict when they are not passed.
    for absent_key in ('code', 'version'):
        assert absent_key not in task
def test_shell_task_with_file_param():
    """file_param is injected into localParams."""
    task = build_shell_task('foo', 'echo ${file}', file_param='jobs/raw/x.ini')
    expected_param = {
        'prop': 'file',
        'direct': 'IN',
        'type': 'VARCHAR',
        'value': 'jobs/raw/x.ini',
    }
    assert task['taskParams']['localParams'] == [expected_param]
def test_shell_task_with_code_and_version():
    """code and version (required in the PUT scenario) are written through."""
    identity = {'code': 171234567890, 'version': 2}
    task = build_shell_task('foo', 'echo', **identity)
    for key, expected in identity.items():
        assert task[key] == expected
def test_shell_task_overrides_default():
    """Keyword overrides replace defaults (e.g. workerGroup or timeout)."""
    overrides = {
        'workerGroup': 'custom-group',
        'timeout': 600,
        'failRetryTimes': 3,
    }
    task = build_shell_task('foo', 'echo', **overrides)
    for key, expected in overrides.items():
        assert task[key] == expected
def test_shell_task_overrides_does_not_mutate_defaults():
    """Repeated calls must not share a dict reference that pollutes SHELL_TASK_DEFAULTS."""
    default_group = 'd1~d3'
    overridden = build_shell_task('a', 'echo a', workerGroup='g1')
    untouched = build_shell_task('b', 'echo b')

    assert overridden['workerGroup'] == 'g1'
    # A later call without overrides still sees the original default ...
    assert untouched['workerGroup'] == default_group
    # ... and so does the module-level defaults mapping itself.
    assert SHELL_TASK_DEFAULTS['workerGroup'] == default_group
- # --- build_linear_relations ---
def test_relations_empty():
    """An empty task list produces an empty relation list."""
    relations = build_linear_relations([])
    assert relations == []
def test_relations_single_task():
    """A single task yields exactly one entry relation (preTaskCode=0)."""
    relations = build_linear_relations([(100, 1)])
    assert len(relations) == 1

    entry = relations[0]
    expected_entry = {
        'preTaskCode': 0,
        'preTaskVersion': 0,
        'postTaskCode': 100,
        'postTaskVersion': 1,
        'conditionType': 'NONE',
    }
    for field, expected in expected_entry.items():
        assert entry[field] == expected
def test_relations_chain_two():
    """Two tasks yield two relations: the entry plus one chaining link."""
    relations = build_linear_relations([(100, 1), (200, 1)])
    assert len(relations) == 2

    entry = relations[0]
    assert entry['postTaskCode'] == 100

    link = relations[1]
    expected_link = {
        'preTaskCode': 100,
        'preTaskVersion': 1,
        'postTaskCode': 200,
        'postTaskVersion': 1,
    }
    for field, expected in expected_link.items():
        assert link[field] == expected
def test_relations_chain_three_with_real_versions():
    """A three-task chain passes each task's own version through unchanged."""
    relations = build_linear_relations([(100, 3), (200, 2), (300, 1)])

    def column(field):
        # Collect one field across all relations, in order.
        return [relation[field] for relation in relations]

    assert column('preTaskCode') == [0, 100, 200]
    assert column('postTaskCode') == [100, 200, 300]
    assert column('preTaskVersion') == [0, 3, 2]
    assert column('postTaskVersion') == [3, 2, 1]
- # --- build_workflow_body ---
def test_workflow_body_basic():
    """Basic workflow body: scalar defaults plus JSON-encoded tasks/relations."""
    task_defs = [{'code': 100, 'name': 'first'}]
    relations = [{'postTaskCode': 100}]
    body = build_workflow_body('wf1', 'desc', task_defs, relations)

    expected_scalars = {
        'name': 'wf1',
        'description': 'desc',
        'globalParams': '[]',
        'locations': '[]',
        'timeout': 0,
        'executionType': 'SERIAL_WAIT',
    }
    for key, expected in expected_scalars.items():
        assert body[key] == expected

    # tasks / relations are serialized as JSON strings.
    decoded_tasks = json.loads(body['taskDefinitionJson'])
    assert decoded_tasks == task_defs
    decoded_relations = json.loads(body['taskRelationJson'])
    assert decoded_relations == relations
def test_workflow_body_with_locations():
    """A non-empty locations list is serialized into the body."""
    layout = [
        {'taskCode': 100, 'x': 380, 'y': 205},
        {'taskCode': 200, 'x': 680, 'y': 205},
    ]
    body = build_workflow_body('wf', '', [], [], locations=layout)
    decoded = json.loads(body['locations'])
    assert decoded == layout
def test_workflow_body_none_description():
    """description=None falls back to an empty string (DS does not accept null)."""
    result = build_workflow_body('wf', None, [], [])
    assert result['description'] == ''
def test_workflow_body_overrides():
    """execution_type, timeout and global_params overrides reach the body."""
    raw_global_params = '[{"prop":"x"}]'
    body = build_workflow_body(
        'wf', 'd', [], [],
        execution_type='PARALLEL',
        timeout=3600,
        global_params=raw_global_params,
    )
    expected = {
        'executionType': 'PARALLEL',
        'timeout': 3600,
        'globalParams': raw_global_params,
    }
    for key, value in expected.items():
        assert body[key] == value
def test_workflow_body_chinese_no_ascii_escape():
    """Chinese task text must not be escaped to \\uXXXX by ensure_ascii."""
    task_defs = [{'name': '订单同步', 'rawScript': 'echo 你好'}]
    body = build_workflow_body('wf', '描述', task_defs, [])
    serialized = body['taskDefinitionJson']
    # Both fragments must survive serialization as literal characters.
    for fragment in ('订单同步', '你好'):
        assert fragment in serialized
- # --- build_schedule_body ---
def test_schedule_body_minimal():
    """Workflow code + crontab only: every other schedule field is defaulted."""
    cron = '0 0 6 * * ? *'
    body = build_schedule_body(123456, cron)
    assert body['workflowDefinitionCode'] == 123456

    # The nested schedule is carried as a JSON string.
    schedule = json.loads(body['schedule'])
    expected_schedule = {
        'crontab': cron,
        'startTime': '2026-01-01 00:00:00',
        'endTime': '2126-01-01 00:00:00',
        'timezoneId': 'Asia/Shanghai',
    }
    for key, expected in expected_schedule.items():
        assert schedule[key] == expected

    # Project-convention defaults.
    expected_defaults = {
        'workerGroup': 'd1~d3',
        'tenantCode': 'bigdata',
        'failureStrategy': 'END',
        'warningType': 'NONE',
        'warningGroupId': 0,
        'workflowInstancePriority': 'MEDIUM',
        'environmentCode': -1,
    }
    for key, expected in expected_defaults.items():
        assert body[key] == expected
def test_schedule_body_overrides():
    """Each keyword override is reflected in the resulting schedule body."""
    body = build_schedule_body(
        123,
        '0 0 8 * * ? *',
        start_time='2026-05-01 00:00:00',
        end_time='2027-01-01 00:00:00',
        timezone_id='UTC',
        worker_group='wg2',
        tenant='analytics',
        failure_strategy='CONTINUE',
        warning_type='EMAIL',
        warning_group_id=5,
        priority='HIGH',
        environment_code=10,
    )

    # Time window and timezone live inside the JSON-encoded schedule.
    schedule = json.loads(body['schedule'])
    expected_schedule = {
        'startTime': '2026-05-01 00:00:00',
        'endTime': '2027-01-01 00:00:00',
        'timezoneId': 'UTC',
    }
    for key, expected in expected_schedule.items():
        assert schedule[key] == expected

    # The remaining overrides are top-level body fields.
    expected_body = {
        'workerGroup': 'wg2',
        'tenantCode': 'analytics',
        'failureStrategy': 'CONTINUE',
        'warningType': 'EMAIL',
        'warningGroupId': 5,
        'workflowInstancePriority': 'HIGH',
        'environmentCode': 10,
    }
    for key, expected in expected_body.items():
        assert body[key] == expected
|