test_builders.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. # -*- coding:utf-8 -*-
  2. import json
  3. from dw_base.ds.builders import (
  4. SHELL_TASK_DEFAULTS,
  5. build_linear_relations,
  6. build_schedule_body,
  7. build_shell_task,
  8. build_workflow_body,
  9. )
  10. # --- build_shell_task ---
  11. def test_shell_task_minimal():
  12. """name + raw_script 最小调用,默认值齐全。"""
  13. t = build_shell_task('first', 'echo hi')
  14. assert t['name'] == 'first'
  15. assert t['taskParams']['rawScript'] == 'echo hi'
  16. assert t['taskParams']['localParams'] == []
  17. assert t['taskParams']['resourceList'] == []
  18. # 项目惯例默认值
  19. assert t['taskType'] == 'SHELL'
  20. assert t['workerGroup'] == 'd1~d3'
  21. assert t['failRetryTimes'] == 0
  22. assert t['timeoutFlag'] == 'CLOSE'
  23. assert t['environmentCode'] == -1
  24. assert t['taskExecuteType'] == 'BATCH'
  25. # 不传 code / version 则不写入字典
  26. assert 'code' not in t
  27. assert 'version' not in t
  28. def test_shell_task_with_file_param():
  29. """file_param 注入 localParams。"""
  30. t = build_shell_task('foo', 'echo ${file}', file_param='jobs/raw/x.ini')
  31. assert t['taskParams']['localParams'] == [
  32. {'prop': 'file', 'direct': 'IN', 'type': 'VARCHAR',
  33. 'value': 'jobs/raw/x.ini'}
  34. ]
  35. def test_shell_task_with_code_and_version():
  36. """code 和 version 是 PUT 场景必传。"""
  37. t = build_shell_task('foo', 'echo', code=171234567890, version=2)
  38. assert t['code'] == 171234567890
  39. assert t['version'] == 2
  40. def test_shell_task_overrides_default():
  41. """overrides 覆盖默认值(如改 workerGroup 或 timeout)。"""
  42. t = build_shell_task('foo', 'echo',
  43. workerGroup='custom-group',
  44. timeout=600,
  45. failRetryTimes=3)
  46. assert t['workerGroup'] == 'custom-group'
  47. assert t['timeout'] == 600
  48. assert t['failRetryTimes'] == 3
  49. def test_shell_task_overrides_does_not_mutate_defaults():
  50. """重复调用不应共享 dict 引用污染 SHELL_TASK_DEFAULTS。"""
  51. t1 = build_shell_task('a', 'echo a', workerGroup='g1')
  52. t2 = build_shell_task('b', 'echo b')
  53. assert t1['workerGroup'] == 'g1'
  54. assert t2['workerGroup'] == 'd1~d3'
  55. assert SHELL_TASK_DEFAULTS['workerGroup'] == 'd1~d3'
  56. # --- build_linear_relations ---
  57. def test_relations_empty():
  58. assert build_linear_relations([]) == []
  59. def test_relations_single_task():
  60. """单 task = 1 条入口 relation(preTaskCode=0)。"""
  61. rels = build_linear_relations([(100, 1)])
  62. assert len(rels) == 1
  63. assert rels[0]['preTaskCode'] == 0
  64. assert rels[0]['preTaskVersion'] == 0
  65. assert rels[0]['postTaskCode'] == 100
  66. assert rels[0]['postTaskVersion'] == 1
  67. assert rels[0]['conditionType'] == 'NONE'
  68. def test_relations_chain_two():
  69. """双 task = 入口 + 串接 = 2 条 relation。"""
  70. rels = build_linear_relations([(100, 1), (200, 1)])
  71. assert len(rels) == 2
  72. assert rels[0]['postTaskCode'] == 100
  73. assert rels[1]['preTaskCode'] == 100
  74. assert rels[1]['preTaskVersion'] == 1
  75. assert rels[1]['postTaskCode'] == 200
  76. assert rels[1]['postTaskVersion'] == 1
  77. def test_relations_chain_three_with_real_versions():
  78. """三 task 链 + 不同 version 透传。"""
  79. rels = build_linear_relations([(100, 3), (200, 2), (300, 1)])
  80. assert [r['preTaskCode'] for r in rels] == [0, 100, 200]
  81. assert [r['postTaskCode'] for r in rels] == [100, 200, 300]
  82. assert [r['preTaskVersion'] for r in rels] == [0, 3, 2]
  83. assert [r['postTaskVersion'] for r in rels] == [3, 2, 1]
  84. # --- build_workflow_body ---
  85. def test_workflow_body_basic():
  86. tasks = [{'code': 100, 'name': 'first'}]
  87. rels = [{'postTaskCode': 100}]
  88. body = build_workflow_body('wf1', 'desc', tasks, rels)
  89. assert body['name'] == 'wf1'
  90. assert body['description'] == 'desc'
  91. assert body['globalParams'] == '[]'
  92. assert body['locations'] == '[]'
  93. assert body['timeout'] == 0
  94. assert body['executionType'] == 'SERIAL_WAIT'
  95. # tasks / relations 序列化为 JSON 字符串
  96. assert json.loads(body['taskDefinitionJson']) == tasks
  97. assert json.loads(body['taskRelationJson']) == rels
  98. def test_workflow_body_with_locations():
  99. """locations 非空 list 序列化进 body。"""
  100. locs = [{'taskCode': 100, 'x': 380, 'y': 205},
  101. {'taskCode': 200, 'x': 680, 'y': 205}]
  102. body = build_workflow_body('wf', '', [], [], locations=locs)
  103. assert json.loads(body['locations']) == locs
  104. def test_workflow_body_none_description():
  105. """description=None 兜底为空串(DS 不接受 null)。"""
  106. body = build_workflow_body('wf', None, [], [])
  107. assert body['description'] == ''
  108. def test_workflow_body_overrides():
  109. body = build_workflow_body('wf', 'd', [], [],
  110. execution_type='PARALLEL',
  111. timeout=3600,
  112. global_params='[{"prop":"x"}]')
  113. assert body['executionType'] == 'PARALLEL'
  114. assert body['timeout'] == 3600
  115. assert body['globalParams'] == '[{"prop":"x"}]'
  116. def test_workflow_body_chinese_no_ascii_escape():
  117. """中文 task 不应被 ensure_ascii 编码成 \\uXXXX。"""
  118. tasks = [{'name': '订单同步', 'rawScript': 'echo 你好'}]
  119. body = build_workflow_body('wf', '描述', tasks, [])
  120. assert '订单同步' in body['taskDefinitionJson']
  121. assert '你好' in body['taskDefinitionJson']
  122. # --- build_schedule_body ---
  123. def test_schedule_body_minimal():
  124. body = build_schedule_body(123456, '0 0 6 * * ? *')
  125. assert body['workflowDefinitionCode'] == 123456
  126. inner = json.loads(body['schedule'])
  127. assert inner['crontab'] == '0 0 6 * * ? *'
  128. assert inner['startTime'] == '2026-01-01 00:00:00'
  129. assert inner['endTime'] == '2126-01-01 00:00:00'
  130. assert inner['timezoneId'] == 'Asia/Shanghai'
  131. # 项目惯例默认值
  132. assert body['workerGroup'] == 'd1~d3'
  133. assert body['tenantCode'] == 'bigdata'
  134. assert body['failureStrategy'] == 'END'
  135. assert body['warningType'] == 'NONE'
  136. assert body['warningGroupId'] == 0
  137. assert body['workflowInstancePriority'] == 'MEDIUM'
  138. assert body['environmentCode'] == -1
  139. def test_schedule_body_overrides():
  140. body = build_schedule_body(
  141. 123, '0 0 8 * * ? *',
  142. start_time='2026-05-01 00:00:00',
  143. end_time='2027-01-01 00:00:00',
  144. timezone_id='UTC',
  145. worker_group='wg2',
  146. tenant='analytics',
  147. failure_strategy='CONTINUE',
  148. warning_type='EMAIL',
  149. warning_group_id=5,
  150. priority='HIGH',
  151. environment_code=10,
  152. )
  153. inner = json.loads(body['schedule'])
  154. assert inner['startTime'] == '2026-05-01 00:00:00'
  155. assert inner['endTime'] == '2027-01-01 00:00:00'
  156. assert inner['timezoneId'] == 'UTC'
  157. assert body['workerGroup'] == 'wg2'
  158. assert body['tenantCode'] == 'analytics'
  159. assert body['failureStrategy'] == 'CONTINUE'
  160. assert body['warningType'] == 'EMAIL'
  161. assert body['warningGroupId'] == 5
  162. assert body['workflowInstancePriority'] == 'HIGH'
  163. assert body['environmentCode'] == 10