flume-control.sh 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/bin/bash
#--------------------------------------------------------------------------------------------------
# Start and manage flume-kafka-hdfs jobs
# 1. A config file named `kafka-hdfs-${CONFIG_NAME}.properties` must exist under `conf/flume/`
# 2. Whether the job started successfully is reported on the console
# 3. Check the log `${LOG_ROOT_DIR}/flume-agent/${TODAY}/${CONFIG_NAME}.log` to see how the job is running
#--------------------------------------------------------------------------------------------------
  8. BASE_DIR=$(
  9. cd "$(dirname "$(realpath "$0")")/.." || exit
  10. pwd
  11. )
  12. . "${BASE_DIR}"/bin/common/init.sh
  13. LOG_ROOT_DIR="/opt/data/log"
  14. function usage() {
  15. echo -e "${NORM_MGT}Usage: $0
  16. ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}"
  17. echo -e "${NORM_MGT}Usage: $0
  18. ${NORM_GRN}\t<log|monitor|start|start-all|status|stop|stop-all|restart|restart-all> 程序操作:log、monitor、start、start-all、status、stop、stop-all、restart、restart-all
  19. ${NORM_GRN}\t<config name> 配置名称,配置文件名称要求为kafka-hdfs-<config>.properties
  20. ${DO_RESET}"
  21. exit "$1"
  22. }
  23. function log() {
  24. status
  25. if [ -z "${current_log_file}" ]; then
  26. pretty_print "${NORM_RED}未找到任何有效的日志文件"
  27. exit 1
  28. else
  29. tail -100f "${current_log_file}"
  30. fi
  31. }
  32. function start() {
  33. if [ "$(uname)" = "Linux" ]; then
  34. TODAY=$(date +%Y%m%d)
  35. else
  36. TODAY=$(date +%Y%m%d)
  37. fi
  38. LOG_DIR="${LOG_ROOT_DIR}/flume-agent/${TODAY}"
  39. LOG_FILE_PATH="${LOG_DIR}/${CONFIG_NAME}.log"
  40. if [ ! -d "${LOG_DIR}" ]; then
  41. mkdir -p "${LOG_DIR}"
  42. pretty_print "${NORM_MGT}创建日志目录 ${NORM_GRN}${LOG_DIR}"
  43. fi
  44. count=$(ps -axo command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | wc -l)
  45. if [ "${count}" -gt 0 ]; then
  46. pretty_print "${NORM_RED}使用配置文件 ${NORM_GRN}${JOB_CONFIG_FILE_NAME} ${NORM_RED}的Flume作业已在运行中"
  47. else
  48. pretty_print "${NORM_MGT}使用作业名称 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}启动Flume作业"
  49. flume-ng agent \
  50. -Xms256m -Xmx4g \
  51. --conf /etc/flume-ng/conf/ \
  52. --conf-file "${JOB_CONFIG_FILE}" \
  53. --name a1 \
  54. -Dflume.root.logger=INFO,console >>"${LOG_FILE_PATH}" 2>&1 &
  55. FLUME_APPLICATION_PID=$!
  56. pretty_print "${NORM_MGT}Flume作业已启动,pid为 ${NORM_GRN}${FLUME_APPLICATION_PID}${NORM_MGT},日志文件为 ${NORM_GRN}${LOG_FILE_PATH}"
  57. fi
  58. }
  59. function start-all() {
  60. for JOB_CONFIG_FILE in ${BASE_DIR}/conf/flume/*.properties; do
  61. JOB_CONFIG_FILE_NAME=$(basename ${JOB_CONFIG_FILE})
  62. CONFIG_FULL_NAME=$(basename ${JOB_CONFIG_FILE_NAME} .properties)
  63. CONFIG_NAME=$(echo "${CONFIG_FULL_NAME}" | sed "s/kafka-hdfs-//g")
  64. start
  65. done
  66. }
  67. function status() {
  68. agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
  69. if [ -n "${agent_pid}" ]; then
  70. mapfile -t log_files < <(find "${LOG_ROOT_DIR}"/flume-agent -name "*${CONFIG_NAME}.log" | sort -r)
  71. if [ "${#log_files[@]}" -gt 0 ]; then
  72. current_log_file="${log_files[0]}"
  73. fi
  74. pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running at pid ${NORM_GRN}${agent_pid}"
  75. else
  76. pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is not running"
  77. fi
  78. }
  79. function stop() {
  80. agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
  81. if [ -z "${agent_pid}" ]; then
  82. pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}并未运行"
  83. return
  84. else
  85. pretty_print "${NORM_MGT}停止Flume作业 ${NORM_GRN}${CONFIG_NAME}(${agent_pid})"
  86. kill -15 "${agent_pid}"
  87. fi
  88. agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
  89. if [ -z "${agent_pid}" ]; then
  90. pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}已停止"
  91. else
  92. pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}停止失败"
  93. fi
  94. }
  95. function stop-all() {
  96. for JOB_CONFIG_FILE in ${BASE_DIR}/conf/flume/*.properties; do
  97. JOB_CONFIG_FILE_NAME=$(basename ${JOB_CONFIG_FILE})
  98. CONFIG_FULL_NAME=$(basename ${JOB_CONFIG_FILE_NAME} .properties)
  99. CONFIG_NAME=$(echo "${CONFIG_FULL_NAME}" | sed "s/kafka-hdfs-//g")
  100. stop
  101. done
  102. }
  103. function monitor() {
  104. while true; do
  105. agent_pid=""
  106. status
  107. if [ -z "${agent_pid}" ]; then
  108. "${BASE_DIR}"/bin/wechat-work-alert.sh \
  109. -key="${SKB_LITTLE_CUTE}" \
  110. -at=13917467529 \
  111. -msg="$(date +'%Y-%m-%d %H:%M:%S') Flume agent (${CONFIG_NAME}) is not running"
  112. else
  113. pretty_print "${NORM_MGT}Monitor Flume agent by read log file ${NORM_GRN}${current_log_file}${NORM_MGT}"
  114. if head -n 1000 "${current_log_file}" | grep -E "gz failed|java.io.IOException|org.apache.flume.ChannelException|java.lang.IllegalStateException"; then
  115. "${BASE_DIR}"/bin/wechat-work-alert.sh \
  116. -key="${SKB_LITTLE_CUTE}" \
  117. -at=13917467529 \
  118. -msg="$(date +'%Y-%m-%d %H:%M:%S') Flume agent (${CONFIG_NAME}) may not be running properly, please check log file ${current_log_file} to see what happened"
  119. else
  120. pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running properly"
  121. fi
  122. fi
  123. if [ "$(date +%H)" = "00" ]; then
  124. break
  125. fi
  126. pretty_print "${NORM_MGT}Waiting ${NORM_GRN}3600 ${NORM_MGT}seconds for the next check"
  127. sleep 3600s
  128. done
  129. }
  130. function pretty_print() {
  131. # 设置文本颜色和格式
  132. NORM_RED='\033[0;31m' # 红色
  133. NORM_GRN='\033[0;32m' # 绿色
  134. NORM_CYN='\033[0;36m' # 青色
  135. NORM_MGT='\033[0m' # 重置颜色和格式
  136. # 打印带颜色和格式的消息
  137. echo -e "${1}"
  138. }
  139. function run() {
  140. op="${1}"
  141. if [ -z "${op}" ]; then
  142. usage 1
  143. fi
  144. case ${op} in
  145. log | monitor | start | status | stop | restart)
  146. CONFIG_NAME="${2}"
  147. pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
  148. if [ -z "${CONFIG_NAME}" ]; then
  149. usage 1
  150. fi
  151. JOB_CONFIG_FILE_NAME="kafka-hdfs-${CONFIG_NAME}.properties"
  152. JOB_CONFIG_FILE="${BASE_DIR}/conf/flume/config/${JOB_CONFIG_FILE_NAME}"
  153. if [ ! -f "${JOB_CONFIG_FILE}" ]; then
  154. pretty_print "${NORM_RED}Flume作业配置文件 ${NORM_GRN}${JOB_CONFIG_FILE} ${NORM_RED}不存在"
  155. exit 1
  156. fi
  157. ;;
  158. start-all | stop-all | restart-all) ;;
  159. -h | -H | --h | --H | --help) usage 0 ;;
  160. *)
  161. pretty_print "${NORM_RED}Unsupported operation ${NORM_GRN}${op}"
  162. usage 1
  163. ;;
  164. esac
  165. case $op in
  166. log) log ;;
  167. monitor) monitor ;;
  168. start) start ;;
  169. start-all) start-all ;;
  170. status) status ;;
  171. stop) stop ;;
  172. stop-all) stop-all ;;
  173. restart)
  174. stop
  175. start
  176. ;;
  177. restart-all)
  178. stop-all
  179. start-all
  180. ;;
  181. esac
  182. }
  183. run "${@}"