#!/bin/bash #-------------------------------------------------------------------------------------------------- # 启动单个DataX作业 # 1. 注意确定Python3的路径`PYTHON3_PATH'、DataX的安装目录`DATAX_HOME`以及DataX Workers的声明——`datax_workers` # 2. 可以用-c传入生成好的DataX作业Json配置文件(绝对路径),或-gc传入ini文件(相对路径) # 3. ini文件的名称格式为:源系统类型-目标系统类型-数据库名称-数据集名称.ini #-------------------------------------------------------------------------------------------------- CURRENT_DIR=$(pwd) BASE_DIR=$( cd "$(dirname "$(realpath "$0")")/.." || exit pwd ) . "${BASE_DIR}"/bin/common/init.sh function usage() { echo -e "${NORM_MGT}Usage: $0 ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}" echo -e "${NORM_MGT}Usage: $0 ${NORM_GRN}\t<-c< /=>job config> DataX作业配置文件(.json,绝对路径) ${NORM_GRN}\t<-gc< /=>generator config> DataX作业配置生成器配置文件(.ini,项目内相对路径或绝对路径),-c优先 ${NORM_CYN}\t[-start-date< /=>start date] 开始日期(用以筛选数据) ${NORM_CYN}\t[-stop-date< /=>stoP date] 结束日期(用以筛选数据) ${NORM_CYN}\t[-host< /=>host] 执行作业的主机,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行,指定host优先于随机选择主机 ${NORM_CYN}\t[-random] 随机选择主机,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行 ${NORM_CYN}\t[-skip-datax] 跳过DataX导出作业 ${DO_RESET}" exit "$1" } function select_worker() { pretty_print "${NORM_MGT}本次作业执行用户为 ${NORM_GRN}${USER}" if [ -z "${IS_RUN_BY_RELEASE_USER}" ]; then # 非 ${RELEASE_USER} 用户只能在本机执行 selected_worker=${CURRENT_HOST} pretty_print "${NORM_MGT}非 ${NORM_GRN}${RELEASE_USER}${NORM_MGT} 用户限制以本机 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}为Worker运行作业" elif [ -z "${IS_RUN_IN_RELEASE_DIR}" ]; then # 非 ${RELEASE_USER} 用户只能在本机执行 selected_worker=${CURRENT_HOST} pretty_print "${NORM_MGT}非发布目录 (${NORM_GRN}${RELEASE_ROOT_DIR}${NORM_MGT}) 下限制以本机 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}为Worker运行作业" else if [ -n "${HOST}" ]; then pretty_print "${NORM_MGT}用户指定执行Worker为 ${NORM_GRN}${HOST}" selected_worker=${HOST} elif [ -n "${IS_RANDOM}" ]; then # 生成一个>=0, <数组长度的随机数 worker_index=$((RANDOM % ${#DATAX_WORKERS_QUEUE[@]})) selected_worker=${DATAX_WORKERS_QUEUE[${worker_index}]} pretty_print "${NORM_MGT}用户指定随机选择Worker, 执行Worker为 ${NORM_GRN}${selected_worker}" else # 只能在本机执行的情况 selected_worker=${CURRENT_HOST} pretty_print "${NORM_MGT}用户既未指定Worker,也未选择随机决定Worker, 执行Worker为本机 ${NORM_GRN}${CURRENT_HOST}" fi fi } function generate_job_config() { if [ -z "${JOB_CONFIG_PATH}" ]; then # 未提供`DataX作业配置文件` # 由提供的`DataX作业配置生成器配置文件`生成`DataX作业配置文件` if [ "${selected_worker}" == "${CURRENT_HOST}" ]; then ${PYTHON3_PATH} -u "${BASE_DIR}"/bin/datax-job-config-generator.py \ -c "${GENERATOR_CONFIG_PATH}" \ -start-date "${START_DATE}" \ -stop-date "${STOP_DATE}" else ssh "${selected_worker}" "${PYTHON3_PATH}" -u "${BASE_DIR}"/bin/datax-job-config-generator.py \ -c "${GENERATOR_CONFIG_PATH}" \ -start-date "${START_DATE}" \ -stop-date "${STOP_DATE}" fi # shellcheck disable=SC2181 if [ "$?" -ne 0 ]; then pretty_print "${NORM_MGT}使用配置文件 ${NORM_GRN}${GENERATOR_CONFIG_RELATIVE_PATH} ${NORM_MGT}生成DataX作业配置文件失败" # fi exit 1 fi TEMP=$(dirname "${GENERATOR_CONFIG_PATH}") TEMP=${TEMP#"${BASE_DIR}/"} TEMP=${TEMP#"conf/datax/config/"} SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1) PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2) DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3) GROUP=$(echo "${TEMP}" | cut -d '/' -f4) NEW=$(echo "${TEMP}" | cut -d '/' -f5) # 修改生成的作业名称,能够识别多级目录 JOB_NAME=$(basename "${GENERATOR_CONFIG_PATH}" .ini) JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${TEMP}/${JOB_NAME}.json" # if [ -n "${NEW}" ]; then # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${NEW}/${JOB_NAME}.json" # elif [ -n "${GROUP}" ]; then # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${JOB_NAME}.json" # elif [ -n "${DB_ENV}" ]; then # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${JOB_NAME}.json" # else # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${JOB_NAME}.json" # fi JOB_CONFIG_PATH="${BASE_DIR}/${JOB_CONFIG_RELATIVE_PATH}" else # 提供了`DataX作业配置文件` if [ ! -f "${JOB_CONFIG_PATH}" ]; then # 如果脚本不是在根目录下执行,相对路径的配置文件是找不到的,因此要变成绝对路径 JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH} JOB_CONFIG_PATH="${BASE_DIR}/${JOB_CONFIG_RELATIVE_PATH}" elif [[ "${JOB_CONFIG_PATH}" =~ "${BASE_DIR}".* ]]; then # `DataX作业配置文件`是绝对路径 JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"} else # 执行目录下可找到的相对路径,加上当前目录 JOB_CONFIG_PATH="${CURRENT_DIR}/${JOB_CONFIG_PATH}" JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"} fi # PROJECT_LAYER_ENV=$(basename "$(dirname "${JOB_CONFIG_PATH}")") # SRC_DST=$(basename "$(dirname "$(dirname "${JOB_CONFIG_PATH}")")") TEMP=$(dirname "${JOB_CONFIG_PATH}") TEMP=${TEMP#"${BASE_DIR}/"} TEMP=${TEMP#"conf/datax/generated/"} SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1) PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2) DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3) GROUP=$(echo "${TEMP}" | cut -d '/' -f4) JOB_NAME=$(basename "${JOB_CONFIG_PATH}" .json) fi datax_run_command="${PYTHON3_PATH} -u ${DATAX_HOME}/bin/datax.py ${JOB_CONFIG_PATH}" } function check_data_exists() { DATA_EXISTS="true" if [[ "${JOB_CONFIG_PATH}" =~ ${BASE_DIR}/.*/.*hdfs[-_].* ]]; then path=$(jq -r '.job.content[0].reader.parameter.path' "${JOB_CONFIG_PATH}") if ! hadoop fs -test -e "${path}"; then pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 不存在,DataX不需要执行" exit 0 fi if ! hadoop fs -test -e "${path}/*"; then pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 中没有数据,DataX不需要执行" DATA_EXISTS="" fi disk_usage=$(hadoop fs -du -s "${path}" | cut -d ' ' -f1) if [ "${disk_usage}" -eq 0 ]; then pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 中没有数据,DataX不需要执行" DATA_EXISTS="" fi fi } function parse_args() { for index in $(seq 1 $#); do arg=${*:index:1} case $arg in -c) index=$((index + 1)) JOB_CONFIG_PATH="${*:index:1}" ;; -c=*) JOB_CONFIG_PATH=${arg#*=} ;; -gc) index=$((index + 1)) GENERATOR_CONFIG_PATH="${*:index:1}" ;; -gc=*) GENERATOR_CONFIG_PATH=${arg#*=} ;; -start-date) index=$((index + 1)) START_DATE="${*:index:1}" ;; -start-date=*) START_DATE="${arg#*=}" ;; -stop-date) index=$((index + 1)) STOP_DATE="${*:index:1}" ;; -stop-date=*) STOP_DATE="${arg#*=}" ;; -host) index=$((index + 1)) HOST="${*:index:1}" ;; -host=*) HOST="${arg#*=}" ;; -random) IS_RANDOM='true' ;; -skip-datax) SKIP_DATAX='true' ;; -h | -H | --h | --H | --help) usage 0 ;; *) ;; esac done pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}" } function prepare() { if [ -z "${JOB_CONFIG_PATH}" ] && [ -z "${GENERATOR_CONFIG_PATH}" ]; then pretty_print "${NORM_RED}请使用 -gc 提供DataX作业配置生成器配置文件(.ini文件)或使用 -c 提供DataX作业配置文件(.json文件)" usage 1 fi if [ "$(uname)" = "Linux" ]; then YESTERDAY=$(date -d '-1 day' +%Y%m%d) TODAY=$(date +%Y%m%d) else YESTERDAY=$(date -v-1d +%Y%m%d) TODAY=$(date +%Y%m%d) fi if [ -z "${START_DATE}" ]; then START_DATE=${YESTERDAY} fi if [ -z "${STOP_DATE}" ]; then STOP_DATE=${TODAY} fi if [ -n "${JOB_CONFIG_PATH}" ]; then JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"} pretty_print "${NORM_MGT}使用配置文件 ${NORM_GRN}${JOB_CONFIG_RELATIVE_PATH} ${NORM_MGT}运行DataX作业" fi if [ -n "${GENERATOR_CONFIG_PATH}" ]; then GENERATOR_CONFIG_RELATIVE_PATH=${GENERATOR_CONFIG_PATH#"${BASE_DIR}/"} pretty_print "${NORM_MGT}使用DataX作业配置生成器配置文件 ${NORM_GRN}${GENERATOR_CONFIG_RELATIVE_PATH} ${NORM_MGT}运行DataX作业" fi } function run_single_datax_job() { generate_job_config if [[ "${JOB_CONFIG_PATH}" =~ .*/hdfs-kafka/.* ]]; then pretty_print "${NORM_GRN}写kafka的ini文件中,writer部分的注意事项:" pretty_print "${NORM_GRN}1. column,与reader的column在顺序上应一一对应,可在此处实现列名的变换" pretty_print "${NORM_GRN}2. columnType,支持的显式类型(类型书写不区分大小写,结构体子字段名称区分大小写)有:" pretty_print "${NORM_GRN}(1)id,表示该字段的值是es document的id" pretty_print "${NORM_GRN}(2)array,如果字段在hive中的定义是array,但其element是数值(包括0开头的字符串数值)、bool的,应显式指定其类型;其他element类型本就是string或者数值、bool,则可以不指明" pretty_print "${NORM_GRN}(3)array>,如果字段在hive中的定义是array>,应当显式指定,其结构体属性的名称和类型,名称和类型的分隔符为@,属性间的分隔符为#,例如:array>,如果字段是已经序列化的命名结构体数组,则不需要指明类型" pretty_print "${NORM_GRN}(4)struct,如果字段在hive中的定义是struct,应当显式指定,其结构体属性的名称和类型,名称和类型的分隔符为@,例如:struct,如果字段是已经序列化的命名结构体,则不需要指明类型" pretty_print "${NORM_GRN}(5)json,如果字段是序列化后的json,应指明需要进行反序列化" pretty_print "${NORM_GRN}(6)其他类型,比如int、long、bigint、bool、double等,都不需要指明类型" pretty_print "${NORM_GRN}3. columnMapping,同hive es mapping表tblproperties['es.mapping.names']" pretty_print "${NORM_GRN}注:不管columnType还是结构体属性字段名称,都会使用columnMapping进行字段名称转换" pretty_print "${NORM_RED}重要:array的element、struct的字段值,一定不要包含英文逗号,否则会出现切分的值不对的情况!!!" fi if [ "${selected_worker}" = "${CURRENT_HOST}" ]; then if [ -z "${SKIP_DATAX}" ]; then check_data_exists if [ "${DATA_EXISTS}" = 'true' ]; then pretty_print "${NORM_MGT}在 ${NORM_GRN}${selected_worker} ${NORM_MGT}上执行以下命令: ${NORM_GRN}${datax_run_command}" echo -en "${NORM_GRN}" # 本机执行 ${datax_run_command} fi else pretty_print "${NORM_MGT}DataX job was set to skipped" fi else if [ -z "${SKIP_DATAX}" ]; then check_data_exists if [ "${DATA_EXISTS}" = 'true' ]; then pretty_print "${NORM_MGT}在 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}上执行以下命令: ${NORM_GRN}ssh ${RELEASE_USER}@${selected_worker} ${datax_run_command}" echo -en "${NORM_GRN}" # shellcheck disable=SC2029 ssh "${selected_worker}" "${datax_run_command}" fi else pretty_print "${NORM_MGT}DataX job was set to skipped" fi fi } parse_args "${@}" prepare select_worker run_single_datax_job