| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #!/bin/bash
- #--------------------------------------------------------------------------------------------------
- # 启动单个flume-kafka-hdfs作业
- # 1. 要求配置文件`conf/flume/kafka-hdfs-${ }.properties`必须存在
- # 2. 可在console上查看到作业是否启动成功
- # 3. 可通过查看日志`${LOG_ROOT_DIR}/flume-agent/${TODAY}/${JOB_NAME}.log`来确定作业运行情况
- #--------------------------------------------------------------------------------------------------
- BASE_DIR=$(
- cd "$(dirname "$(realpath "$0")")/.." || exit
- pwd
- )
- . "${BASE_DIR}"/bin/common/init.sh
- LOG_ROOT_DIR="/opt/data/log"
- function usage() {
- echo -e "${NORM_MGT}Usage: $0
- ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}"
- echo -e "${NORM_MGT}Usage: $0
- ${NORM_GRN}\t<log|monitor|start|start-all|status|stop|stop-all|restart|restart-all> 程序操作:log、monitor、start、start-all、status、stop、stop-all、restart、restart-all
- ${NORM_GRN}\t<config name> 配置名称,配置文件名称要求为kafka-hdfs-<config>.properties
- ${DO_RESET}"
- exit "$1"
- }
- function log() {
- status
- if [ -z "${current_log_file}" ]; then
- pretty_print "${NORM_RED}未找到任何有效的日志文件"
- exit 1
- else
- tail -100f "${current_log_file}"
- fi
- }
- function start() {
- if [ "$(uname)" = "Linux" ]; then
- TODAY=$(date +%Y%m%d)
- else
- TODAY=$(date +%Y%m%d)
- fi
- LOG_DIR="${LOG_ROOT_DIR}/flume-agent/${TODAY}"
- LOG_FILE_PATH="${LOG_DIR}/${CONFIG_NAME}.log"
- if [ ! -d "${LOG_DIR}" ]; then
- mkdir -p "${LOG_DIR}"
- pretty_print "${NORM_MGT}创建日志目录 ${NORM_GRN}${LOG_DIR}"
- fi
- count=$(ps -axo command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | wc -l)
- if [ "${count}" -gt 0 ]; then
- pretty_print "${NORM_RED}使用配置文件 ${NORM_GRN}${JOB_CONFIG_FILE_NAME} ${NORM_RED}的Flume作业已在运行中"
- else
- pretty_print "${NORM_MGT}使用作业名称 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}启动Flume作业"
- flume-ng agent \
- -Xms256m -Xmx4g \
- --conf /etc/flume-ng/conf/ \
- --conf-file "${JOB_CONFIG_FILE}" \
- --name a1 \
- -Dflume.root.logger=INFO,console >>"${LOG_FILE_PATH}" 2>&1 &
- FLUME_APPLICATION_PID=$!
- pretty_print "${NORM_MGT}Flume作业已启动,pid为 ${NORM_GRN}${FLUME_APPLICATION_PID}${NORM_MGT},日志文件为 ${NORM_GRN}${LOG_FILE_PATH}"
- fi
- }
- function start-all() {
- for JOB_CONFIG_FILE in ${BASE_DIR}/conf/flume/*.properties; do
- JOB_CONFIG_FILE_NAME=$(basename ${JOB_CONFIG_FILE})
- CONFIG_FULL_NAME=$(basename ${JOB_CONFIG_FILE_NAME} .properties)
- CONFIG_NAME=$(echo "${CONFIG_FULL_NAME}" | sed "s/kafka-hdfs-//g")
- start
- done
- }
- function status() {
- agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
- if [ -n "${agent_pid}" ]; then
- mapfile -t log_files < <(find "${LOG_ROOT_DIR}"/flume-agent -name "*${CONFIG_NAME}.log" | sort -r)
- if [ "${#log_files[@]}" -gt 0 ]; then
- current_log_file="${log_files[0]}"
- fi
- pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running at pid ${NORM_GRN}${agent_pid}"
- else
- pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is not running"
- fi
- }
- function stop() {
- agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
- if [ -z "${agent_pid}" ]; then
- pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}并未运行"
- return
- else
- pretty_print "${NORM_MGT}停止Flume作业 ${NORM_GRN}${CONFIG_NAME}(${agent_pid})"
- kill -15 "${agent_pid}"
- fi
- agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
- if [ -z "${agent_pid}" ]; then
- pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}已停止"
- else
- pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}停止失败"
- fi
- }
- function stop-all() {
- for JOB_CONFIG_FILE in ${BASE_DIR}/conf/flume/*.properties; do
- JOB_CONFIG_FILE_NAME=$(basename ${JOB_CONFIG_FILE})
- CONFIG_FULL_NAME=$(basename ${JOB_CONFIG_FILE_NAME} .properties)
- CONFIG_NAME=$(echo "${CONFIG_FULL_NAME}" | sed "s/kafka-hdfs-//g")
- stop
- done
- }
- function monitor() {
- while true; do
- agent_pid=""
- status
- if [ -z "${agent_pid}" ]; then
- "${BASE_DIR}"/bin/wechat-work-alert.sh \
- -key="${SKB_LITTLE_CUTE}" \
- -at=13917467529 \
- -msg="$(date +'%Y-%m-%d %H:%M:%S') Flume agent (${CONFIG_NAME}) is not running"
- else
- pretty_print "${NORM_MGT}Monitor Flume agent by read log file ${NORM_GRN}${current_log_file}${NORM_MGT}"
- if head -n 1000 "${current_log_file}" | grep -E "gz failed|java.io.IOException|org.apache.flume.ChannelException|java.lang.IllegalStateException"; then
- "${BASE_DIR}"/bin/wechat-work-alert.sh \
- -key="${SKB_LITTLE_CUTE}" \
- -at=13917467529 \
- -msg="$(date +'%Y-%m-%d %H:%M:%S') Flume agent (${CONFIG_NAME}) may not be running properly, please check log file ${current_log_file} to see what happened"
- else
- pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running properly"
- fi
- fi
- if [ "$(date +%H)" = "00" ]; then
- break
- fi
- pretty_print "${NORM_MGT}Waiting ${NORM_GRN}3600 ${NORM_MGT}seconds for the next check"
- sleep 3600s
- done
- }
- function pretty_print() {
- # 设置文本颜色和格式
- NORM_RED='\033[0;31m' # 红色
- NORM_GRN='\033[0;32m' # 绿色
- NORM_CYN='\033[0;36m' # 青色
- NORM_MGT='\033[0m' # 重置颜色和格式
- # 打印带颜色和格式的消息
- echo -e "${1}"
- }
- function run() {
- op="${1}"
- if [ -z "${op}" ]; then
- usage 1
- fi
- case ${op} in
- log | monitor | start | status | stop | restart)
- CONFIG_NAME="${2}"
- pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
- if [ -z "${CONFIG_NAME}" ]; then
- usage 1
- fi
- JOB_CONFIG_FILE_NAME="kafka-hdfs-${CONFIG_NAME}.properties"
- JOB_CONFIG_FILE="${BASE_DIR}/conf/flume/config/${JOB_CONFIG_FILE_NAME}"
- if [ ! -f "${JOB_CONFIG_FILE}" ]; then
- pretty_print "${NORM_RED}Flume作业配置文件 ${NORM_GRN}${JOB_CONFIG_FILE} ${NORM_RED}不存在"
- exit 1
- fi
- ;;
- start-all | stop-all | restart-all) ;;
- -h | -H | --h | --H | --help) usage 0 ;;
- *)
- pretty_print "${NORM_RED}Unsupported operation ${NORM_GRN}${op}"
- usage 1
- ;;
- esac
- case $op in
- log) log ;;
- monitor) monitor ;;
- start) start ;;
- start-all) start-all ;;
- status) status ;;
- stop) stop ;;
- stop-all) stop-all ;;
- restart)
- stop
- start
- ;;
- restart-all)
- stop-all
- start-all
- ;;
- esac
- }
- run "${@}"
|