  1. #!/bin/bash
  2. #--------------------------------------------------------------------------------------------------
  3. # 分布式并行启动多个DataX MySQL-Hive作业
  4. # 1. 可以同时通过4种方式来指定作业,但作业中是否有重复的配置,需要开发者来判断
  5. # 2. 可以传递--override来覆盖脚本内的所有配置,重新传递要执行的作业(方便单独跑失败的作业)
  6. # 3. 运行模式:本机串行、随机串行(意义不大)、本机并行(默认模式)、随机并行
  7. #--------------------------------------------------------------------------------------------------
  8. set -e
  9. BASE_DIR=$(
  10. cd "$(dirname "$(realpath "$0")")/.." || exit
  11. pwd
  12. )
  13. . "${BASE_DIR}"/bin/common/init.sh
# Print the option reference for this script, then exit.
# Arguments: $1 - exit status to terminate with.
function usage() {
  # NOTE: the continuation lines below are part of the echoed string literals and
  # must stay unindented, otherwise the indentation would appear in the output.
  echo -e "${NORM_MGT}Usage: $0
${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}"
  echo -e "${NORM_MGT}Usage: $0
${NORM_CYN}\t[--override] 如果出现override,则只执行传入的配置,文件里定义的配置被忽略
${NORM_CYN}\t[-t< /=>table] 需要建分区的表
${NORM_CYN}\t[-jc< /=>job config] DataX作业配置文件(json)
${NORM_CYN}\t[-jcd< /=>job config directory] DataX作业配置文件(json)夹
${NORM_CYN}\t[-gc< /=>generator config] DataX作业配置文件生成器的配置文件(ini)
${NORM_CYN}\t[-gcd< /=>generator config directory] DataX作业配置文件生成器的配置文件(ini)夹
${NORM_CYN}\t[-start-date< /=>start date] 开始日期(用以筛选数据)
${NORM_CYN}\t[-stop-date< /=>stop date] 结束日期(用以筛选数据)
${NORM_CYN}\t[-skip-add-partition] 跳过添加分区
${NORM_CYN}\t[-skip-datax] 跳过DataX导出作业
${NORM_CYN}\t[-random] 随机选择Worker(默认本机执行,易造成压力大)
${NORM_CYN}\t[-parallel] 并行执行(默认串行)
${DO_RESET}"
  exit "$1"
}
  33. function parse_args() {
  34. for index in $(seq 1 $#); do
  35. arg=${*:index:1}
  36. case $arg in
  37. --override)
  38. partitioned_tables=()
  39. job_config_array=()
  40. job_config_directory_array=()
  41. generator_config_array=()
  42. generator_config_directory_array=()
  43. ;;
  44. *) ;;
  45. esac
  46. done
  47. for index in $(seq 1 $#); do
  48. arg=${*:index:1}
  49. case $arg in
  50. -t)
  51. index=$((index + 1))
  52. TABLE="${*:index:1}"
  53. partitioned_tables+=("${TABLE}")
  54. ;;
  55. -t=*)
  56. TABLE="${arg#*=}"
  57. partitioned_tables+=("${TABLE}")
  58. ;;
  59. -jc)
  60. index=$((index + 1))
  61. JOB_CONFIG="${*:index:1}"
  62. job_config_array+=("${JOB_CONFIG}")
  63. ;;
  64. -jc=*)
  65. JOB_CONFIG="${arg#*=}"
  66. job_config_array+=("${JOB_CONFIG}")
  67. ;;
  68. -jcd)
  69. index=$((index + 1))
  70. JCD="${*:index:1}"
  71. job_config_directory_array+=("${JCD}")
  72. ;;
  73. -jcd=*)
  74. TABLE="${arg#*=}"
  75. job_config_directory_array+=("${JCD}")
  76. ;;
  77. -gc)
  78. index=$((index + 1))
  79. GC="${*:index:1}"
  80. generator_config_array+=("${GC}")
  81. ;;
  82. -gc=*)
  83. GC="${arg#*=}"
  84. generator_config_array+=("${GC}")
  85. ;;
  86. -gcd)
  87. index=$((index + 1))
  88. GCD="${*:index:1}"
  89. generator_config_directory_array+=("${GCD}")
  90. ;;
  91. -gcd=*)
  92. GCD="${arg#*=}"
  93. generator_config_directory_array+=("${GCD}")
  94. ;;
  95. -start-date)
  96. index=$((index + 1))
  97. START_DATE="${*:index:1}"
  98. ;;
  99. -start-date=*)
  100. START_DATE="${arg#*=}"
  101. ;;
  102. -stop-date)
  103. index=$((index + 1))
  104. STOP_DATE="${*:index:1}"
  105. ;;
  106. -stop-date=*)
  107. STOP_DATE="${arg#*=}"
  108. ;;
  109. -skip-add-partition)
  110. SKIP_ADD_PARTITION="true"
  111. ;;
  112. -skip-datax)
  113. DEFAULT_ARGS+=("-skip-datax")
  114. ;;
  115. -random)
  116. DEFAULT_ARGS+=("-random")
  117. ;;
  118. -parallel)
  119. DEFAULT_ARGS+=("-parallel")
  120. ;;
  121. -h | -H | --h | --H | --help)
  122. usage 0
  123. ;;
  124. *) ;;
  125. esac
  126. done
  127. pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
  128. }
  129. function parse_ddl() {
  130. generator_config="${1}"
  131. if [ ! -f "${generator_config}" ]; then
  132. generator_config_path="${BASE_DIR}/${generator_config}"
  133. else
  134. generator_config_path="${generator_config}"
  135. fi
  136. if [ ! -f "${generator_config_path}" ]; then
  137. # 没有找到配置文件
  138. DDL=""
  139. return
  140. fi
  141. path=$(grep "path =" "${generator_config_path}")
  142. if [ "$(echo "${path}" | grep -c "/dt=\${dt}")" -eq 0 ]; then
  143. # 非分区表
  144. DDL=""
  145. return
  146. fi
  147. if [[ "${path}" =~ .*\.db.* ]]; then
  148. hive_db_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $i; exit } }' | cut -d '.' -f1)
  149. hive_table_name=$(echo "${path}" | awk -F'/' '{ for(i=1; i<=NF; i++) if($i ~ /\./) { print $(i+1); exit } }')
  150. else
  151. hive_db_name="tmp"
  152. hive_table_name=$(echo "${path}" | cut -d '/' -f5)
  153. fi
  154. DDL="ALTER TABLE ${hive_db_name}.${hive_table_name} ADD IF NOT EXISTS PARTITION(dt=${START_DATE});"
  155. }
  156. partitioned_tables=(
  157. # 示例:`project`_`layer`.`layer`_`project`_`mysql-table-name`
  158. )
  159. # DataX mysql-hive配置文件(json)
  160. job_config_array=(
  161. # 示例:conf/datax/generated/mysql-hive-`mysql-db-name`-`mysql-table-name`.json
  162. )
  163. job_config_directory_array=(
  164. # 示例:conf/datax/generated
  165. )
  166. # DataX作业配置生成器的配置文件
  167. generator_config_array=(
  168. # 示例:conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hive-`mysql-db-name`-`mysql-table-name`.ini
  169. # conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-activity_labels.ini
  170. # conf/datax/config/mysql-hdfs/bms_ods_test/mysql-hdfs-ik_bms_test-ar_internal_metadata.ini
  171. )
  172. generator_config_directory_array=(
  173. # 示例:conf/datax/config/mysql-hdfs/`project`_`layer`/
  174. # conf/datax/config/mysql-hdfs/bms_ods
  175. # conf/datax/config/mysql-hdfs/bms_ods_test
  176. # conf/datax/config/mysql-hdfs/crm_ods_dl
  177. # conf/datax/config/mysql-hdfs/jqr_ods
  178. # conf/datax/config/mysql-hdfs/skb_ods
  179. )
  180. DEFAULT_ARGS=()
  181. parse_args "${@}"
  182. if [ "$(uname)" = "Linux" ]; then
  183. YESTERDAY=$(date -d '-1 day' +%Y%m%d)
  184. TODAY=$(date +%Y%m%d)
  185. else
  186. YESTERDAY=$(date -v-1d +%Y%m%d)
  187. TODAY=$(date +%Y%m%d)
  188. fi
  189. if [ -z "${START_DATE}" ]; then
  190. START_DATE=${YESTERDAY}
  191. fi
  192. if [ -z "${STOP_DATE}" ]; then
  193. STOP_DATE=${TODAY}
  194. fi
  195. DEFAULT_ARGS+=("-start-date=${START_DATE}")
  196. DEFAULT_ARGS+=("-stop-date=${STOP_DATE}")
  197. HIVE_DDL=()
  198. # 显式声明的表
  199. for table in "${partitioned_tables[@]}"; do
  200. HIVE_DDL+=("ALTER TABLE ${table} add partition(dt=${START_DATE});")
  201. done
  202. # 从DataX作业配置生成器配置文件名称中解析出Hive表名
  203. for generator_config in "${generator_config_array[@]}"; do
  204. # 形如:conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
  205. parse_ddl "${generator_config}"
  206. if [ -n "${DDL}" ]; then
  207. HIVE_DDL+=("${DDL}")
  208. fi
  209. done
  210. # 从DataX作业配置生成器配置文件中解析出的表
  211. for generator_config_directory in "${generator_config_directory_array[@]}"; do
  212. # 形如:conf/datax/config/mysql-hdfs/`project`_`layer`/
  213. if [ ! -f "${generator_config_directory}" ]; then
  214. generator_config_directory="${BASE_DIR}/${generator_config_directory}"
  215. fi
  216. pretty_print "${NORM_MGT}处理生成器配置文件目录 ${NORM_GRN}${generator_config_directory}"
  217. for generator_config in "${generator_config_directory}"/*; do
  218. # 形如:conf/datax/config/mysql-hdfs/`project`_`layer`/mysql-hdfs-`mysql-db-name`-`mysql-table-name`.ini
  219. parse_ddl "${generator_config}"
  220. if [ -n "${DDL}" ]; then
  221. HIVE_DDL+=("${DDL}")
  222. fi
  223. done
  224. done
  225. if [ -n "${SKIP_ADD_PARTITION}" ]; then
  226. pretty_print "${NORM_YEL}跳过添加Hive分区(-skip-add-partition)"
  227. else
  228. if [ ${#HIVE_DDL[@]} -eq 0 ]; then
  229. pretty_print "${NORM_YEL}没有需要创建Hive新分区的表"
  230. fi
  231. for ddl in "${HIVE_DDL[@]}"; do
  232. pretty_print "${NORM_MGT}创建Hive新分区:${NORM_GRN}${ddl}"
  233. done
  234. if [ "${#HIVE_DDL[@]}" -gt 0 ]; then
  235. hive -e "${HIVE_DDL[*]}"
  236. fi
  237. fi
  238. JOB_CONFIG=()
  239. for job_config in "${job_config_array[@]}"; do
  240. JOB_CONFIG+=("-c=${job_config}")
  241. done
  242. GENERATOR_CONFIG=()
  243. for generator_config in "${generator_config_array[@]}"; do
  244. GENERATOR_CONFIG+=("-gc=${generator_config}")
  245. done
  246. # 运行DataX作业配置文件列表中定义的作业
  247. if [ "${#JOB_CONFIG[@]}" -gt 0 ]; then
  248. "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${JOB_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
  249. fi
  250. for job_config_directory in "${job_config_directory_array[@]}"; do
  251. "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-cd=${job_config_directory}" "${DEFAULT_ARGS[@]}"
  252. done
  253. if [ "${#GENERATOR_CONFIG[@]}" -gt 0 ]; then
  254. "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "${GENERATOR_CONFIG[@]}" "${DEFAULT_ARGS[@]}"
  255. fi
  256. for generator_config_directory in "${generator_config_directory_array[@]}"; do
  257. "${BASE_DIR}"/bin/datax-multiple-job-starter.sh "-gcd=${generator_config_directory}" "${DEFAULT_ARGS[@]}"
  258. done