datax-single-job-starter.sh 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. #!/bin/bash
  2. #--------------------------------------------------------------------------------------------------
  3. # 启动单个DataX作业
  4. # 1. 注意确定Python3的路径`PYTHON3_PATH'、DataX的安装目录`DATAX_HOME`以及DataX Workers的声明——`datax_workers`
  5. # 2. 可以用-c传入生成好的DataX作业Json配置文件(绝对路径),或-gc传入ini文件(相对路径)
  6. # 3. ini文件的名称格式为:源系统类型-目标系统类型-数据库名称-数据集名称.ini
  7. #--------------------------------------------------------------------------------------------------
  8. CURRENT_DIR=$(pwd)
  9. BASE_DIR=$(
  10. cd "$(dirname "$(realpath "$0")")/.." || exit
  11. pwd
  12. )
  13. . "${BASE_DIR}"/bin/common/init.sh
  14. function usage() {
  15. echo -e "${NORM_MGT}Usage: $0
  16. ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}"
  17. echo -e "${NORM_MGT}Usage: $0
  18. ${NORM_GRN}\t<-c< /=>job config> DataX作业配置文件(.json,绝对路径)
  19. ${NORM_GRN}\t<-gc< /=>generator config> DataX作业配置生成器配置文件(.ini,项目内相对路径或绝对路径),-c优先
  20. ${NORM_CYN}\t[-start-date< /=>start date] 开始日期(用以筛选数据)
  21. ${NORM_CYN}\t[-stop-date< /=>stoP date] 结束日期(用以筛选数据)
  22. ${NORM_CYN}\t[-host< /=>host] 执行作业的主机,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行,指定host优先于随机选择主机
  23. ${NORM_CYN}\t[-random] 随机选择主机,非${RELEASE_USER}用户或host和random都未指定则在当前机器执行
  24. ${NORM_CYN}\t[-skip-datax] 跳过DataX导出作业
  25. ${DO_RESET}"
  26. exit "$1"
  27. }
  28. function select_worker() {
  29. pretty_print "${NORM_MGT}本次作业执行用户为 ${NORM_GRN}${USER}"
  30. if [ -z "${IS_RUN_BY_RELEASE_USER}" ]; then
  31. # 非 ${RELEASE_USER} 用户只能在本机执行
  32. selected_worker=${CURRENT_HOST}
  33. pretty_print "${NORM_MGT}非 ${NORM_GRN}${RELEASE_USER}${NORM_MGT} 用户限制以本机 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}为Worker运行作业"
  34. elif [ -z "${IS_RUN_IN_RELEASE_DIR}" ]; then
  35. # 非 ${RELEASE_USER} 用户只能在本机执行
  36. selected_worker=${CURRENT_HOST}
  37. pretty_print "${NORM_MGT}非发布目录 (${NORM_GRN}${RELEASE_ROOT_DIR}${NORM_MGT}) 下限制以本机 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}为Worker运行作业"
  38. else
  39. if [ -n "${HOST}" ]; then
  40. pretty_print "${NORM_MGT}用户指定执行Worker为 ${NORM_GRN}${HOST}"
  41. selected_worker=${HOST}
  42. elif [ -n "${IS_RANDOM}" ]; then
  43. # 生成一个>=0, <数组长度的随机数
  44. worker_index=$((RANDOM % ${#DATAX_WORKERS_QUEUE[@]}))
  45. selected_worker=${DATAX_WORKERS_QUEUE[${worker_index}]}
  46. pretty_print "${NORM_MGT}用户指定随机选择Worker, 执行Worker为 ${NORM_GRN}${selected_worker}"
  47. else
  48. # 只能在本机执行的情况
  49. selected_worker=${CURRENT_HOST}
  50. pretty_print "${NORM_MGT}用户既未指定Worker,也未选择随机决定Worker, 执行Worker为本机 ${NORM_GRN}${CURRENT_HOST}"
  51. fi
  52. fi
  53. }
  54. function generate_job_config() {
  55. if [ -z "${JOB_CONFIG_PATH}" ]; then
  56. # 未提供`DataX作业配置文件`
  57. # 由提供的`DataX作业配置生成器配置文件`生成`DataX作业配置文件`
  58. if [ "${selected_worker}" == "${CURRENT_HOST}" ]; then
  59. ${PYTHON3_PATH} -u "${BASE_DIR}"/bin/datax-job-config-generator.py \
  60. -c "${GENERATOR_CONFIG_PATH}" \
  61. -start-date "${START_DATE}" \
  62. -stop-date "${STOP_DATE}"
  63. else
  64. ssh "${selected_worker}" "${PYTHON3_PATH}" -u "${BASE_DIR}"/bin/datax-job-config-generator.py \
  65. -c "${GENERATOR_CONFIG_PATH}" \
  66. -start-date "${START_DATE}" \
  67. -stop-date "${STOP_DATE}"
  68. fi
  69. # shellcheck disable=SC2181
  70. if [ "$?" -ne 0 ]; then
  71. pretty_print "${NORM_MGT}使用配置文件 ${NORM_GRN}${GENERATOR_CONFIG_RELATIVE_PATH} ${NORM_MGT}生成DataX作业配置文件失败"
  72. # fi
  73. exit 1
  74. fi
  75. TEMP=$(dirname "${GENERATOR_CONFIG_PATH}")
  76. TEMP=${TEMP#"${BASE_DIR}/"}
  77. TEMP=${TEMP#"conf/datax/config/"}
  78. SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1)
  79. PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2)
  80. DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3)
  81. GROUP=$(echo "${TEMP}" | cut -d '/' -f4)
  82. NEW=$(echo "${TEMP}" | cut -d '/' -f5)
  83. # 修改生成的作业名称,能够识别多级目录
  84. JOB_NAME=$(basename "${GENERATOR_CONFIG_PATH}" .ini)
  85. JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${TEMP}/${JOB_NAME}.json"
  86. # if [ -n "${NEW}" ]; then
  87. # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${NEW}/${JOB_NAME}.json"
  88. # elif [ -n "${GROUP}" ]; then
  89. # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${GROUP}/${JOB_NAME}.json"
  90. # elif [ -n "${DB_ENV}" ]; then
  91. # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${DB_ENV}/${JOB_NAME}.json"
  92. # else
  93. # JOB_CONFIG_RELATIVE_PATH="conf/datax/generated/${SRC_DST}/${PROJECT_LAYER_ENV}/${JOB_NAME}.json"
  94. # fi
  95. JOB_CONFIG_PATH="${BASE_DIR}/${JOB_CONFIG_RELATIVE_PATH}"
  96. else
  97. # 提供了`DataX作业配置文件`
  98. if [ ! -f "${JOB_CONFIG_PATH}" ]; then
  99. # 如果脚本不是在根目录下执行,相对路径的配置文件是找不到的,因此要变成绝对路径
  100. JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH}
  101. JOB_CONFIG_PATH="${BASE_DIR}/${JOB_CONFIG_RELATIVE_PATH}"
  102. elif [[ "${JOB_CONFIG_PATH}" =~ "${BASE_DIR}".* ]]; then
  103. # `DataX作业配置文件`是绝对路径
  104. JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"}
  105. else
  106. # 执行目录下可找到的相对路径,加上当前目录
  107. JOB_CONFIG_PATH="${CURRENT_DIR}/${JOB_CONFIG_PATH}"
  108. JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"}
  109. fi
  110. # PROJECT_LAYER_ENV=$(basename "$(dirname "${JOB_CONFIG_PATH}")")
  111. # SRC_DST=$(basename "$(dirname "$(dirname "${JOB_CONFIG_PATH}")")")
  112. TEMP=$(dirname "${JOB_CONFIG_PATH}")
  113. TEMP=${TEMP#"${BASE_DIR}/"}
  114. TEMP=${TEMP#"conf/datax/generated/"}
  115. SRC_DST=$(echo "${TEMP}" | cut -d '/' -f1)
  116. PROJECT_LAYER_ENV=$(echo "${TEMP}" | cut -d '/' -f2)
  117. DB_ENV=$(echo "${TEMP}" | cut -d '/' -f3)
  118. GROUP=$(echo "${TEMP}" | cut -d '/' -f4)
  119. JOB_NAME=$(basename "${JOB_CONFIG_PATH}" .json)
  120. fi
  121. datax_run_command="${PYTHON3_PATH} -u ${DATAX_HOME}/bin/datax.py ${JOB_CONFIG_PATH}"
  122. }
  123. function check_data_exists() {
  124. DATA_EXISTS="true"
  125. if [[ "${JOB_CONFIG_PATH}" =~ ${BASE_DIR}/.*/.*hdfs[-_].* ]]; then
  126. path=$(jq -r '.job.content[0].reader.parameter.path' "${JOB_CONFIG_PATH}")
  127. if ! hadoop fs -test -e "${path}"; then
  128. pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 不存在,DataX不需要执行"
  129. exit 0
  130. fi
  131. if ! hadoop fs -test -e "${path}/*"; then
  132. pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 中没有数据,DataX不需要执行"
  133. DATA_EXISTS=""
  134. fi
  135. disk_usage=$(hadoop fs -du -s "${path}" | cut -d ' ' -f1)
  136. if [ "${disk_usage}" -eq 0 ]; then
  137. pretty_print "${NORM_MGT}HDFS 路径 ${NORM_GRN}${path}${NORM_MGT} 中没有数据,DataX不需要执行"
  138. DATA_EXISTS=""
  139. fi
  140. fi
  141. }
  142. function parse_args() {
  143. for index in $(seq 1 $#); do
  144. arg=${*:index:1}
  145. case $arg in
  146. -c)
  147. index=$((index + 1))
  148. JOB_CONFIG_PATH="${*:index:1}"
  149. ;;
  150. -c=*)
  151. JOB_CONFIG_PATH=${arg#*=}
  152. ;;
  153. -gc)
  154. index=$((index + 1))
  155. GENERATOR_CONFIG_PATH="${*:index:1}"
  156. ;;
  157. -gc=*)
  158. GENERATOR_CONFIG_PATH=${arg#*=}
  159. ;;
  160. -start-date)
  161. index=$((index + 1))
  162. START_DATE="${*:index:1}"
  163. ;;
  164. -start-date=*)
  165. START_DATE="${arg#*=}"
  166. ;;
  167. -stop-date)
  168. index=$((index + 1))
  169. STOP_DATE="${*:index:1}"
  170. ;;
  171. -stop-date=*)
  172. STOP_DATE="${arg#*=}"
  173. ;;
  174. -host)
  175. index=$((index + 1))
  176. HOST="${*:index:1}"
  177. ;;
  178. -host=*)
  179. HOST="${arg#*=}"
  180. ;;
  181. -random)
  182. IS_RANDOM='true'
  183. ;;
  184. -skip-datax)
  185. SKIP_DATAX='true'
  186. ;;
  187. -h | -H | --h | --H | --help)
  188. usage 0
  189. ;;
  190. *) ;;
  191. esac
  192. done
  193. pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
  194. }
  195. function prepare() {
  196. if [ -z "${JOB_CONFIG_PATH}" ] && [ -z "${GENERATOR_CONFIG_PATH}" ]; then
  197. pretty_print "${NORM_RED}请使用 -gc 提供DataX作业配置生成器配置文件(.ini文件)或使用 -c 提供DataX作业配置文件(.json文件)"
  198. usage 1
  199. fi
  200. if [ "$(uname)" = "Linux" ]; then
  201. YESTERDAY=$(date -d '-1 day' +%Y%m%d)
  202. TODAY=$(date +%Y%m%d)
  203. else
  204. YESTERDAY=$(date -v-1d +%Y%m%d)
  205. TODAY=$(date +%Y%m%d)
  206. fi
  207. if [ -z "${START_DATE}" ]; then
  208. START_DATE=${YESTERDAY}
  209. fi
  210. if [ -z "${STOP_DATE}" ]; then
  211. STOP_DATE=${TODAY}
  212. fi
  213. if [ -n "${JOB_CONFIG_PATH}" ]; then
  214. JOB_CONFIG_RELATIVE_PATH=${JOB_CONFIG_PATH#"${BASE_DIR}/"}
  215. pretty_print "${NORM_MGT}使用配置文件 ${NORM_GRN}${JOB_CONFIG_RELATIVE_PATH} ${NORM_MGT}运行DataX作业"
  216. fi
  217. if [ -n "${GENERATOR_CONFIG_PATH}" ]; then
  218. GENERATOR_CONFIG_RELATIVE_PATH=${GENERATOR_CONFIG_PATH#"${BASE_DIR}/"}
  219. pretty_print "${NORM_MGT}使用DataX作业配置生成器配置文件 ${NORM_GRN}${GENERATOR_CONFIG_RELATIVE_PATH} ${NORM_MGT}运行DataX作业"
  220. fi
  221. }
  222. function run_single_datax_job() {
  223. generate_job_config
  224. if [[ "${JOB_CONFIG_PATH}" =~ .*/hdfs-kafka/.* ]]; then
  225. pretty_print "${NORM_GRN}写kafka的ini文件中,writer部分的注意事项:"
  226. pretty_print "${NORM_GRN}1. column,与reader的column在顺序上应一一对应,可在此处实现列名的变换"
  227. pretty_print "${NORM_GRN}2. columnType,支持的显式类型(类型书写不区分大小写,结构体子字段名称区分大小写)有:"
  228. pretty_print "${NORM_GRN}(1)id,表示该字段的值是es document的id"
  229. pretty_print "${NORM_GRN}(2)array<string>,如果字段在hive中的定义是array<string>,但其element是数值(包括0开头的字符串数值)、bool的,应显式指定其类型;其他element类型本就是string或者数值、bool,则可以不指明"
  230. pretty_print "${NORM_GRN}(3)array<struct<?,?,?>>,如果字段在hive中的定义是array<struct<?,?,?>>,应当显式指定,其结构体属性的名称和类型,名称和类型的分隔符为@,属性间的分隔符为#,例如:array<struct<col1@bigint#col2@string>>,如果字段是已经序列化的命名结构体数组,则不需要指明类型"
  231. pretty_print "${NORM_GRN}(4)struct<?,?,?>,如果字段在hive中的定义是struct<?,?,?>,应当显式指定,其结构体属性的名称和类型,名称和类型的分隔符为@,例如:struct<col1@bigint#col2@string>,如果字段是已经序列化的命名结构体,则不需要指明类型"
  232. pretty_print "${NORM_GRN}(5)json,如果字段是序列化后的json,应指明需要进行反序列化"
  233. pretty_print "${NORM_GRN}(6)其他类型,比如int、long、bigint、bool、double等,都不需要指明类型"
  234. pretty_print "${NORM_GRN}3. columnMapping,同hive es mapping表tblproperties['es.mapping.names']"
  235. pretty_print "${NORM_GRN}注:不管columnType还是结构体属性字段名称,都会使用columnMapping进行字段名称转换"
  236. pretty_print "${NORM_RED}重要:array的element、struct的字段值,一定不要包含英文逗号,否则会出现切分的值不对的情况!!!"
  237. fi
  238. if [ "${selected_worker}" = "${CURRENT_HOST}" ]; then
  239. if [ -z "${SKIP_DATAX}" ]; then
  240. check_data_exists
  241. if [ "${DATA_EXISTS}" = 'true' ]; then
  242. pretty_print "${NORM_MGT}在 ${NORM_GRN}${selected_worker} ${NORM_MGT}上执行以下命令:
  243. ${NORM_GRN}${datax_run_command}"
  244. echo -en "${NORM_GRN}"
  245. # 本机执行
  246. ${datax_run_command}
  247. fi
  248. else
  249. pretty_print "${NORM_MGT}DataX job was set to skipped"
  250. fi
  251. else
  252. if [ -z "${SKIP_DATAX}" ]; then
  253. check_data_exists
  254. if [ "${DATA_EXISTS}" = 'true' ]; then
  255. pretty_print "${NORM_MGT}在 ${NORM_GRN}${CURRENT_HOST} ${NORM_MGT}上执行以下命令:
  256. ${NORM_GRN}ssh ${RELEASE_USER}@${selected_worker} ${datax_run_command}"
  257. echo -en "${NORM_GRN}"
  258. # shellcheck disable=SC2029
  259. ssh "${selected_worker}" "${datax_run_command}"
  260. fi
  261. else
  262. pretty_print "${NORM_MGT}DataX job was set to skipped"
  263. fi
  264. fi
  265. }
  266. parse_args "${@}"
  267. prepare
  268. select_worker
  269. run_single_datax_job