#!/bin/bash
#--------------------------------------------------------------------------------------------------
# Launch multiple DataX MySQL-to-Hive jobs in distributed parallel fashion.
# 1. Jobs may be specified through all four mechanisms at once; it is up to the developer to make
#    sure no job is configured twice.
# 2. Pass --override to discard every configuration defined inside this script and run only the
#    jobs passed on the command line (handy for rerunning failed jobs individually).
# 3. Run modes: local serial, random serial (rarely useful), local parallel (default mode),
#    random parallel.
#--------------------------------------------------------------------------------------------------
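#
# Illustrative invocations (a sketch only: the script name and config paths below are
# placeholders, not real files in this repository):
#   ./<this-script>.sh -gcd=conf/datax/config/mysql-hdfs/bms_ods -parallel
#   ./<this-script>.sh --override -jc=conf/datax/generated/mysql-hive-db-table.json \
#       -start-date=20240101 -stop-date=20240102
#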
set -e
BASE_DIR=$(
    cd "$(dirname "$(realpath "$0")")/.." || exit
    pwd
)
. "${BASE_DIR}"/bin/common/init.sh
function usage() {
    echo -e "${NORM_MGT}Usage: $0
${NORM_CYN}\t[-h/-H/--h/--H/--help] print this help message${DO_RESET}"
    echo -e "${NORM_MGT}Usage: $0
${NORM_CYN}\t[--override] when present, only configurations passed on the command line are executed; those defined in this script are ignored
${NORM_CYN}\t[-t< /=>table] table that needs a new partition
${NORM_CYN}\t[-jc< /=>job config] DataX job configuration file (json)
${NORM_CYN}\t[-jcd< /=>job config directory] directory of DataX job configuration files (json)
${NORM_CYN}\t[-gc< /=>generator config] configuration file (ini) for the DataX job config generator
${NORM_CYN}\t[-gcd< /=>generator config directory] directory of generator configuration files (ini)
${NORM_CYN}\t[-start-date< /=>start date] start date (used to filter data)
${NORM_CYN}\t[-stop-date< /=>stop date] stop date (used to filter data)
${NORM_CYN}\t[-skip-add-partition] skip adding Hive partitions
${NORM_CYN}\t[-skip-datax] skip the DataX export jobs
${NORM_CYN}\t[-random] pick a worker at random (runs on this host by default, which is prone to heavy load)
${NORM_CYN}\t[-parallel] run jobs in parallel (serial by default)
${DO_RESET}"
    exit "$1"
}
function parse_args() {
    # First pass: --override clears every array defined in this script, so that
    # only configurations passed on the command line survive the second pass.
    for index in $(seq 1 $#); do
        arg=${*:index:1}
        case $arg in
        --override)
            partitioned_tables=()
            job_config_array=()
            job_config_directory_array=()
            generator_config_array=()
            generator_config_directory_array=()
            ;;
        *) ;;
        esac
    done
    for index in $(seq 1 $#); do
        arg=${*:index:1}
        case $arg in
        -t)
            index=$((index + 1))
            TABLE="${*:index:1}"
            partitioned_tables+=("${TABLE}")
            ;;
        -t=*)
            TABLE="${arg#*=}"
            partitioned_tables+=("${TABLE}")
            ;;
        -jc)
            index=$((index + 1))
            JOB_CONFIG="${*:index:1}"
            job_config_array+=("${JOB_CONFIG}")
            ;;
        -jc=*)
            JOB_CONFIG="${arg#*=}"
            job_config_array+=("${JOB_CONFIG}")
            ;;
        -jcd)
            index=$((index + 1))
            JCD="${*:index:1}"
            job_config_directory_array+=("${JCD}")
            ;;
        -jcd=*)
            JCD="${arg#*=}"
            job_config_directory_array+=("${JCD}")
            ;;
        -gc)
            index=$((index + 1))
            GC="${*:index:1}"
            generator_config_array+=("${GC}")
            ;;
        -gc=*)
            GC="${arg#*=}"
            generator_config_array+=("${GC}")
            ;;
        -gcd)
            index=$((index + 1))
            GCD="${*:index:1}"
            generator_config_directory_array+=("${GCD}")
            ;;
        -gcd=*)
            GCD="${arg#*=}"
            generator_config_directory_array+=("${GCD}")
            ;;
        -start-date)
            index=$((index + 1))
            START_DATE="${*:index:1}"
            ;;
        -start-date=*)
            START_DATE="${arg#*=}"
            ;;
        -stop-date)
            index=$((index + 1))
            STOP_DATE="${*:index:1}"
            ;;
        -stop-date=*)
            STOP_DATE="${arg#*=}"
            ;;
        -skip-add-partition)
            SKIP_ADD_PARTITION="true"
            ;;
        -skip-datax)
            DEFAULT_ARGS+=("-skip-datax")
            ;;
        -random)
            DEFAULT_ARGS+=("-random")
            ;;
        -parallel)
            DEFAULT_ARGS+=("-parallel")
            ;;
        -h | -H | --h | --H | --help)
            usage 0
            ;;
        *) ;;
        esac
    done
    pretty_print "${NORM_MGT}${0} received arguments: ${NORM_GRN}${*}"
}
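# Example of the two-pass parse above (hypothetical table names): given
#   --override -t=bms_ods.t1 -t bms_ods.t2
# the first pass clears every array, then the second pass leaves
# partitioned_tables=(bms_ods.t1 bms_ods.t2). With the "-t value" form the
# value is visited again on the next loop iteration, but falls through the
# default "*)" branch harmlessly.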
function parse_ddl() {
    generator_config="${1}"
    if [ ! -f "${generator_config}" ]; then
        generator_config_path="${BASE_DIR}/${generator_config}"
    else
        generator_config_path="${generator_config}"
    fi
    if [ ! -f "${generator_config_path}" ]; then
        # Configuration file not found
        DDL=""
        return
    fi
    path=$(grep "path =" "${generator_config_path}")
    if [ "$(echo "${path}" | grep -c "/dt=\${dt}")" -eq 0 ]; then
        # Not a partitioned table
        DDL=""
        return
    fi
    if [[ "${path}" =~ .*\.db.* ]]; then
        hive_db_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $i; exit } }' | cut -d '.' -f1)
        hive_table_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $(i+1); exit } }')
    else
        hive_db_name="tmp"
        hive_table_name=$(echo "${path}" | cut -d '/' -f5)
    fi
    DDL="ALTER TABLE ${hive_db_name}.${hive_table_name} ADD IF NOT EXISTS PARTITION(dt=${START_DATE});"
}
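# Worked example for parse_ddl (the config path below is hypothetical): given a
# generator config whose warehouse line reads
#   path = /user/hive/warehouse/bms_ods.db/ods_activity/dt=${dt}
# the awk loop picks the first path segment containing a dot ("bms_ods.db"), so
# hive_db_name=bms_ods and hive_table_name=ods_activity; with START_DATE=20240101
# the resulting DDL is:
#   ALTER TABLE bms_ods.ods_activity ADD IF NOT EXISTS PARTITION(dt=20240101);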
partitioned_tables=(
    # Example: `project`_`layer`.`layer`_`project`_`mysql-table-name`
)
# DataX mysql-hive job configuration files (json)
job_config_array=(
    # Example: conf/datax/generated/mysql-hive-`mysql-db-name`-`mysql-table-name`.json
)
job_config_directory_array=(
    # Example: conf/datax/generated
)
# Configuration files for the DataX job config generator
generator_config_array=(
    # Example: conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hive-`mysql-db-name`-`mysql-table-name`.ini
    # conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-activity_labels.ini
    # conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-ar_internal_metadata.ini
)
generator_config_directory_array=(
    # Example: conf/datax/config/mysql-hdfs/`project`_`layer`/
    # conf/datax/config/mysql-hdfs/bms_ods
    # conf/datax/config/mysql-hdfs/bms_ods_test
    # conf/datax/config/mysql-hdfs/crm_ods_dl
    # conf/datax/config/mysql-hdfs/jqr_ods
    # conf/datax/config/mysql-hdfs/skb_ods
)
DEFAULT_ARGS=()
parse_args "${@}"
if [ "$(uname)" = "Linux" ]; then
    YESTERDAY=$(date -d '-1 day' +%Y%m%d)
else
    # BSD/macOS date uses -v for offsets
    YESTERDAY=$(date -v-1d +%Y%m%d)
fi
TODAY=$(date +%Y%m%d)
if [ -z "${START_DATE}" ]; then
    START_DATE=${YESTERDAY}
fi
if [ -z "${STOP_DATE}" ]; then
    STOP_DATE=${TODAY}
fi
DEFAULT_ARGS+=("-start-date=${START_DATE}")
DEFAULT_ARGS+=("-stop-date=${STOP_DATE}")
HIVE_DDL=()
# Tables declared explicitly
for table in "${partitioned_tables[@]}"; do
    HIVE_DDL+=("ALTER TABLE ${table} ADD IF NOT EXISTS PARTITION(dt=${START_DATE});")
done
# Hive table names parsed from DataX job config generator configuration files
for generator_config in "${generator_config_array[@]}"; do
    # Shaped like: conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
    parse_ddl "${generator_config}"
    if [ -n "${DDL}" ]; then
        HIVE_DDL+=("${DDL}")
    fi
done
# Tables parsed from generator configuration files found in directories
for generator_config_directory in "${generator_config_directory_array[@]}"; do
    # Shaped like: conf/datax/config/mysql-hdfs/`project`_`layer`/
    if [ ! -d "${generator_config_directory}" ]; then
        generator_config_directory="${BASE_DIR}/${generator_config_directory}"
    fi
    pretty_print "${NORM_MGT}Processing generator config directory ${NORM_GRN}${generator_config_directory}"
    for generator_config in "${generator_config_directory}"/*; do
        # Shaped like: conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
        parse_ddl "${generator_config}"
        if [ -n "${DDL}" ]; then
            HIVE_DDL+=("${DDL}")
        fi
    done
done
if [ -n "${SKIP_ADD_PARTITION}" ]; then
    pretty_print "${NORM_YEL}Skipping Hive partition creation (-skip-add-partition)"
else
    if [ ${#HIVE_DDL[@]} -eq 0 ]; then
        pretty_print "${NORM_YEL}No table needs a new Hive partition"
    fi
    for ddl in "${HIVE_DDL[@]}"; do
        pretty_print "${NORM_MGT}Creating new Hive partition: ${NORM_GRN}${ddl}"
    done
    if [ "${#HIVE_DDL[@]}" -gt 0 ]; then
        hive -e "${HIVE_DDL[*]}"
    fi
fi
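# Note: ${HIVE_DDL[*]} joins all statements with spaces, and each statement
# already ends in ";", so the whole batch runs as a single hive -e call, e.g.
# (illustrative, hypothetical table names):
#   hive -e "ALTER TABLE a.t1 ADD IF NOT EXISTS PARTITION(dt=20240101); ALTER TABLE a.t2 ..."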
JOB_CONFIG=()
for job_config in "${job_config_array[@]}"; do
    JOB_CONFIG+=("-c=${job_config}")
done
GENERATOR_CONFIG=()
for generator_config in "${generator_config_array[@]}"; do
    GENERATOR_CONFIG+=("-gc=${generator_config}")
done
# Run the jobs defined in the DataX job configuration file list
if [ "${#JOB_CONFIG[@]}" -gt 0 ]; then
    "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${JOB_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
fi
for job_config_directory in "${job_config_directory_array[@]}"; do
    "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-cd=${job_config_directory}" "${DEFAULT_ARGS[@]}"
done
if [ "${#GENERATOR_CONFIG[@]}" -gt 0 ]; then
    "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${GENERATOR_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
fi
for generator_config_directory in "${generator_config_directory_array[@]}"; do
    "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-gcd=${generator_config_directory}" "${DEFAULT_ARGS[@]}"
done