Kaynağa Gözat

chore(bin): 删老 DataX 入口 7 文件 + kb 收官

删除:
- bin/datax-{single,multiple,multiple-hive}-job-starter.{sh,py}
- bin/datax-job-config-generator.py
新入口 bin/datax-{hive-import,hdfs-export}-starter.{sh,py} 全面接管
runner 已走 dw_base.datax.cli gen-json,不再依赖 bin shim
kb/90 §2.6 标 ✅(retention 规则保留上下文)
kb/91 §4.3 老脚本使用说明整段删、保留 §4.4 老分发档案给 kb/93 ADR-02 做背景
kb/92 changelog 追加批次 5 收官条目

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 hafta önce
ebeveyn
işleme
f6e26ea621

+ 0 - 142
bin/datax-job-config-generator.py

@@ -1,142 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-"""
-  读取定义在`conf/datax/config/${源类型}-${目标类型}/${项目}-${分层}-${Hive环境}[/数据库环境[/数据分组]]/${源类型}-${目标类型}-${源库名称}-${源表名称}.ini`中的配置,
-  以及在上述配置中定义、存储于`conf/datax/datasource/${ds-type}/${project}-${layer}-${env}`的`${ds-type}-${ds-name}.ini`中的
-  `reader`和`writer`,生成DataX作业配置文件。
-  若未提供`-output`来指定路径存储生成的DataX作业配置文件,则会默认将生成的DataX作业配置文件存储于`conf/datax/generated`中。
-"""
-import os
-import sys
-
-project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(project_root_dir)
-from dw_base import DO_RESET, NORM_CYN, NORM_GRN, NORM_MGT, NORM_RED, NORM_YEL
-from dw_base.common.config_constants import K_CONFIG_FILE, K_DIRECTORY, KL_HELP
-from dw_base.datax.job_config_generator import JobConfigGenerator
-from dw_base.utils.common_utils import exist
-from dw_base.utils.config_utils import parse_args
-from dw_base.utils.datetime_utils import get_yesterday, get_today
-from dw_base.utils.file_utils import list_files, get_abs_path
-from dw_base.utils.log_utils import pretty_print
-
-
-def usage(code: int):
-    print(
-        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
-        f'{NORM_CYN}\t[-h/-H/--h/--H/--help]           打印脚本使用方法{DO_RESET}'
-    )
-    print(
-        f'{NORM_MGT}Usage: {sys.argv[0]}\n'
-        f'{NORM_GRN}\t<[-]-c< /=>job config>           DataX作业配置生成器配置文件路径,可以多次传入-c/--c或以逗号分隔的形式\n'
-        f'{NORM_GRN}\t                                 传入多个,传多个时参数start-date和stop-date共同使用\n'
-        f'{NORM_GRN}\t<[-]-d< /=>job config directory> 扫描指定路径下所有DataX作业配置生成器配置\n'
-        f'{NORM_GRN}\t                                 可以多次传入-d/--d或以逗号分隔的形式传入多个,优先级低于-c/--c\n'
-        f'{NORM_GRN}\t<[-]-r>                          递归扫描指定路径下所有DataX作业配置生成器配置,与-d/--d配合使用\n'
-        f'{NORM_CYN}\t[[-]-start-date< /=>start date]  yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n'
-        f'{NORM_CYN}\t[[-]-stop-date< /=>stop date]    yyyyMMdd[-/-yyyyMMdd]格式表达的日期(或日期范围)\n'
-        f'{NORM_CYN}\t[[-]-o< /=>output path]          DataX作业配置输出文件夹(绝对路径)'
-        f'{DO_RESET}'
-    )
-    exit(code)
-
-
-def collect_generate_config_files():
-    generator_config_files = set()
-    if CONFIG.__contains__(K_CONFIG_FILE):
-        # 传递了`DataX作业配置生成器配置.ini文件`
-        generator_config = CONFIG.get(K_CONFIG_FILE)
-        pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件')
-        if isinstance(generator_config, list):
-            # 以列表形式提供的
-            for c in generator_config:
-                generator_config_files.add(get_abs_path(c, check_exist=False))
-        else:
-            # 以逗号分隔形式提供的
-            for c in generator_config.split(','):
-                generator_config_files.add(get_abs_path(c, check_exist=False))
-    elif CONFIG.__contains__(K_DIRECTORY):
-        # 传递了含有`DataX作业配置生成器配置.ini文件`的目录
-        pretty_print(f'{NORM_MGT}使用“DataX作业配置文件生成器”配置文件目录')
-        recursive = CONFIG.get('r', False)
-        generator_config_dirs = []
-        if isinstance(CONFIG.get(K_DIRECTORY), str):
-            dir_conf = CONFIG.get(K_DIRECTORY).split(',')  # type:list
-            # 以逗号分隔的多个目录
-        else:
-            # 目录列表
-            dir_conf = CONFIG.get(K_DIRECTORY)  # type:list
-        for d in dir_conf:
-            d = get_abs_path(d, check_exist=False)
-            if not os.path.exists(d):
-                pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 不存在')
-                raise FileNotFoundError(d)
-            if not os.path.isdir(d):
-                pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置文件目录 {NORM_GRN}{d}{NORM_YEL} 存在,但不是目录')
-                raise NotADirectoryError(d)
-            generator_config_dirs.append(d)
-        for each_dir in generator_config_dirs:
-            # 递归处理目录
-            files = list_files(each_dir, recursive)
-            for file in files:
-                generator_config_files.add(file)
-    return generator_config_files
-
-
-if __name__ == '__main__':
-    pretty_print(f'{NORM_MGT}{sys.argv[0]} 收到参数:{NORM_GRN}{" ".join(sys.argv[1:])}')
-    CONFIG, _ = parse_args(sys.argv[1:])
-    # 未提供任何参数或查看帮助
-    if len(sys.argv) == 1 or exist(CONFIG, KL_HELP):
-        usage(0)
-    # DataX作业配置生成器配置文件列表
-    generator_config_files = collect_generate_config_files()
-    if len(generator_config_files) == 0:
-        pretty_print(f'{NORM_RED}未找到任何有效的“DataX作业配置文件生成器”配置文件')
-        exit(1)
-    # 检查所有文件是否都存在
-    for gcf in generator_config_files:
-        short_name = gcf.replace(f'{project_root_dir}/', '')
-        try:
-            if not os.path.exists(gcf):
-                pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 不存在')
-                raise FileNotFoundError(gcf)
-            elif not os.path.isfile(gcf):
-                pretty_print(f'{NORM_YEL}“DataX作业配置文件生成器”配置 {NORM_GRN}{gcf}{NORM_YEL} 存在,但不是文件')
-                raise IsADirectoryError(gcf)
-        except Exception as e:
-            pretty_print(f'使用配置文件 {short_name} 生成DataX作业配置文件(.json)失败')
-            raise e
-    # 开始生成`DataX作业配置文件`
-    start_date = CONFIG.get('start-date', get_yesterday())
-    stop_date = CONFIG.get('stop-date', get_today())
-    for gcf in generator_config_files:
-        # gcf应形如:${project_base_dir}/conf/datax/config/${src-type}-${dst-type}/${project}-${layer}-${env}/${src-type}-${dst-type}-${src-name}.ini
-        # ${project}-${layer}-${env}
-        temp = os.path.dirname(gcf).replace(f'{project_root_dir}/', '').replace(f'conf/datax/config/', '').split('/')
-        src_dst = temp[0]
-        if len(temp) > 1:
-            project_layer_env = temp[1]
-        else:
-            project_layer_env = 'default'
-        # project_layer_env = os.path.basename(path.dirname(gcf))
-        # src_dst = os.path.basename(path.dirname(path.dirname(gcf)))
-        # 默认输出(绝对)路径
-        default_output_dir = f'{project_root_dir}/conf/datax/generated'
-        job_config_name = os.path.basename(gcf).replace('.ini', '.json')
-
-        # 指定的输出路径,可以是绝对路径,也可以是相对(项目根目录)的路径
-        output_path_arr = ([CONFIG.get("o", default_output_dir), src_dst, project_layer_env] + temp[2:]
-                           + [job_config_name])
-        output = '/'.join(output_path_arr)
-
-        os.system(f'mkdir -p {os.path.dirname(output)}')
-        try:
-            pretty_print(f'{NORM_MGT}开始使用 {NORM_GRN}{gcf}{NORM_MGT} 生成DataX作业配置文件')
-            job_config_generator = JobConfigGenerator(project_root_dir, gcf, start_date, stop_date, output)
-            job_config_generator.run()
-            pretty_print(f'{NORM_MGT}DataX作业配置文件 {NORM_GRN}{output}{NORM_MGT} 生成成功')
-        except Exception as e:
-            pretty_print(f'使用配置文件 {gcf} 生成DataX作业配置文件(.json)失败')
-            pretty_print(f'{NORM_MGT}使用配置文件 ${NORM_GRN}{gcf} {NORM_MGT}生成DataX作业配置文件失败')
-            raise e

+ 0 - 13
bin/datax-multiple-hive-job-starter.py

@@ -1,13 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-"""
-  Note:为方便本地调试设计,请勿在调度中使用
-"""
-import os
-import sys
-
-project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(project_root_dir)
-
-if __name__ == '__main__':
-    os.system(f'{project_root_dir}/bin/datax-multiple-hive-job-starter.sh {" ".join(sys.argv[1:])}')

+ 0 - 262
bin/datax-multiple-hive-job-starter.sh

@@ -1,262 +0,0 @@
-#!/bin/bash
-#--------------------------------------------------------------------------------------------------
-# 分布式并行启动多个DataX MySQL-Hive作业
-# 1. 可以同时通过4种方式来指定作业,但作业中是否有重复的配置,需要开发者来判断
-# 2. 可以传递--override来覆盖脚本内的所有配置,重新传递要执行的作业(方便单独跑失败的作业)
-# 3. 运行模式:本机串行、随机串行(意义不大)、本机并行(默认模式)、随机并行
-#--------------------------------------------------------------------------------------------------
-set -e
-BASE_DIR=$(
-  cd "$(dirname "$(realpath "$0")")/.." || exit
-  pwd
-)
-. "${BASE_DIR}"/bin/common/init.sh
-function usage() {
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_CYN}\t[-h/-H/--h/--H/--help]                打印脚本使用方法${DO_RESET}"
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_CYN}\t[--override]                          如果出现override,则只执行传入的配置,文件里定义的配置被忽略
-  ${NORM_CYN}\t[-t< /=>table]                        需要建分区的表
-  ${NORM_CYN}\t[-jc< /=>job config]                  DataX作业配置文件(json)
-  ${NORM_CYN}\t[-jcd< /=>job config directory]       DataX作业配置文件(json)夹
-  ${NORM_CYN}\t[-gc< /=>generator config]            DataX作业配置文件生成器的配置文件(ini)
-  ${NORM_CYN}\t[-gcd< /=>generator config directory] DataX作业配置文件生成器的配置文件(ini)夹
-  ${NORM_CYN}\t[-start-date< /=>start date]          开始日期(用以筛选数据)
-  ${NORM_CYN}\t[-stop-date< /=>stop date]            结束日期(用以筛选数据)
-  ${NORM_CYN}\t[-skip-add-partition]                 跳过添加分区
-  ${NORM_CYN}\t[-skip-datax]                         跳过DataX导出作业
-  ${NORM_CYN}\t[-random]                             随机选择Worker(默认本机执行,易造成压力大)
-  ${NORM_CYN}\t[-parallel]                           并行执行(默认串行)
-  ${DO_RESET}"
-  exit "$1"
-}
-
-function parse_args() {
-  for index in $(seq 1 $#); do
-    arg=${*:index:1}
-    case $arg in
-    --override)
-      partitioned_tables=()
-      job_config_array=()
-      job_config_directory_array=()
-      generator_config_array=()
-      generator_config_directory_array=()
-      ;;
-    *) ;;
-    esac
-  done
-
-  for index in $(seq 1 $#); do
-    arg=${*:index:1}
-    case $arg in
-    -t)
-      index=$((index + 1))
-      TABLE="${*:index:1}"
-      partitioned_tables+=("${TABLE}")
-      ;;
-    -t=*)
-      TABLE="${arg#*=}"
-      partitioned_tables+=("${TABLE}")
-      ;;
-    -jc)
-      index=$((index + 1))
-      JOB_CONFIG="${*:index:1}"
-      job_config_array+=("${JOB_CONFIG}")
-      ;;
-    -jc=*)
-      JOB_CONFIG="${arg#*=}"
-      job_config_array+=("${JOB_CONFIG}")
-      ;;
-    -jcd)
-      index=$((index + 1))
-      JCD="${*:index:1}"
-      job_config_directory_array+=("${JCD}")
-      ;;
-    -jcd=*)
-      TABLE="${arg#*=}"
-      job_config_directory_array+=("${JCD}")
-      ;;
-    -gc)
-      index=$((index + 1))
-      GC="${*:index:1}"
-      generator_config_array+=("${GC}")
-      ;;
-    -gc=*)
-      GC="${arg#*=}"
-      generator_config_array+=("${GC}")
-      ;;
-    -gcd)
-      index=$((index + 1))
-      GCD="${*:index:1}"
-      generator_config_directory_array+=("${GCD}")
-      ;;
-    -gcd=*)
-      GCD="${arg#*=}"
-      generator_config_directory_array+=("${GCD}")
-      ;;
-    -start-date)
-      index=$((index + 1))
-      START_DATE="${*:index:1}"
-      ;;
-    -start-date=*)
-      START_DATE="${arg#*=}"
-      ;;
-    -stop-date)
-      index=$((index + 1))
-      STOP_DATE="${*:index:1}"
-      ;;
-    -stop-date=*)
-      STOP_DATE="${arg#*=}"
-      ;;
-    -skip-add-partition)
-      SKIP_ADD_PARTITION="true"
-      ;;
-    -skip-datax)
-      DEFAULT_ARGS+=("-skip-datax")
-      ;;
-    -random)
-      DEFAULT_ARGS+=("-random")
-      ;;
-    -parallel)
-      DEFAULT_ARGS+=("-parallel")
-      ;;
-    -h | -H | --h | --H | --help)
-      usage 0
-      ;;
-    *) ;;
-    esac
-  done
-  pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
-}
-
-function parse_ddl() {
-  generator_config="${1}"
-  if [ ! -f "${generator_config}" ]; then
-    generator_config_path="${BASE_DIR}/${generator_config}"
-  else
-    generator_config_path="${generator_config}"
-  fi
-  if [ ! -f "${generator_config_path}" ]; then
-    # 没有找到配置文件
-    DDL=""
-    return
-  fi
-  path=$(grep "path =" "${generator_config_path}")
-  if [ "$(echo "${path}" | grep -c "/dt=\${dt}")" -eq 0 ]; then
-    # 非分区表
-    DDL=""
-    return
-  fi
-  if [[ "${path}" =~ .*\.db.* ]]; then
-    hive_db_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $i; exit } }' | cut -d '.' -f1)
-    hive_table_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $(i+1); exit } }')
-  else
-    hive_db_name="tmp"
-    hive_table_name=$(echo "${path}" | cut -d '/' -f5)
-  fi
-  DDL="ALTER TABLE ${hive_db_name}.${hive_table_name} ADD IF NOT EXISTS PARTITION(dt=${START_DATE});"
-}
-
-partitioned_tables=(
-  # 示例:`project`_`layer`.`layer`_`project`_`mysql-table-name`
-)
-# DataX mysql-hive配置文件(json)
-job_config_array=(
-  # 示例:conf/datax/generated/mysql-hive-`mysql-db-name`-`mysql-table-name`.json
-)
-job_config_directory_array=(
-  # 示例:conf/datax/generated
-)
-# DataX作业配置生成器的配置文件
-generator_config_array=(
-  # 示例:conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hive-`mysql-db-name`-`mysql-table-name`.ini
-  #  conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-activity_labels.ini
-  #  conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-ar_internal_metadata.ini
-)
-generator_config_directory_array=(
-  # 示例:conf/datax/config/mysql-hdfs/`project`_`layer`/
-  #  conf/datax/config/mysql-hdfs/bms_ods
-  #  conf/datax/config/mysql-hdfs/bms_ods_test
-  #  conf/datax/config/mysql-hdfs/crm_ods_dl
-  #  conf/datax/config/mysql-hdfs/jqr_ods
-  #  conf/datax/config/mysql-hdfs/skb_ods
-)
-DEFAULT_ARGS=()
-parse_args "${@}"
-if [ "$(uname)" = "Linux" ]; then
-  YESTERDAY=$(date -d '-1 day' +%Y%m%d)
-  TODAY=$(date +%Y%m%d)
-else
-  YESTERDAY=$(date -v-1d +%Y%m%d)
-  TODAY=$(date +%Y%m%d)
-fi
-if [ -z "${START_DATE}" ]; then
-  START_DATE=${YESTERDAY}
-fi
-if [ -z "${STOP_DATE}" ]; then
-  STOP_DATE=${TODAY}
-fi
-DEFAULT_ARGS+=("-start-date=${START_DATE}")
-DEFAULT_ARGS+=("-stop-date=${STOP_DATE}")
-HIVE_DDL=()
-# 显式声明的表
-for table in "${partitioned_tables[@]}"; do
-  HIVE_DDL+=("ALTER TABLE ${table} add partition(dt=${START_DATE});")
-done
-# 从DataX作业配置生成器配置文件名称中解析出Hive表名
-for generator_config in "${generator_config_array[@]}"; do
-  # 形如:conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
-  parse_ddl "${generator_config}"
-  if [ -n "${DDL}" ]; then
-    HIVE_DDL+=("${DDL}")
-  fi
-done
-# 从DataX作业配置生成器配置文件中解析出的表
-for generator_config_directory in "${generator_config_directory_array[@]}"; do
-  # 形如:conf/datax/config/mysql-hdfs/`project`_`layer`/
-  if [ ! -f "${generator_config_directory}" ]; then
-    generator_config_directory="${BASE_DIR}/${generator_config_directory}"
-  fi
-  pretty_print "${NORM_MGT}处理生成器配置文件目录 ${NORM_GRN}${generator_config_directory}"
-  for generator_config in "${generator_config_directory}"/*; do
-    # 形如:conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
-    parse_ddl "${generator_config}"
-    if [ -n "${DDL}" ]; then
-      HIVE_DDL+=("${DDL}")
-    fi
-  done
-done
-if [ -n "${SKIP_ADD_PARTITION}" ]; then
-  pretty_print "${NORM_YEL}跳过添加Hive分区(-skip-add-partition)"
-else
-  if [ ${#HIVE_DDL[@]} -eq 0 ]; then
-    pretty_print "${NORM_YEL}没有需要创建Hive新分区的表"
-  fi
-  for ddl in "${HIVE_DDL[@]}"; do
-    pretty_print "${NORM_MGT}创建Hive新分区:${NORM_GRN}${ddl}"
-  done
-  if [ "${#HIVE_DDL[@]}" -gt 0 ]; then
-    hive -e "${HIVE_DDL[*]}"
-  fi
-fi
-JOB_CONFIG=()
-for job_config in "${job_config_array[@]}"; do
-  JOB_CONFIG+=("-c=${job_config}")
-done
-GENERATOR_CONFIG=()
-for generator_config in "${generator_config_array[@]}"; do
-  GENERATOR_CONFIG+=("-gc=${generator_config}")
-done
-# 运行DataX作业配置文件列表中定义的作业
-if [ "${#JOB_CONFIG[@]}" -gt 0 ]; then
-  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${JOB_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
-fi
-for job_config_directory in "${job_config_directory_array[@]}"; do
-  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-cd=${job_config_directory}" "${DEFAULT_ARGS[@]}"
-done
-if [ "${#GENERATOR_CONFIG[@]}" -gt 0 ]; then
-  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${GENERATOR_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
-fi
-for generator_config_directory in "${generator_config_directory_array[@]}"; do
-  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-gcd=${generator_config_directory}" "${DEFAULT_ARGS[@]}"
-done

+ 0 - 13
bin/datax-multiple-job-starter.py

@@ -1,13 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-"""
-  Note:为方便本地调试设计,请勿在调度中使用
-"""
-import os
-import sys
-
-project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(project_root_dir)
-
-if __name__ == '__main__':
-    os.system(f'{project_root_dir}/bin/datax-multiple-job-starter.sh {" ".join(sys.argv[1:])}')

+ 0 - 264
bin/datax-multiple-job-starter.sh

@@ -1,264 +0,0 @@
-#!/bin/bash
-#--------------------------------------------------------------------------------------------------
-# 启动多个DataX作业
-# 1. 注意确定DataX Workers —— `DATAX_WORKERS`
-# 2. `multiple-job-starter`不会打印日志,请前往`/${LOG_ROOT_DIR}/data/log/datax/${START_DATE}`查看日志
-# 3. 配置调度使用本脚本时如果指定了`-random`选择随机worker,则调度节点必须选择`local-worker`
-#--------------------------------------------------------------------------------------------------
-#set -e
-BASE_DIR=$(
-  cd "$(dirname "$(realpath "$0")")/.." || exit
-  pwd
-)
-. "${BASE_DIR}"/bin/common/init.sh
-
-function usage() {
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_CYN}\t[-h/-H/--h/--H/--help]                打印脚本使用方法${DO_RESET}"
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_GRN}\t<-c< /=>job config>                   DataX作业配置文件(.json,绝对路径),支持多个,优先级1
-  ${NORM_GRN}\t<-cd< /=>job config directory>        DataX作业配置文件目录(.json文件夹,项目内相对路径或绝对路径),优先级2
-  ${NORM_GRN}\t<-gc< /=>generator config>            DataX作业配置生成器配置文件(.ini,相对路径),支持多个,优先级3
-  ${NORM_GRN}\t<-gcd< /=>generator config directory> DataX作业配置生成器配置文件目录(.ini文件夹,项目内相对路径或绝对路径),优先级4
-  ${NORM_CYN}\t[-start-date< /=>start date]          开始日期(用以筛选数据)
-  ${NORM_CYN}\t[-stop-date< /=>stop date]            结束日期(用以筛选数据)
-  ${NORM_CYN}\t[-host< /=>host]                      执行作业的主机,非${RELEASE_USER}用户或host和r都未指定则在当前机器执行,指定host优先于随机选择主机
-  ${NORM_CYN}\t[-random]                             随机选择Worker,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行
-  ${NORM_CYN}\t[-parallel]                           并行执行(默认串行)
-  ${NORM_CYN}\t[-skip-datax]                         跳过DataX导出作业
-  ${DO_RESET}"
-  exit "$1"
-}
-
-function parse_args() {
-  for index in $(seq 1 $#); do
-    arg=${*:index:1}
-    case $arg in
-    -c)
-      index=$((index + 1))
-      JOB_CONFIG_PATH+=("${*:index:1}")
-      ;;
-    -c=*)
-      JOB_CONFIG_PATH+=("${arg#*=}")
-      ;;
-    -cd)
-      index=$((index + 1))
-      JOB_CONFIG_DIR="${*:index:1}"
-      ;;
-    -cd=*)
-      JOB_CONFIG_DIR="${arg#*=}"
-      ;;
-    -gc)
-      index=$((index + 1))
-      GENERATOR_CONFIG_PATH+=("${*:index:1}")
-      ;;
-    -gc=*)
-      GENERATOR_CONFIG_PATH+=("${arg#*=}")
-      ;;
-    -gcd)
-      index=$((index + 1))
-      GENERATOR_CONFIG_DIR="${*:index:1}"
-      ;;
-    -gcd=*)
-      GENERATOR_CONFIG_DIR="${arg#*=}"
-      ;;
-    -start-date)
-      index=$((index + 1))
-      START_DATE="${*:index:1}"
-      ;;
-    -start-date=*)
-      START_DATE="${arg#*=}"
-      ;;
-    -stop-date)
-      index=$((index + 1))
-      STOP_DATE="${*:index:1}"
-      ;;
-    -stop-date=*)
-      STOP_DATE="${arg#*=}"
-      ;;
-    -host)
-      index=$((index + 1))
-      PASS_ON_ARGS+=("-host=${*:index:1}")
-      ;;
-    -host=*)
-      PASS_ON_ARGS+=("-host=${arg#*=}")
-      ;;
-    -random)
-      PASS_ON_ARGS+=("$arg")
-      ;;
-    -skip-datax)
-      PASS_ON_ARGS+=("$arg")
-      ;;
-    -parallel)
-      PARALLEL="true"
-      ;;
-    -h | -H | --h | --H | --help)
-      usage 0
-      ;;
-    *) ;;
-    esac
-  done
-  pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
-  if [ "${#JOB_CONFIG_PATH[@]}" -eq 0 ] && [ -z "${JOB_CONFIG_DIR}" ] && [ "${#GENERATOR_CONFIG_PATH[@]}" -eq 0 ] && [ -z "${GENERATOR_CONFIG_DIR}" ]; then
-    pretty_print "${NORM_RED}请至少提供 ${NORM_GRN}-c、-cd、-gc、-gcd ${NORM_RED}中的一个"
-    usage 1
-  fi
-}
-
-function prepare() {
-  if [ "$(uname)" = "Linux" ]; then
-    YESTERDAY=$(date -d '-1 day' +%Y%m%d)
-    TODAY=$(date +%Y%m%d)
-  else
-    YESTERDAY=$(date -v-1d +%Y%m%d)
-    TODAY=$(date +%Y%m%d)
-  fi
-  if [ -z "${START_DATE}" ]; then
-    START_DATE=${YESTERDAY}
-  fi
-  if [ -z "${STOP_DATE}" ]; then
-    STOP_DATE=${TODAY}
-  fi
-  PASS_ON_ARGS+=("-start-date=${START_DATE}")
-  PASS_ON_ARGS+=("-stop-date=${STOP_DATE}")
-  if [ "${#JOB_CONFIG_PATH[@]}" -gt 0 ]; then
-    # 传递了`作业配置文件(列表)`
-    for jc in "${JOB_CONFIG_PATH[@]}"; do
-      if [[ "${JC_GC_ARGS[*]}" =~ .*"${jc}".* ]]; then
-        pretty_print "${NORM_YEL}提供了重复的DataX作业配置文件:${NORM_GRN}${jc}"
-        continue
-      fi
-      JC_GC_ARGS+=("-c=${jc}")
-    done
-  elif [ -n "${JOB_CONFIG_DIR}" ]; then
-    # 传递了`作业配置文件目录`
-    if [ ! -d "${JOB_CONFIG_DIR}" ]; then
-      # 目录不存在(可能原因是未在根目录执行脚本,传递的目录一定要是`相对路径`才可以)
-      JCD="${BASE_DIR}/${JOB_CONFIG_DIR}"
-      if [ ! -d "${JCD}" ]; then
-        pretty_print "${NORM_RED}提供的DataX作业配置目录 ${NORM_GRN}${JOB_CONFIG_DIR} ${NORM_RED}不存在"
-        exit 1
-      fi
-      JOB_CONFIG_DIR="${JCD}"
-    fi
-    for jc in "${JOB_CONFIG_DIR}"/*; do
-      # 需要`jc`都是json文件
-      if [ -f "${jc}" ]; then
-        JC_GC_ARGS+=("-c=${jc}")
-      fi
-    done
-  elif [ "${#GENERATOR_CONFIG_PATH[@]}" -gt 0 ]; then
-    # 传递了`DataX作业配置生成器配置文件(列表)`
-    for gc in "${GENERATOR_CONFIG_PATH[@]}"; do
-      if [[ "${JC_GC_ARGS[*]}" =~ .*"${gc}".* ]]; then
-        pretty_print "${NORM_YEL}提供了重复的DataX作业配置生成器配置文件:${NORM_GRN}${gc}"
-        continue
-      fi
-      JC_GC_ARGS+=("-gc=${gc}")
-    done
-  else
-    # 传递了`DataX作业配置生成器配置文件目录`
-    if [ ! -d "${GENERATOR_CONFIG_DIR}" ]; then
-      # 目录不存在(可能原因是未在根目录执行脚本,传递的目录一定要是相对路径才可以)
-      GCD="${BASE_DIR}/${GENERATOR_CONFIG_DIR}"
-      if [ ! -d "${GCD}" ]; then
-        pretty_print "${NORM_RED}提供的DataX作业配置生成器配置文件目录 ${NORM_GRN}${GENERATOR_CONFIG_DIR} ${NORM_RED}不存在"
-        exit 1
-      fi
-      GENERATOR_CONFIG_DIR="${GCD}"
-    fi
-    for gc in "${GENERATOR_CONFIG_DIR}"/*; do
-      # 需要`gc`都是ini文件
-      if [ -f "${gc}" ]; then
-        JC_GC_ARGS+=("-gc=${gc}")
-      fi
-    done
-  fi
-}
-
-function run_multiple_datax_job() {
-  success_count=0
-  failure_count=0
-  export MULTIPLE="true"
-  for arg in "${JC_GC_ARGS[@]}"; do
-    TEMP=$(dirname "${arg#*=}")
-    TEMP=${TEMP#"${BASE_DIR}/"}
-    TEMP=${TEMP#"conf/datax/config/"}
-    TEMP=${TEMP#"conf/datax/generated/"}
-    SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1)
-    PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2)
-    DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3)
-    GROUP=$(echo "${TEMP}" | cut -d '/' -f4)
-    case ${arg} in
-    -c=*)
-      # -c=conf/datax/generated/from-to-database-table.json
-      JOB_NAME=$(basename "${arg#*=}" .json)
-      pretty_print "${NORM_MGT}使用DataX作业配置文件 ${NORM_GRN}${arg#*=} ${NORM_MGT}运行"
-      ;;
-    -gc=*)
-      # -gc=conf/datax/config/from-to/project_layer/from-to-database-table.ini
-      JOB_NAME=$(basename "${arg#*=}" .ini)
-      pretty_print "${NORM_MGT}使用DataX作业配置生成器配置文件 ${NORM_GRN}${arg#*=} ${NORM_MGT}运行"
-      ;;
-    *)
-      continue
-      ;;
-    esac
-    if [ "${USER}" == "${RELEASE_USER}" ]; then
-      if [ -n "${GROUP}" ]; then
-        LOG_RELATIVE_PATH="datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${START_DATE}"
-      elif [ -n "${DB_ENV}" ]; then
-        LOG_RELATIVE_PATH="datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${START_DATE}"
-      else
-        LOG_RELATIVE_PATH="datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${START_DATE}"
-      fi
-    else
-      if [ -n "${GROUP}" ]; then
-        LOG_RELATIVE_PATH="users/${USER}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${START_DATE}"
-      elif [ -n "${DB_ENV}" ]; then
-        LOG_RELATIVE_PATH="users/${USER}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${START_DATE}"
-      else
-        LOG_RELATIVE_PATH="users/${USER}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${START_DATE}"
-      fi
-    fi
-    LOG_DIR="${LOG_ROOT_DIR}/${LOG_RELATIVE_PATH}"
-    mkdir -p "${LOG_DIR}"
-    LOG_FILE_NAME="${START_DATE}-${JOB_NAME}.log"
-    LOG_FILE="${LOG_DIR}/${LOG_FILE_NAME}"
-    if [[ -n "${IS_RUN_BY_NORMAL_USER}" ]] || [ -z "${PARALLEL}" ] || [ "${CURRENT_HOST}" != "${RELEASE_HOST}" ]; then
-      # 普通用户或未指定并行或非发布主机
-      pretty_print "${NORM_MGT}日志将写入文件 ${NORM_GRN}${LOG_FILE} ${NORM_MGT}"
-      "${BASE_DIR}"/bin/datax-single-job-starter.sh "${PASS_ON_ARGS[@]}" "${arg}" | tee "${LOG_FILE}"
-      if [ "${PIPESTATUS[0]}" -eq 0 ]; then
-        success_count=$((success_count + 1))
-      else
-        failure_count=$((failure_count + 1))
-      fi
-    else
-      # 发布用户且指定并行
-      pretty_print "${NORM_MGT}日志将写入文件 ${NORM_GRN}${LOG_FILE} ${NORM_MGT}"
-      "${BASE_DIR}"/bin/datax-single-job-starter.sh "${PASS_ON_ARGS[@]}" "${arg}" >"${LOG_FILE}" 2>&1 &
-      success_count=$((success_count + 1))
-    fi
-    sleep 0.5s
-  done
-  if [[ -n "${IS_RUN_BY_NORMAL_USER}" ]] || [ -z "${PARALLEL}" ]; then
-    # 普通用户或未指定并行
-    pretty_print "${NORM_MGT}所有DataX作业都已完成,成功 ${NORM_GRN}${success_count}${NORM_MGT} 个,失败 ${NORM_RED}${failure_count}${NORM_MGT} 个"
-    exit ${failure_count}
-  else
-    pretty_print "${NORM_MGT}所有DataX作业都已启动(共启动 ${NORM_GRN}${success_count}${NORM_MGT} 个)"
-  fi
-}
-
-# DataX作业配置文件列表
-JOB_CONFIG_PATH=()
-# DataX作业配置生成器配置文件列表
-GENERATOR_CONFIG_PATH=()
-# 不作任何处理,传递给`datax-single-job-starter.sh`的参数
-PASS_ON_ARGS=()
-# 需作处理,加上`-c/-gc`的`DataX作业配置文件`或`DataX作业配置生成器配置文件`
-JC_GC_ARGS=()
-parse_args "${@}"
-prepare
-run_multiple_datax_job

+ 0 - 13
bin/datax-single-job-starter.py

@@ -1,13 +0,0 @@
-#!/usr/bin/env /usr/bin/python3
-# -*- coding:utf-8 -*-
-"""
-  Note:为方便本地调试设计,请勿在调度中使用
-"""
-import os
-import sys
-
-project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-sys.path.append(project_root_dir)
-
-if __name__ == '__main__':
-    os.system(f'{project_root_dir}/bin/datax-single-job-starter.sh {" ".join(sys.argv[1:])}')

+ 0 - 277
bin/datax-single-job-starter.sh

@@ -1,277 +0,0 @@
-#!/bin/bash
-#--------------------------------------------------------------------------------------------------
-# 启动单个DataX作业
-# 1. 注意确定Python3的路径`PYTHON3_PATH'、DataX的安装目录`DATAX_HOME`以及DataX Workers的声明——`datax_workers`
-# 2. 可以用-c传入生成好的DataX作业Json配置文件(绝对路径),或-gc传入ini文件(相对路径)
-# 3. ini文件的名称格式为:源系统类型-目标系统类型-数据库名称-数据集名称.ini
-#--------------------------------------------------------------------------------------------------
-CURRENT_DIR=$(pwd)
-BASE_DIR=$(
-  cd "$(dirname "$(realpath "$0")")/.." || exit
-  pwd
-)
-. "${BASE_DIR}"/bin/common/init.sh
-
-function usage() {
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_CYN}\t[-h/-H/--h/--H/--help]       打印脚本使用方法${DO_RESET}"
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_GRN}\t<-c< /=>job config>          DataX作业配置文件(.json,绝对路径)
-  ${NORM_GRN}\t<-gc< /=>generator config>   DataX作业配置生成器配置文件(.ini,项目内相对路径或绝对路径),-c优先
-  ${NORM_CYN}\t[-start-date< /=>start date] 开始日期(用以筛选数据)
-  ${NORM_CYN}\t[-stop-date< /=>stoP date]   结束日期(用以筛选数据)
-  ${NORM_CYN}\t[-host< /=>host]             执行作业的主机,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行,指定host优先于随机选择主机
-  ${NORM_CYN}\t[-random]                    随机选择主机,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行
-  ${NORM_CYN}\t[-skip-datax]                跳过DataX导出作业
-  ${DO_RESET}"
-  exit "$1"
-}
-
-function select_worker() {
-  pretty_print "${NORM_MGT}本次作业执行用户为 ${NORM_GRN}${USER}"
-  if [ -z "${IS_RUN_BY_RELEASE_USER}" ]; then
-    # 非 ${RELEASE_USER} 用户只能在本机执行
-    selected_worker=${CURRENT_HOST}
-    pretty_print "${NORM_MGT}非 ${NORM_GRN}${RELEASE_USER}${NORM_MGT} 用户限制以本机 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}为Worker运行作业"
-  elif [ -z "${IS_RUN_IN_RELEASE_DIR}" ]; then
-    # 非 ${RELEASE_USER} 用户只能在本机执行
-    selected_worker=${CURRENT_HOST}
-    pretty_print "${NORM_MGT}非发布目录 (${NORM_GRN}${RELEASE_ROOT_DIR}${NORM_MGT}) 下限制以本机 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}为Worker运行作业"
-  else
-    if [ -n "${HOST}" ]; then
-      pretty_print "${NORM_MGT}用户指定执行Worker为 ${NORM_GRN}${HOST}"
-      selected_worker=${HOST}
-    elif [ -n "${IS_RANDOM}" ]; then
-      # 生成一个>=0, <数组长度的随机数
-      worker_index=$((RANDOM % ${#DATAX_WORKERS_QUEUE[@]}))
-      selected_worker=${DATAX_WORKERS_QUEUE[${worker_index}]}
-      pretty_print "${NORM_MGT}用户指定随机选择Worker, 执行Worker为 ${NORM_GRN}${selected_worker}"
-    else
-      # 只能在本机执行的情况
-      selected_worker=${CURRENT_HOST}
-      pretty_print "${NORM_MGT}用户既未指定Worker,也未选择随机决定Worker, 执行Worker为本机 ${NORM_GRN}${CURRENT_HOST}"
-    fi
-  fi
-}
-
-function generate_job_config() {
-  if [ -z "${JOB_CONFIG_PATH}" ]; then
-    # 未提供`DataX作业配置文件`
-    # 由提供的`DataX作业配置生成器配置文件`生成`DataX作业配置文件`
-    if [ "${selected_worker}" == "${CURRENT_HOST}" ]; then
-      ${PYTHON3_PATH} -u "${BASE_DIR}"/bin/datax-job-config-generator.py \
-        -c "${GENERATOR_CONFIG_PATH}" \
-        -start-date "${START_DATE}" \
-        -stop-date "${STOP_DATE}"
-    else
-      ssh "${selected_worker}" "${PYTHON3_PATH}" -u "${BASE_DIR}"/bin/datax-job-config-generator.py \
-        -c "${GENERATOR_CONFIG_PATH}" \
-        -start-date "${START_DATE}" \
-        -stop-date "${STOP_DATE}"
-    fi
-    # shellcheck disable=SC2181
-    if [ "$?" -ne 0 ]; then
-      pretty_print "${NORM_MGT}使用配置文件 ${NORM_GRN}${GENERATOR_CONFIG_RELATIVE_PATH} ${NORM_MGT}生成DataX作业配置文件失败"
-      #      fi
-      exit 1
-    fi
-    TEMP=$(dirname "${GENERATOR_CONFIG_PATH}")
-    TEMP=${TEMP#"${BASE_DIR}/"}
-    TEMP=${TEMP#"conf/datax/config/"}
-    SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1)
-    PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2)
-    DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3)
-    GROUP=$(echo "${TEMP}" | cut -d '/' -f4)
-    NEW=$(echo "${TEMP}" | cut -d '/' -f5)
-
-    # 修改生成的作业名称,能够识别多级目录
-    JOB_NAME=$(basename "${GENERATOR_CONFIG_PATH}" .ini)
-    JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${TEMP}/${JOB_NAME}.json"
-#    if [ -n "${NEW}" ]; then
-#      JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${NEW}/${JOB_NAME}.json"
-#    elif [ -n "${GROUP}" ]; then
-#      JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${JOB_NAME}.json"
-#    elif [ -n "${DB_ENV}" ]; then
-#      JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${JOB_NAME}.json"
-#    else
-#      JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${JOB_NAME}.json"
-#    fi
-    JOB_CONFIG_PATH="${BASE_DIR}/${JOB_CONFIG_RELATIVE_PATH}"
-  else
-    # 提供了`DataX作业配置文件`
-    if [ ! -f "${JOB_CONFIG_PATH}" ]; then
-      # 如果脚本不是在根目录下执行,相对路径的配置文件是找不到的,因此要变成绝对路径
-      JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH}
-      JOB_CONFIG_PATH="${BASE_DIR}/${JOB_CONFIG_RELATIVE_PATH}"
-    elif [[ "${JOB_CONFIG_PATH}" =~ "${BASE_DIR}".* ]]; then
-      # `DataX作业配置文件`是绝对路径
-      JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"}
-    else
-      # 执行目录下可找到的相对路径,加上当前目录
-      JOB_CONFIG_PATH="${CURRENT_DIR}/${JOB_CONFIG_PATH}"
-      JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"}
-    fi
-    #    PROJECT_LAYER_ENV=$(basename "$(dirname "${JOB_CONFIG_PATH}")")
-    #    SRC_DST=$(basename "$(dirname "$(dirname "${JOB_CONFIG_PATH}")")")
-    TEMP=$(dirname "${JOB_CONFIG_PATH}")
-    TEMP=${TEMP#"${BASE_DIR}/"}
-    TEMP=${TEMP#"conf/datax/generated/"}
-    SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1)
-    PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2)
-    DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3)
-    GROUP=$(echo "${TEMP}" | cut -d '/' -f4)
-    JOB_NAME=$(basename "${JOB_CONFIG_PATH}" .json)
-  fi
-  datax_run_command="${PYTHON3_PATH} -u ${DATAX_HOME}/bin/datax.py ${JOB_CONFIG_PATH}"
-}
-
-function check_data_exists() {
-  DATA_EXISTS="true"
-  if [[ "${JOB_CONFIG_PATH}" =~ ${BASE_DIR}/.*/.*hdfs[-_].* ]]; then
-    path=$(jq -r '.job.content[0].reader.parameter.path' "${JOB_CONFIG_PATH}")
-    if ! hadoop fs -test -e "${path}"; then
-      pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 不存在,DataX不需要执行"
-      exit 0
-    fi
-    if ! hadoop fs -test -e "${path}/*"; then
-      pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 中没有数据,DataX不需要执行"
-      DATA_EXISTS=""
-    fi
-    disk_usage=$(hadoop fs -du -s "${path}" | cut -d ' ' -f1)
-    if [ "${disk_usage}" -eq 0 ]; then
-      pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 中没有数据,DataX不需要执行"
-      DATA_EXISTS=""
-    fi
-  fi
-}
-
-function parse_args() {
-  for index in $(seq 1 $#); do
-    arg=${*:index:1}
-    case $arg in
-    -c)
-      index=$((index + 1))
-      JOB_CONFIG_PATH="${*:index:1}"
-      ;;
-    -c=*)
-      JOB_CONFIG_PATH=${arg#*=}
-      ;;
-    -gc)
-      index=$((index + 1))
-      GENERATOR_CONFIG_PATH="${*:index:1}"
-      ;;
-    -gc=*)
-      GENERATOR_CONFIG_PATH=${arg#*=}
-      ;;
-    -start-date)
-      index=$((index + 1))
-      START_DATE="${*:index:1}"
-      ;;
-    -start-date=*)
-      START_DATE="${arg#*=}"
-      ;;
-    -stop-date)
-      index=$((index + 1))
-      STOP_DATE="${*:index:1}"
-      ;;
-    -stop-date=*)
-      STOP_DATE="${arg#*=}"
-      ;;
-    -host)
-      index=$((index + 1))
-      HOST="${*:index:1}"
-      ;;
-    -host=*)
-      HOST="${arg#*=}"
-      ;;
-    -random)
-      IS_RANDOM='true'
-      ;;
-    -skip-datax)
-      SKIP_DATAX='true'
-      ;;
-    -h | -H | --h | --H | --help)
-      usage 0
-      ;;
-    *) ;;
-    esac
-  done
-  pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
-}
-
-function prepare() {
-  if [ -z "${JOB_CONFIG_PATH}" ] && [ -z "${GENERATOR_CONFIG_PATH}" ]; then
-    pretty_print "${NORM_RED}请使用 -gc 提供DataX作业配置生成器配置文件(.ini文件)或使用 -c 提供DataX作业配置文件(.json文件)"
-    usage 1
-  fi
-  if [ "$(uname)" = "Linux" ]; then
-    YESTERDAY=$(date -d '-1 day' +%Y%m%d)
-    TODAY=$(date +%Y%m%d)
-  else
-    YESTERDAY=$(date -v-1d +%Y%m%d)
-    TODAY=$(date +%Y%m%d)
-  fi
-  if [ -z "${START_DATE}" ]; then
-    START_DATE=${YESTERDAY}
-  fi
-  if [ -z "${STOP_DATE}" ]; then
-    STOP_DATE=${TODAY}
-  fi
-  if [ -n "${JOB_CONFIG_PATH}" ]; then
-    JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"}
-    pretty_print "${NORM_MGT}使用配置文件 ${NORM_GRN}${JOB_CONFIG_RELATIVE_PATH} ${NORM_MGT}运行DataX作业"
-  fi
-  if [ -n "${GENERATOR_CONFIG_PATH}" ]; then
-    GENERATOR_CONFIG_RELATIVE_PATH=${GENERATOR_CONFIG_PATH#"${BASE_DIR}/"}
-    pretty_print "${NORM_MGT}使用DataX作业配置生成器配置文件 ${NORM_GRN}${GENERATOR_CONFIG_RELATIVE_PATH} ${NORM_MGT}运行DataX作业"
-  fi
-}
-
-function run_single_datax_job() {
-  generate_job_config
-  if [[ "${JOB_CONFIG_PATH}" =~ .*/hdfs-kafka/.* ]]; then
-    pretty_print "${NORM_GRN}写kafka的ini文件中,writer部分的注意事项:"
-    pretty_print "${NORM_GRN}1. column,与reader的column在顺序上应一一对应,可在此处实现列名的变换"
-    pretty_print "${NORM_GRN}2. columnType,支持的显式类型(类型书写不区分大小写,结构体子字段名称区分大小写)有:"
-    pretty_print "${NORM_GRN}(1)id,表示该字段的值是es document的id"
-    pretty_print "${NORM_GRN}(2)array<string>,如果字段在hive中的定义是array<string>,但其element是数值(包括0开头的字符串数值)、bool的,应显式指定其类型;其他element类型本就是string或者数值、bool,则可以不指明"
-    pretty_print "${NORM_GRN}(3)array<struct<?,?,?>>,如果字段在hive中的定义是array<struct<?,?,?>>,应当显式指定,其结构体属性的名称和类型,名称和类型的分隔符为@,属性间的分隔符为#,例如:array<struct<col1@bigint#col2@string>>,如果字段是已经序列化的命名结构体数组,则不需要指明类型"
-    pretty_print "${NORM_GRN}(4)struct<?,?,?>,如果字段在hive中的定义是struct<?,?,?>,应当显式指定,其结构体属性的名称和类型,名称和类型的分隔符为@,例如:struct<col1@bigint#col2@string>,如果字段是已经序列化的命名结构体,则不需要指明类型"
-    pretty_print "${NORM_GRN}(5)json,如果字段是序列化后的json,应指明需要进行反序列化"
-    pretty_print "${NORM_GRN}(6)其他类型,比如int、long、bigint、bool、double等,都不需要指明类型"
-    pretty_print "${NORM_GRN}3. columnMapping,同hive es mapping表tblproperties['es.mapping.names']"
-    pretty_print "${NORM_GRN}注:不管columnType还是结构体属性字段名称,都会使用columnMapping进行字段名称转换"
-    pretty_print "${NORM_RED}重要:array的element、struct的字段值,一定不要包含英文逗号,否则会出现切分的值不对的情况!!!"
-  fi
-  if [ "${selected_worker}" = "${CURRENT_HOST}" ]; then
-    if [ -z "${SKIP_DATAX}" ]; then
-      check_data_exists
-      if [ "${DATA_EXISTS}" = 'true' ]; then
-        pretty_print "${NORM_MGT}在 ${NORM_GRN}${selected_worker} ${NORM_MGT}上执行以下命令:
-                    ${NORM_GRN}${datax_run_command}"
-        echo -en "${NORM_GRN}"
-        # 本机执行
-        ${datax_run_command}
-      fi
-    else
-      pretty_print "${NORM_MGT}DataX job was set to skipped"
-    fi
-  else
-    if [ -z "${SKIP_DATAX}" ]; then
-      check_data_exists
-      if [ "${DATA_EXISTS}" = 'true' ]; then
-        pretty_print "${NORM_MGT}在 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}上执行以下命令:
-                    ${NORM_GRN}ssh ${RELEASE_USER}@${selected_worker} ${datax_run_command}"
-        echo -en "${NORM_GRN}"
-        # shellcheck disable=SC2029
-        ssh  "${selected_worker}" "${datax_run_command}"
-      fi
-    else
-      pretty_print "${NORM_MGT}DataX job was set to skipped"
-    fi
-  fi
-}
-parse_args "${@}"
-prepare
-select_worker
-run_single_datax_job

+ 3 - 1
kb/90-重构路线.md

@@ -163,7 +163,9 @@ L3   SparkSQL(...) 显式传参  +  extra_spark_config  +  命令行 -sc
 
 **已作废的子项**(2026-04-22 方向反转):`-env` 参数 + datasource 按 env 分子目录 + 一套代码跑多环境。前期跨环境同步是常态(test 业务库 → prod HDFS),不存在"全局 env"概念,sync ini 里 `dataSource = {db_type}/{env}-{实例简称}` 显式指向具体 source ini;source ini 落位 `datasource/{db_type}/{env}-{实例简称}.ini`,扁平组织。代码侧 db_type 判定改按首段斜杠(= 父目录)在 `plugin.py:37` + `plugin_factory.py:34` 落地(✅ 2026-04-22)。
 
-### 2.6 DataX 入口脚本收口为两条命令(中优先级)
+### 2.6 DataX 入口脚本收口为两条命令 ✅ 2026-04-23
+
+> ✅ 2026-04-23 落地:新入口 `bin/datax-{hive-import,hdfs-export}-starter.{sh,py}` + `dw_base/datax/` 7 模块(`path_utils` / `worker` / `partition` / `runner` / `batch` / `entry` / `cli`);老 7 个脚本(`datax-{single,multiple,multiple-hive}-job-starter.{sh,py}` + `datax-job-config-generator.py`)整体删除。详见 kb/92 对应日 changelog。本节保留为上下文(kb/90 retention 规则)。
 
 **现状**:DataX 入口分散,学习成本高:
 

+ 5 - 248
kb/91-重构备忘.md

@@ -1,251 +1,8 @@
-### 4.3 DataX 脚本详细使用说明 (即将重构
+### 4.4 老 DataX 脚本 Worker 分发与日志链路(2026-04-23 查证
 
-DataX 脚本分为**执行脚本**和**辅助工具**两类,调用链如下:
-
-```
-datax-multiple-hive-job-starter.sh   (MySQL→Hive 专用批量入口,含自动分区管理)
-        │
-        ▼
-datax-multiple-job-starter.sh        (通用批量入口)
-        │
-        ▼
-datax-single-job-starter.sh          (单任务入口)
-        │
-        ├─► datax-job-config-generator.py   (ini→json 配置生成)
-        │           │
-        │           ▼
-        │   dw_base/datax/job_config_generator.py(生成引擎)
-        │
-        ▼
-python datax.py generated.json       (DataX 框架执行数据同步)
-```
-
-辅助工具:`datax-gc-generator.py`(连接源库元数据,批量生成 ini 配置文件)
-
-> **`.py` 同名包装器**:`datax-single-job-starter.py`、`datax-multiple-job-starter.py`、`datax-multiple-hive-job-starter.py` 是对应 `.sh` 的薄 Python 包装,**仅供本地调试**,禁止在 DolphinScheduler 调度中使用。
-
----
-
-#### 4.3.1 `datax-single-job-starter.sh` —— 单任务启动
-
-**用途**:启动单个 DataX 同步任务。接受已生成的 JSON 或待生成的 ini 配置。
-
-**参数**:
-
-| 参数                     | 必填   | 说明                                                      |
-| ------------------------ | ------ | --------------------------------------------------------- |
-| `-c <path>`              | 二选一 | DataX 作业 JSON 配置文件(绝对路径)                      |
-| `-gc <path>`             | 二选一 | DataX ini 配置文件(项目内相对路径或绝对路径),`-c` 优先 |
-| `-start-date <yyyyMMdd>` | 否     | 开始日期,默认昨天                                        |
-| `-stop-date <yyyyMMdd>`  | 否     | 结束日期,默认今天                                        |
-| `-host <hostname>`       | 否     | 指定执行主机,优先于 `-random`                            |
-| `-random`                | 否     | 随机选择 Worker 节点                                      |
-| `-skip-datax`            | 否     | 跳过实际 DataX 执行(仅生成配置)                         |
-
-**Worker 选择逻辑**(`select_worker()`):
-
-1. 非 `bigdata` 用户 → 强制本机执行
-2. 非发布目录下执行 → 强制本机执行
-3. 指定了 `-host` → 使用指定主机
-4. 指定了 `-random` → 从 `DATAX_WORKERS_QUEUE` 随机选一台
-5. 都未指定 → 本机执行
-
-**HDFS 数据检查**(`check_data_exists()`):当 JSON 配置路径包含 `hdfs-` 时,会自动检查 HDFS reader 路径是否存在且有数据,无数据则跳过执行。
-
-**示例**:
-
-```bash
-# 采集任务(raw 层 ini)
-bin/datax-single-job-starter.sh -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini -start-date 20260415 -env prod
-
-# 导出任务(ads 层 ini)
-bin/datax-single-job-starter.sh -gc jobs/ads/trd/ads_trd_gmv_d_export.ini -start-date 20260415 -env prod
-
-# 使用已生成的 JSON(跳过生成,env 已嵌入 JSON)
-bin/datax-single-job-starter.sh -c /abs/path/to/generated.json
-```
-
-> **待重构项**(见 `90-重构路线.md` §2.1 DataX 条目):
->
-> - `-env` 参数目前**尚未实现**,现阶段切环境靠改 `datasource/` 下的实际文件或 `conf/env.sh`(待新建)
-> - `bin/` 下几个 DataX 启动脚本 / 生成器里还残留 `conf/datax/config/` 前缀剥离逻辑(老项目遗留;该目录已迁至 `conf/bak/` 并忽略入库),新项目 ini 放在 `jobs/raw/` / `jobs/ads/` / `manual/`,这段逻辑要清理掉
-
----
-
-#### 4.3.2 `datax-multiple-job-starter.sh` —— 通用批量启动
-
-**用途**:批量启动多个 DataX 任务,支持串行/并行执行。DolphinScheduler 调度的主要入口。
-
-**参数**:
-
-| 参数          | 优先级    | 说明                      |
-| ------------- | --------- | ------------------------- |
-| `-c <path>`   | 1(最高) | JSON 配置文件,可多次传入 |
-| `-cd <dir>`   | 2         | JSON 配置文件目录         |
-| `-gc <path>`  | 3         | ini 配置文件,可多次传入  |
-| `-gcd <dir>`  | 4(最低) | ini 配置文件目录          |
-| `-start-date` | —         | 开始日期,默认昨天        |
-| `-stop-date`  | —         | 结束日期,默认今天        |
-| `-host`       | —         | 指定 Worker 节点          |
-| `-random`     | —         | 随机选择 Worker           |
-| `-parallel`   | —         | 并行执行(默认串行)      |
-| `-skip-datax` | —         | 跳过 DataX 执行           |
-
-**执行模式**:
-
-- **串行**(默认):逐个调用 `datax-single-job-starter.sh`,日志实时输出(`tee`),结束后汇报成功/失败计数
-- **并行**(`-parallel`):后台启动所有任务,日志写入文件,仅限 `bigdata` 用户 + 发布主机
-
-**日志路径**:`${LOG_ROOT_DIR}/datax/${src-dst}/${project_layer_env}/${START_DATE}/${START_DATE}-${JOB_NAME}.log`
-
-**示例**(目标态):
-
-```bash
-# 批量执行整个业务域下的 raw 采集 ini
-bin/datax-multiple-job-starter.sh -gcd jobs/raw/trd -start-date 20260415 -env prod -parallel
-
-# 指定多个 ini 文件串行执行
-bin/datax-multiple-job-starter.sh \
-  -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
-  -gc jobs/raw/usr/raw_usr_user_info_inc_d.ini \
-  -start-date 20260415 -env prod
-```
-
----
-
-#### 4.3.3 `datax-multiple-hive-job-starter.sh` —— 带 Hive 分区自动管理的批量启动
-
-**用途**:在 `datax-multiple-job-starter.sh` 之上封装了 **Hive 分区自动管理**。任何写入 Hive 分区表的 DataX 同步作业(不限于 MySQL→Hive)都可以用它,脚本头注释里"MySQL-Hive 作业"只是历史命名。**日常采集作业的主力入口**。
-
-**与 multiple-job-starter 的区别**:
-
-1. 自动从 ini 配置中解析 Hive 表名和分区路径(`parse_ddl()` 函数,`grep "path =" <ini>`)
-2. 在执行 DataX 前自动执行 `ALTER TABLE ... ADD IF NOT EXISTS PARTITION(dt=...)` 
-3. 支持在脚本内硬编码配置列表(`partitioned_tables`、`generator_config_array` 等数组),适合固定调度场景
-4. 支持 `--override` 参数临时覆盖脚本内硬编码配置
-
-> **自动建分区只对 ini 输入生效**:`parse_ddl()` 读的是 ini 里的 `path = ...` 行。如果走 `-jc` / `-jcd` 传已生成的 JSON,脚本没有 ini 可解析,自动建分区**不触发**,此时要么改用 `-t db.table` 显式声明分区、要么把分区记录在脚本内 `partitioned_tables` 数组。
-
-**额外参数**:
-
-| 参数                                    | 说明                                               |
-| --------------------------------------- | -------------------------------------------------- |
-| `--override`                            | 忽略脚本内硬编码的配置列表,只执行命令行传入的配置 |
-| `-t <db.table>`                         | 显式指定需要建分区的 Hive 表,可多次传入           |
-| `-skip-add-partition`                   | 跳过 Hive 分区创建                                 |
-| `-jc` / `-jcd` / `-gc` / `-gcd`         | 同 multiple-job-starter                            |
-| `-start-date` / `-stop-date`            | 同上                                               |
-| `-random` / `-parallel` / `-skip-datax` | 同上                                               |
-
-**分区解析逻辑**(`parse_ddl()`):
-
-1. 读取 ini 文件中 `path =` 行
-2. 检查路径是否包含 `/dt=${dt}`(分区标识)
-3. 从 HDFS 路径中提取 `{db}.db/{table_name}` → 拼接 `ALTER TABLE {db}.{table} ADD IF NOT EXISTS PARTITION(dt={START_DATE})`
-
-**示例**(目标态):
-
-```bash
-# 执行某业务域下所有 raw 采集 ini + 自动建 Hive 分区
-bin/datax-multiple-hive-job-starter.sh \
-  -gcd jobs/raw/trd \
-  -start-date 20260415 -env prod -parallel
-
-# 覆盖脚本内硬编码配置,只跑指定的失败任务
-bin/datax-multiple-hive-job-starter.sh --override \
-  -gc jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
-  -start-date 20260415 -env prod
-```
-
----
-
-#### 4.3.4 `datax-job-config-generator.py` —— ini→JSON 配置生成器
-
-**用途**:将人类可读的 `.ini` 任务配置转换为 DataX 框架所需的 `.json` 作业配置文件。通常由 `datax-single-job-starter.sh` 自动调用,也可独立执行。
-
-**参数**:
-
-| 参数          | 说明                                     |
-| ------------- | ---------------------------------------- |
-| `-c <path>`   | ini 配置文件路径,可多次传入或逗号分隔   |
-| `-d <dir>`    | 扫描指定目录下的 ini 文件                |
-| `-r`          | 配合 `-d` 使用,递归扫描子目录           |
-| `-start-date` | 开始日期,默认昨天                       |
-| `-stop-date`  | 结束日期,默认今天                       |
-| `-o <dir>`    | 输出目录(默认 `conf/datax/generated/`) |
-
-**生成路径规则**(**当前脚本残留老逻辑,待清理**):脚本里仍保留 `temp = os.path.dirname(gcf).replace(project_root_dir, '').replace('conf/datax/config/', '').split('/')` 这段——老项目的 ini 放在 `conf/datax/config/{src-dst}/{env}/` 下,前缀剥离后能派生出 `src_dst` / `project_layer_env` 拼接输出路径。新项目 ini 已经不走这条路径(`conf/datax/config/` 整体挪到 `conf/bak/` 并 gitignore),但脚本里的 replace 语句仍在执行一次无效剥离,输出会落到 `conf/datax/generated/jobs/raw/trd/xxx.json`——能跑但路径形态不符合新约定。
-
-重构目标:去掉路径前缀剥离逻辑,输出统一扁平为 `conf/datax/generated/{env}/{目标表名}.json`。登记为硬编码待重构项,见 `90-重构路线.md` §2.1。
-
-**示例**(目标态):
-
-```bash
-# 生成单个 ini 对应的 JSON
-python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini -env prod
-
-# 批量生成某业务域下所有 ini(递归)
-python3 bin/datax-job-config-generator.py -d jobs/raw/trd -r -env prod
-
-# 指定日期和输出路径
-python3 bin/datax-job-config-generator.py -c jobs/raw/trd/raw_trd_order_pay_inc_d.ini \
-  -start-date 20260415 -stop-date 20260416 -env prod -o /tmp/datax-out
-```
-
----
-
-#### 4.3.5 `datax-gc-generator.py` —— ini 配置元生成器
-
-此部分需要完全重构,此记录仅为重构提供思路。
-
-**用途**:连接源数据库读取表结构元数据,自动生成 DataX ini 配置文件。是开发阶段的**辅助工具**,用于批量初始化 ini 配置,生成后通常需要人工检查和调整。
-
-**通用参数**:
-
-| 参数             | 说明                                                         |
-| ---------------- | ------------------------------------------------------------ |
-| `--from <type>`  | 源系统类型(`mysql` / `hdfs`,默认 `mysql`)                 |
-| `--to <type>`    | 目标系统类型(`hdfs` / `hbase` / `kafka` / `mongo` / `elasticsearch` / `mysql`,默认 `hdfs`) |
-| `--output <dir>` | 生成的 ini 文件存储目录                                      |
-
-**MySQL 作为源(`--from mysql`)额外参数**:`-h`(主机)、`-P`(端口)、`-u`(用户)、`-p`(密码)、`-D`(数据库)、`-t`(指定表)、`-tr`(表名正则)、`-e`(排除表)、`-er`(排除表正则)、`--inc-col`(增量字段,默认 `update_time`)
-
-**HDFS 作为源(`--from hdfs`)额外参数**:`-d`(Hive 数据库)、`-t`(Hive 表)、`-e`(排除表)、`--partitioned`(是否分区表)
-
-**示例**:
-
-```bash
-# 为 MySQL 库中所有表生成 mysql→hdfs 的 ini 配置,输出到 raw/trd 业务域
-python3 bin/datax-gc-generator.py --from mysql --to hdfs \
-  -h 10.0.0.1 -u reader -p xxx -D hobby_prod \
-  --output jobs/raw/trd
-
-# 只为指定表生成,排除临时表
-python3 bin/datax-gc-generator.py --from mysql --to hdfs \
-  -h 10.0.0.1 -u reader -p xxx -D hobby_prod \
-  -tr "^order" -er "^tmp_" \
-  --output jobs/raw/trd
-
-# 为 Hive 表生成 hdfs→elasticsearch 的 ini 配置
-python3 bin/datax-gc-generator.py --from hdfs --to elasticsearch \
-  -d ads --partitioned \
-  --output conf/datax/config/hdfs-elasticsearch/prod
-```
-
-> **安全提示**:该脚本接受数据库账密作为命令行参数。生产环境中建议通过环境变量或临时文件传递敏感信息,避免密码出现在 shell history 和进程列表中。
-
-
-
-
-
-
-
- 已查证:whoami 分流 + env.sh 外配 2026-04-21 完成(kb/92 L175);{module}/{dt}/{file}.log 统一结构
-  仍目标态(日志模块统一 kb/92 L617 待启动)
-
----
-
-### 4.4 Worker 分发与日志链路(2026-04-23 查证)
+> 本节描述的是**老脚本**(`datax-single/multiple/multiple-hive-job-starter.sh`)的分发和日志行为。
+> 老脚本已于 2026-04-23 批次 5 随工作 3 新入口落地整体删除(见 kb/92 changelog)。
+> 本节作为老行为档案保留,给 kb/93 ADR-02(分布式归 DS worker group)提供历史背景。
 
 #### 4.4.1 分发位置
 
@@ -276,4 +33,4 @@ idea 控制台 ← ssh ← m2 bash ← ssh ← <worker> datax.py stdout
 - 每个任务日志落 **m2 本地** `${LOG_ROOT_DIR}/datax/.../<START_DATE>/<START_DATE>-<JOB_NAME>.log`(不分散在各 worker)
 - 进度要 `tail -f` 独立日志文件
 
-并行模式触发三连:USER=bigdata + CURRENT_HOST=RELEASE_HOST + 显式 `-parallel`,任一缺失退串行。
+并行模式触发三连:USER=bigdata + CURRENT_HOST=RELEASE_HOST + 显式 `-parallel`,任一缺失退串行。

+ 1 - 0
kb/92-重构进度.md

@@ -197,3 +197,4 @@
 | 2026-04-23 | **DataX 字段级脱敏配置化落地(插入本轮 scope 完成)**:新建 `dw_base/datax/mask.py` —— PG 方言 5 种脱敏(3 静态 `month_trunc` / `md5` / `mask_middle` + 2 动态 `keep_first_{n}` / `keep_last_{n}`,正则捕获数字参数),列名白名单 `[A-Za-z_][A-Za-z0-9_]*` + mask_type 校验防 SQL 注入;`postgresql_reader.py:load_others` 检测 `[mask]` 段自动调 mask 模块生成 querySql,优先级"手写 querySql > [mask] > table"。新建 `tests/unit/datax/test_mask.py` 9 条单测覆盖静态/动态/综合/各校验分支。`tests/integration/datax/hive_import/app_user_cert_info.ini` 从手写 querySql 切到 `[mask] cert_birthday = month_trunc` 声明式。联动 kb/90 §2.6 高优先级"DataX 字段级脱敏"改写为 [mask] 为规范 + querySql 为手写后备,低优先级"ini 配置化脱敏"条标 ✅ 压缩 | — |
 | 2026-04-23 | **kb/93 新增 2 条架构 ADR 草案 + kb/90 §2.6 联动**:(a) kb/93 §3 决策清单从"待补充"填入 ADR-01(按天补数归 DS,DataX 入口不做日期展开)+ ADR-02(分布式分发归 DS worker group,DataX 不重复随机),两条状态 = 草案、等正式拍板转"已采纳"。背景 / 决策 / 后果 / 候选方案 / 反悔条件五段精简写法。(b) kb/90 §2.6 后延 ADR 低优先级"日期范围展开"条追加"可能不做"备注指向 ADR-01;新增"DataX 仅本机执行"条(低优先级)指向 ADR-02。破例:CLAUDE.md 里 kb/93 "暂不改动除非显式要求",本次用户显式要求开口 | — |
 | 2026-04-23 | **DataX 新入口脚本落地(工作 3 批次 4)**:新建 `bin/datax-hive-import-starter.{sh,py}` + `bin/datax-hdfs-export-starter.{sh,py}`。bash 壳 source `bin/common/init.sh` 初始化 env 后 `exec python3` 到主入口;.py 用 argparse 解析老参数平迁集(-ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax + import 专有的 -skip-partition / -t),构造 `DataxImport` / `DataxExport` 门面调 `run`。`tests/integration/datax/hive_import/README.md` 冒烟命令切到新入口 `-ini` 语法。老 `datax-{single,multiple,multiple-hive}-job-starter.{sh,py}` + `datax-job-config-generator.py` 暂不删(批次 5 做,等冒烟 2 通过) | — |
+| 2026-04-23 | **工作 3 批次 5 收官:删老 DataX 脚本 + kb 整理(工作 3 整体 ✅)**:新增 `dw_base/datax/cli.py`(`python -m dw_base.datax.cli gen-json` 子命令,内部调 `JobConfigGenerator`)+ 3 条单测;`runner.py` 的 gen 调用从 `bin/datax-job-config-generator.py` 切到 `-m dw_base.datax.cli gen-json`;删 7 个老文件(`bin/datax-{single,multiple,multiple-hive}-job-starter.{sh,py}` + `bin/datax-job-config-generator.py`);顺带清 `dw_base/datax/plugins/plugin.py:4` 死 `import pwd`(Windows 单测 import 阻塞)。联动 kb/90 §2.6 标 ✅(retention 规则保留正文),kb/91 §4.3 老脚本使用说明整段删、保留 §4.4 老分发/日志查证档案给 kb/93 ADR-02 做背景。整套单测 46 条全过。至此工作 3 六批次(批次 0-5)全部完成:术语 memory / §2.6 压缩 → workers 外配 → 冒烟 1 → 脱敏配置化插入 → 工作 3 三聚簇 → 新入口 → 冒烟 2 ✅ → 删老 | — |