#!/bin/bash #-------------------------------------------------------------------------------------------------- # 启动多个DataX作业 # 1. 注意确定DataX Workers —— `DATAX_WORKERS` # 2. `multiple-job-starter`不会打印日志,请前往`/${LOG_ROOT_DIR}/data/log/datax/${START_DATE}`查看日志 # 3. 配置调度使用本脚本时如果指定了`-random`选择随机worker,则调度节点必须选择`local-worker` #-------------------------------------------------------------------------------------------------- #set -e BASE_DIR=$( cd "$(dirname "$(realpath "$0")")/.." || exit pwd ) . "${BASE_DIR}"/bin/common/init.sh function usage() { echo -e "${NORM_MGT}Usage: $0 ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}" echo -e "${NORM_MGT}Usage: $0 ${NORM_GRN}\t<-c< /=>job config> DataX作业配置文件(.json,绝对路径),支持多个,优先级1 ${NORM_GRN}\t<-cd< /=>job config directory> DataX作业配置文件目录(.json文件夹,项目内相对路径或绝对路径),优先级2 ${NORM_GRN}\t<-gc< /=>generator config> DataX作业配置生成器配置文件(.ini,相对路径),支持多个,优先级3 ${NORM_GRN}\t<-gcd< /=>generator config directory> DataX作业配置生成器配置文件目录(.ini文件夹,项目内相对路径或绝对路径),优先级4 ${NORM_CYN}\t[-start-date< /=>start date] 开始日期(用以筛选数据) ${NORM_CYN}\t[-stop-date< /=>stop date] 结束日期(用以筛选数据) ${NORM_CYN}\t[-host< /=>host] 执行作业的主机,非${RELEASE_USER}用户或host和r都未指定则在当前机器执行,指定host优先于随机选择主机 ${NORM_CYN}\t[-random] 随机选择Worker,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行 ${NORM_CYN}\t[-parallel] 并行执行(默认串行) ${NORM_CYN}\t[-skip-datax] 跳过DataX导出作业 ${DO_RESET}" exit "$1" } function parse_args() { for index in $(seq 1 $#); do arg=${*:index:1} case $arg in -c) index=$((index + 1)) JOB_CONFIG_PATH+=("${*:index:1}") ;; -c=*) JOB_CONFIG_PATH+=("${arg#*=}") ;; -cd) index=$((index + 1)) JOB_CONFIG_DIR="${*:index:1}" ;; -cd=*) JOB_CONFIG_DIR="${arg#*=}" ;; -gc) index=$((index + 1)) GENERATOR_CONFIG_PATH+=("${*:index:1}") ;; -gc=*) GENERATOR_CONFIG_PATH+=("${arg#*=}") ;; -gcd) index=$((index + 1)) GENERATOR_CONFIG_DIR="${*:index:1}" ;; -gcd=*) GENERATOR_CONFIG_DIR="${arg#*=}" ;; -start-date) index=$((index + 1)) START_DATE="${*:index:1}" ;; -start-date=*) START_DATE="${arg#*=}" ;; -stop-date) index=$((index + 1)) STOP_DATE="${*:index:1}" ;; -stop-date=*) STOP_DATE="${arg#*=}" ;; -host) index=$((index + 1)) PASS_ON_ARGS+=("-host=${*:index:1}") ;; -host=*) PASS_ON_ARGS+=("-host=${arg#*=}") ;; -random) PASS_ON_ARGS+=("$arg") ;; -skip-datax) PASS_ON_ARGS+=("$arg") ;; -parallel) PARALLEL="true" ;; -h | -H | --h | --H | --help) usage 0 ;; *) ;; esac done pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}" if [ "${#JOB_CONFIG_PATH[@]}" -eq 0 ] && [ -z "${JOB_CONFIG_DIR}" ] && [ "${#GENERATOR_CONFIG_PATH[@]}" -eq 0 ] && [ -z "${GENERATOR_CONFIG_DIR}" ]; then pretty_print "${NORM_RED}请至少提供 ${NORM_GRN}-c、-cd、-gc、-gcd ${NORM_RED}中的一个" usage 1 fi } function prepare() { if [ "$(uname)" = "Linux" ]; then YESTERDAY=$(date -d '-1 day' +%Y%m%d) TODAY=$(date +%Y%m%d) else YESTERDAY=$(date -v-1d +%Y%m%d) TODAY=$(date +%Y%m%d) fi if [ -z "${START_DATE}" ]; then START_DATE=${YESTERDAY} fi if [ -z "${STOP_DATE}" ]; then STOP_DATE=${TODAY} fi PASS_ON_ARGS+=("-start-date=${START_DATE}") PASS_ON_ARGS+=("-stop-date=${STOP_DATE}") if [ "${#JOB_CONFIG_PATH[@]}" -gt 0 ]; then # 传递了`作业配置文件(列表)` for jc in "${JOB_CONFIG_PATH[@]}"; do if [[ "${JC_GC_ARGS[*]}" =~ .*"${jc}".* ]]; then pretty_print "${NORM_YEL}提供了重复的DataX作业配置文件:${NORM_GRN}${jc}" continue fi JC_GC_ARGS+=("-c=${jc}") done elif [ -n "${JOB_CONFIG_DIR}" ]; then # 传递了`作业配置文件目录` if [ ! -d "${JOB_CONFIG_DIR}" ]; then # 目录不存在(可能原因是未在根目录执行脚本,传递的目录一定要是`相对路径`才可以) JCD="${BASE_DIR}/${JOB_CONFIG_DIR}" if [ ! -d "${JCD}" ]; then pretty_print "${NORM_RED}提供的DataX作业配置目录 ${NORM_GRN}${JOB_CONFIG_DIR} ${NORM_RED}不存在" exit 1 fi JOB_CONFIG_DIR="${JCD}" fi for jc in "${JOB_CONFIG_DIR}"/*; do # 需要`jc`都是json文件 if [ -f "${jc}" ]; then JC_GC_ARGS+=("-c=${jc}") fi done elif [ "${#GENERATOR_CONFIG_PATH[@]}" -gt 0 ]; then # 传递了`DataX作业配置生成器配置文件(列表)` for gc in "${GENERATOR_CONFIG_PATH[@]}"; do if [[ "${JC_GC_ARGS[*]}" =~ .*"${gc}".* ]]; then pretty_print "${NORM_YEL}提供了重复的DataX作业配置生成器配置文件:${NORM_GRN}${gc}" continue fi JC_GC_ARGS+=("-gc=${gc}") done else # 传递了`DataX作业配置生成器配置文件目录` if [ ! -d "${GENERATOR_CONFIG_DIR}" ]; then # 目录不存在(可能原因是未在根目录执行脚本,传递的目录一定要是相对路径才可以) GCD="${BASE_DIR}/${GENERATOR_CONFIG_DIR}" if [ ! -d "${GCD}" ]; then pretty_print "${NORM_RED}提供的DataX作业配置生成器配置文件目录 ${NORM_GRN}${GENERATOR_CONFIG_DIR} ${NORM_RED}不存在" exit 1 fi GENERATOR_CONFIG_DIR="${GCD}" fi for gc in "${GENERATOR_CONFIG_DIR}"/*; do # 需要`gc`都是ini文件 if [ -f "${gc}" ]; then JC_GC_ARGS+=("-gc=${gc}") fi done fi } function run_multiple_datax_job() { success_count=0 failure_count=0 export MULTIPLE="true" for arg in "${JC_GC_ARGS[@]}"; do TEMP=$(dirname "${arg#*=}") TEMP=${TEMP#"${BASE_DIR}/"} TEMP=${TEMP#"conf/datax/config/"} TEMP=${TEMP#"conf/datax/generated/"} SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1) PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2) DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3) GROUP=$(echo "${TEMP}" | cut -d '/' -f4) case ${arg} in -c=*) # -c=conf/datax/generated/from-to-database-table.json JOB_NAME=$(basename "${arg#*=}" .json) pretty_print "${NORM_MGT}使用DataX作业配置文件 ${NORM_GRN}${arg#*=} ${NORM_MGT}运行" ;; -gc=*) # -gc=conf/datax/config/from-to/project_layer/from-to-database-table.ini JOB_NAME=$(basename "${arg#*=}" .ini) pretty_print "${NORM_MGT}使用DataX作业配置生成器配置文件 ${NORM_GRN}${arg#*=} ${NORM_MGT}运行" ;; *) continue ;; esac if [ "${USER}" == "${RELEASE_USER}" ]; then if [ -n "${GROUP}" ]; then LOG_RELATIVE_PATH="datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${START_DATE}" elif [ -n "${DB_ENV}" ]; then LOG_RELATIVE_PATH="datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${START_DATE}" else LOG_RELATIVE_PATH="datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${START_DATE}" fi else if [ -n "${GROUP}" ]; then LOG_RELATIVE_PATH="users/${USER}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${START_DATE}" elif [ -n "${DB_ENV}" ]; then LOG_RELATIVE_PATH="users/${USER}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${START_DATE}" else LOG_RELATIVE_PATH="users/${USER}/datax/${SRC_DST}/${PROJECT_LAYER_ENV}/${START_DATE}" fi fi LOG_DIR="${LOG_ROOT_DIR}/${LOG_RELATIVE_PATH}" mkdir -p "${LOG_DIR}" LOG_FILE_NAME="${START_DATE}-${JOB_NAME}.log" LOG_FILE="${LOG_DIR}/${LOG_FILE_NAME}" if [[ -n "${IS_RUN_BY_NORMAL_USER}" ]] || [ -z "${PARALLEL}" ] || [ "${CURRENT_HOST}" != "${RELEASE_HOST}" ]; then # 普通用户或未指定并行或非发布主机 pretty_print "${NORM_MGT}日志将写入文件 ${NORM_GRN}${LOG_FILE} ${NORM_MGT}" "${BASE_DIR}"/bin/datax-single-job-starter.sh "${PASS_ON_ARGS[@]}" "${arg}" | tee "${LOG_FILE}" if [ "${PIPESTATUS[0]}" -eq 0 ]; then success_count=$((success_count + 1)) else failure_count=$((failure_count + 1)) fi else # 发布用户且指定并行 pretty_print "${NORM_MGT}日志将写入文件 ${NORM_GRN}${LOG_FILE} ${NORM_MGT}" "${BASE_DIR}"/bin/datax-single-job-starter.sh "${PASS_ON_ARGS[@]}" "${arg}" >"${LOG_FILE}" 2>&1 & success_count=$((success_count + 1)) fi sleep 0.5s done if [[ -n "${IS_RUN_BY_NORMAL_USER}" ]] || [ -z "${PARALLEL}" ]; then # 普通用户或未指定并行 pretty_print "${NORM_MGT}所有DataX作业都已完成,成功 ${NORM_GRN}${success_count}${NORM_MGT} 个,失败 ${NORM_RED}${failure_count}${NORM_MGT} 个" exit ${failure_count} else pretty_print "${NORM_MGT}所有DataX作业都已启动(共启动 ${NORM_GRN}${success_count}${NORM_MGT} 个)" fi } # DataX作业配置文件列表 JOB_CONFIG_PATH=() # DataX作业配置生成器配置文件列表 GENERATOR_CONFIG_PATH=() # 不作任何处理,传递给`datax-single-job-starter.sh`的参数 PASS_ON_ARGS=() # 需作处理,加上`-c/-gc`的`DataX作业配置文件`或`DataX作业配置生成器配置文件` JC_GC_ARGS=() parse_args "${@}" prepare run_multiple_datax_job