#!/bin/bash
#--------------------------------------------------------------------------------------------------
# Launch multiple distributed DataX MySQL-Hive jobs in parallel.
# 1. Jobs may be specified through four mechanisms at the same time; it is the developer's
#    responsibility to make sure the combined configuration contains no duplicates.
# 2. Pass --override to discard every job configured inside this script and execute only the
#    jobs supplied on the command line (convenient for re-running failed jobs individually).
# 3. Run modes: local serial, random serial (of little use), local parallel (default mode),
#    random parallel.
#--------------------------------------------------------------------------------------------------
set -e

# Project root: one directory above the directory containing this script.
BASE_DIR=$(
  cd "$(dirname "$(realpath "$0")")/.." || exit
  pwd
)
# Provides pretty_print and the NORM_* / DO_RESET color variables.
. "${BASE_DIR}"/bin/common/init.sh

# Print usage and exit with the status passed in $1.
function usage() {
  echo -e "${NORM_MGT}Usage: $0 ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}"
  echo -e "${NORM_MGT}Usage: $0 ${NORM_CYN}\t[--override] 如果出现override,则只执行传入的配置,文件里定义的配置被忽略 ${NORM_CYN}\t[-t< /=>table] 需要建分区的表 ${NORM_CYN}\t[-jc< /=>job config] DataX作业配置文件(json) ${NORM_CYN}\t[-jcd< /=>job config directory] DataX作业配置文件(json)夹 ${NORM_CYN}\t[-gc< /=>generator config] DataX作业配置文件生成器的配置文件(ini) ${NORM_CYN}\t[-gcd< /=>generator config directory] DataX作业配置文件生成器的配置文件(ini)夹 ${NORM_CYN}\t[-start-date< /=>start date] 开始日期(用以筛选数据) ${NORM_CYN}\t[-stop-date< /=>stop date] 结束日期(用以筛选数据) ${NORM_CYN}\t[-skip-add-partition] 跳过添加分区 ${NORM_CYN}\t[-skip-datax] 跳过DataX导出作业 ${NORM_CYN}\t[-random] 随机选择Worker(默认本机执行,易造成压力大) ${NORM_CYN}\t[-parallel] 并行执行(默认串行) ${DO_RESET}"
  exit "$1"
}

# Parse command-line arguments into the global job-configuration arrays.
# Pass 1 handles --override only (clears every array populated inside this script so that
# exclusively command-line jobs run). Pass 2 collects jobs, dates and flags; both the
# "-x value" and "-x=value" spellings are accepted. Value-consuming options bump a local
# copy of the loop index to read their value, so the value itself is visited again by the
# loop and falls through the no-op catch-all arm.
function parse_args() {
  for index in $(seq 1 $#); do
    arg="${*:index:1}"
    case $arg in
    --override)
      partitioned_tables=()
      job_config_array=()
      job_config_directory_array=()
      generator_config_array=()
      generator_config_directory_array=()
      ;;
    *) ;;
    esac
  done
  for index in $(seq 1 $#); do
    arg="${*:index:1}"
    case $arg in
    -t)
      index=$((index + 1))
      TABLE="${*:index:1}"
      partitioned_tables+=("${TABLE}")
      ;;
    -t=*)
      TABLE="${arg#*=}"
      partitioned_tables+=("${TABLE}")
      ;;
    -jc)
      index=$((index + 1))
      JOB_CONFIG="${*:index:1}"
      job_config_array+=("${JOB_CONFIG}")
      ;;
    -jc=*)
      JOB_CONFIG="${arg#*=}"
      job_config_array+=("${JOB_CONFIG}")
      ;;
    -jcd)
      index=$((index + 1))
      JCD="${*:index:1}"
      job_config_directory_array+=("${JCD}")
      ;;
    -jcd=*)
      # BUGFIX: previously assigned into TABLE and appended the stale/empty ${JCD},
      # silently dropping every "-jcd=DIR" argument.
      JCD="${arg#*=}"
      job_config_directory_array+=("${JCD}")
      ;;
    -gc)
      index=$((index + 1))
      GC="${*:index:1}"
      generator_config_array+=("${GC}")
      ;;
    -gc=*)
      GC="${arg#*=}"
      generator_config_array+=("${GC}")
      ;;
    -gcd)
      index=$((index + 1))
      GCD="${*:index:1}"
      generator_config_directory_array+=("${GCD}")
      ;;
    -gcd=*)
      GCD="${arg#*=}"
      generator_config_directory_array+=("${GCD}")
      ;;
    -start-date)
      index=$((index + 1))
      START_DATE="${*:index:1}"
      ;;
    -start-date=*)
      START_DATE="${arg#*=}"
      ;;
    -stop-date)
      index=$((index + 1))
      STOP_DATE="${*:index:1}"
      ;;
    -stop-date=*)
      STOP_DATE="${arg#*=}"
      ;;
    -skip-add-partition)
      SKIP_ADD_PARTITION="true"
      ;;
    -skip-datax)
      DEFAULT_ARGS+=("-skip-datax")
      ;;
    -random)
      DEFAULT_ARGS+=("-random")
      ;;
    -parallel)
      DEFAULT_ARGS+=("-parallel")
      ;;
    -h | -H | --h | --H | --help)
      usage 0
      ;;
    *) ;;
    esac
  done
  pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
}

# Derive an "ALTER TABLE ... ADD IF NOT EXISTS PARTITION" statement from a generator
# config file ($1). Sets the global DDL; DDL is empty when the config file is missing
# or when its HDFS path shows no "/dt=${dt}" partition segment (non-partitioned table).
function parse_ddl() {
  generator_config="${1}"
  # Accept both absolute paths and paths relative to the project root.
  if [ ! -f "${generator_config}" ]; then
    generator_config_path="${BASE_DIR}/${generator_config}"
  else
    generator_config_path="${generator_config}"
  fi
  if [ ! -f "${generator_config_path}" ]; then
    # Config file not found.
    DDL=""
    return
  fi
  path=$(grep "path =" "${generator_config_path}")
  if [ "$(echo "${path}" | grep -c "/dt=\${dt}")" -eq 0 ]; then
    # Not a partitioned table.
    DDL=""
    return
  fi
  if [[ "${path}" =~ .*\.db.* ]]; then
    # The HDFS path contains a "<db>.db" segment: the database name is that segment
    # minus the ".db" suffix, and the table name is the following path segment.
    hive_db_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $i; exit } }' | cut -d '.' -f1)
    hive_table_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $(i+1); exit } }')
  else
    # No database segment: assume the "tmp" database and take the 5th path segment
    # as the table name.
    hive_db_name="tmp"
    hive_table_name=$(echo "${path}" | cut -d '/' -f5)
  fi
  DDL="ALTER TABLE ${hive_db_name}.${hive_table_name} ADD IF NOT EXISTS PARTITION(dt=${START_DATE});"
}

# Tables that need a new partition, declared explicitly.
partitioned_tables=(
  # Example: `project`_`layer`.`layer`_`project`_`mysql-table-name`
)
# DataX mysql-hive job config files (json).
job_config_array=(
  # Example: conf/datax/generated/mysql-hive-`mysql-db-name`-`mysql-table-name`.json
)
job_config_directory_array=(
  # Example: conf/datax/generated
)
# Config files for the DataX job-config generator.
generator_config_array=(
  # Example: conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hive-`mysql-db-name`-`mysql-table-name`.ini
  # conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-activity_labels.ini
  # conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-ar_internal_metadata.ini
)
generator_config_directory_array=(
  # Example: conf/datax/config/mysql-hdfs/`project`_`layer`/
  # conf/datax/config/mysql-hdfs/bms_ods
  # conf/datax/config/mysql-hdfs/bms_ods_test
  # conf/datax/config/mysql-hdfs/crm_ods_dl
  # conf/datax/config/mysql-hdfs/jqr_ods
  # conf/datax/config/mysql-hdfs/skb_ods
)
DEFAULT_ARGS=()
parse_args "${@}"

# Default date window: [yesterday, today). GNU date (Linux) and BSD date (macOS)
# spell relative dates differently.
if [ "$(uname)" = "Linux" ]; then
  YESTERDAY=$(date -d '-1 day' +%Y%m%d)
  TODAY=$(date +%Y%m%d)
else
  YESTERDAY=$(date -v-1d +%Y%m%d)
  TODAY=$(date +%Y%m%d)
fi
if [ -z "${START_DATE}" ]; then
  START_DATE=${YESTERDAY}
fi
if [ -z "${STOP_DATE}" ]; then
  STOP_DATE=${TODAY}
fi
DEFAULT_ARGS+=("-start-date=${START_DATE}")
DEFAULT_ARGS+=("-stop-date=${STOP_DATE}")

HIVE_DDL=()
# Explicitly declared tables.
for table in "${partitioned_tables[@]}"; do
  # BUGFIX: IF NOT EXISTS keeps the single batched "hive -e" call from aborting when a
  # partition already exists (now consistent with the DDL produced by parse_ddl).
  HIVE_DDL+=("ALTER TABLE ${table} ADD IF NOT EXISTS PARTITION(dt=${START_DATE});")
done
# Hive table names parsed from the listed DataX generator config files.
for generator_config in "${generator_config_array[@]}"; do
  # e.g. conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
  parse_ddl "${generator_config}"
  if [ -n "${DDL}" ]; then
    HIVE_DDL+=("${DDL}")
  fi
done
# Hive table names parsed from every generator config inside the listed directories.
for generator_config_directory in "${generator_config_directory_array[@]}"; do
  # e.g. conf/datax/config/mysql-hdfs/`project`_`layer`/
  # BUGFIX: test with -d (directory), not -f (regular file) — the -f test is always false
  # for a directory, so BASE_DIR was unconditionally prepended, breaking absolute paths.
  if [ ! -d "${generator_config_directory}" ]; then
    generator_config_directory="${BASE_DIR}/${generator_config_directory}"
  fi
  pretty_print "${NORM_MGT}处理生成器配置文件目录 ${NORM_GRN}${generator_config_directory}"
  for generator_config in "${generator_config_directory}"/*; do
    # e.g. conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
    parse_ddl "${generator_config}"
    if [ -n "${DDL}" ]; then
      HIVE_DDL+=("${DDL}")
    fi
  done
done

if [ -n "${SKIP_ADD_PARTITION}" ]; then
  pretty_print "${NORM_YEL}跳过添加Hive分区(-skip-add-partition)"
else
  if [ ${#HIVE_DDL[@]} -eq 0 ]; then
    pretty_print "${NORM_YEL}没有需要创建Hive新分区的表"
  fi
  for ddl in "${HIVE_DDL[@]}"; do
    pretty_print "${NORM_MGT}创建Hive新分区:${NORM_GRN}${ddl}"
  done
  if [ "${#HIVE_DDL[@]}" -gt 0 ]; then
    # Execute all partition DDL statements in a single hive invocation.
    hive -e "${HIVE_DDL[*]}"
  fi
fi

JOB_CONFIG=()
for job_config in "${job_config_array[@]}"; do
  JOB_CONFIG+=("-c=${job_config}")
done
GENERATOR_CONFIG=()
for generator_config in "${generator_config_array[@]}"; do
  GENERATOR_CONFIG+=("-gc=${generator_config}")
done
# Run the jobs defined in the DataX job-config file list.
if [ "${#JOB_CONFIG[@]}" -gt 0 ]; then
  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${JOB_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
fi
for job_config_directory in "${job_config_directory_array[@]}"; do
  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-cd=${job_config_directory}" "${DEFAULT_ARGS[@]}"
done
if [ "${#GENERATOR_CONFIG[@]}" -gt 0 ]; then
  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${GENERATOR_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
fi
for generator_config_directory in "${generator_config_directory_array[@]}"; do
  "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-gcd=${generator_config_directory}" "${DEFAULT_ARGS[@]}"
done