#!/bin/bash
#--------------------------------------------------------------------------------------------------
# Control script for Flume kafka-hdfs agent jobs.
#
# Usage: <script> <operation> [config_name]
#   operation   : log | monitor | start | start-all | status | stop | stop-all |
#                 restart | restart-all
#   config_name : required for the single-job operations; the job configuration
#                 file is looked up as
#                 ${BASE_DIR}/conf/flume/config/kafka-hdfs-<config_name>.properties
#
# 1. The configuration file for the requested job must exist.
# 2. Whether the job started successfully is reported on the console.
# 3. The agent output is appended to
#    ${LOG_ROOT_DIR}/flume-agent/<yyyymmdd>/<config_name>.log
#
# NOTE(review): single-job operations read configs from conf/flume/config/,
# while start-all/stop-all glob conf/flume/*.properties. One of the two paths is
# probably wrong; both are kept as found until the intended layout is confirmed.
#--------------------------------------------------------------------------------------------------

# Resolve the project root (parent of the directory holding this script).
BASE_DIR=$(
  cd "$(dirname "$(realpath "$0")")/.." || exit
  pwd
)

# Project-wide initialisation; presumably provides the colour variables
# (NORM_*, DO_RESET) used before the first pretty_print call -- TODO confirm.
. "${BASE_DIR}"/bin/common/init.sh

LOG_ROOT_DIR="/opt/data/log"

# Print the usage text and exit.
# Arguments: $1 - exit status to terminate with
function usage() {
  echo -e "${NORM_MGT}Usage: $0 ${NORM_CYN}\t[-h/-H/--h/--H/--help] 打印脚本使用方法${DO_RESET}"
  echo -e "${NORM_MGT}Usage: $0 <operation> [config_name]${DO_RESET}"
  echo -e "${NORM_GRN}<operation>\t 程序操作:log、monitor、start、start-all、status、stop、stop-all、restart、restart-all${DO_RESET}"
  echo -e "${NORM_GRN}<config_name>\t 配置名称,配置文件名称要求为kafka-hdfs-<config_name>.properties ${DO_RESET}"
  exit "$1"
}

# Print a message that may contain colour escape sequences.
# The colour variables are (re)assigned on every call, exactly as before; note
# that NORM_MGT is set to the "reset" sequence here.
# Arguments: $1 - message to print
function pretty_print() {
  # Text colours and formatting
  NORM_RED='\033[0;31m' # red
  NORM_GRN='\033[0;32m' # green
  NORM_CYN='\033[0;36m' # cyan
  NORM_MGT='\033[0m'    # reset colour and formatting
  # Append a reset so the last colour used does not leak into the terminal.
  echo -e "${1}\033[0m"
}

# Print the PIDs (one per line) of every process whose command line contains
# the current job's configuration file name.
# Globals: JOB_CONFIG_FILE_NAME (read)
function find_agent_pids() {
  # -F: the file name is a literal string, not a regex; --: protect leading '-'.
  ps -axo pid,command |
    grep -F -- "${JOB_CONFIG_FILE_NAME}" |
    grep -v grep |
    awk '{print $1}'
}

# Send an alert through the project's wechat-work-alert.sh helper.
# Arguments: $1 - message text (a timestamp is prepended)
# Globals:   BASE_DIR, SKB_LITTLE_CUTE (read)
function send_alert() {
  "${BASE_DIR}"/bin/wechat-work-alert.sh \
    -key="${SKB_LITTLE_CUTE}" \
    -at=13917467529 \
    -msg="$(date +'%Y-%m-%d %H:%M:%S') ${1}"
}

# Run an operation once for every *.properties file in conf/flume, setting the
# per-job globals the single-job functions rely on.
# Arguments: $1 - name of the function to invoke (e.g. start, stop)
# Globals:   JOB_CONFIG_FILE, JOB_CONFIG_FILE_NAME, CONFIG_FULL_NAME,
#            CONFIG_NAME (written)
function run_for_all_jobs() {
  local operation="${1}"
  for JOB_CONFIG_FILE in "${BASE_DIR}"/conf/flume/*.properties; do
    # An unmatched glob stays literal; skip it instead of acting on a bogus name.
    [ -e "${JOB_CONFIG_FILE}" ] || continue
    JOB_CONFIG_FILE_NAME=$(basename "${JOB_CONFIG_FILE}")
    CONFIG_FULL_NAME=$(basename "${JOB_CONFIG_FILE_NAME}" .properties)
    CONFIG_NAME="${CONFIG_FULL_NAME//kafka-hdfs-/}"
    "${operation}"
  done
}

# Follow the newest log file of the current job (only located while the agent
# is running, because status performs the lookup).
function log() {
  status
  if [ -z "${current_log_file}" ]; then
    pretty_print "${NORM_RED}未找到任何有效的日志文件"
    exit 1
  else
    tail -100f "${current_log_file}"
  fi
}

# Start the Flume agent for the current job in the background unless a process
# using the same configuration file is already running.
# Globals: LOG_ROOT_DIR, CONFIG_NAME, JOB_CONFIG_FILE, JOB_CONFIG_FILE_NAME (read)
function start() {
  local today log_dir log_file_path flume_pid
  today=$(date +%Y%m%d)
  log_dir="${LOG_ROOT_DIR}/flume-agent/${today}"
  log_file_path="${log_dir}/${CONFIG_NAME}.log"

  if [ ! -d "${log_dir}" ]; then
    mkdir -p "${log_dir}"
    pretty_print "${NORM_MGT}创建日志目录 ${NORM_GRN}${log_dir}"
  fi

  if [ -n "$(find_agent_pids)" ]; then
    pretty_print "${NORM_RED}使用配置文件 ${NORM_GRN}${JOB_CONFIG_FILE_NAME} ${NORM_RED}的Flume作业已在运行中"
    return
  fi

  pretty_print "${NORM_MGT}使用作业名称 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}启动Flume作业"
  flume-ng agent \
    -Xms256m -Xmx4g \
    --conf /etc/flume-ng/conf/ \
    --conf-file "${JOB_CONFIG_FILE}" \
    --name a1 \
    -Dflume.root.logger=INFO,console >>"${log_file_path}" 2>&1 &
  flume_pid=$!
  pretty_print "${NORM_MGT}Flume作业已启动,pid为 ${NORM_GRN}${flume_pid}${NORM_MGT},日志文件为 ${NORM_GRN}${log_file_path}"
}

# Start every job found by run_for_all_jobs.
function start-all() {
  run_for_all_jobs start
}

# Report whether the current job's agent is running.
# Globals written: agent_pid        - matching PID(s), empty when not running
#                  current_log_file - newest matching log file, empty if none
function status() {
  local log_files=()
  # Clear the previous result so callers never act on a stale path.
  current_log_file=""
  agent_pid=$(find_agent_pids)

  if [ -z "${agent_pid}" ]; then
    pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is not running"
    return
  fi

  # Reverse sort puts the path with the latest date directory first.
  mapfile -t log_files < <(find "${LOG_ROOT_DIR}"/flume-agent -name "*${CONFIG_NAME}.log" | sort -r)
  if [ "${#log_files[@]}" -gt 0 ]; then
    current_log_file="${log_files[0]}"
  fi
  pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running at pid ${NORM_GRN}${agent_pid}"
}

# Send SIGTERM to the current job's agent and wait for it to exit.
# The wait is bounded by STOP_TIMEOUT_SECONDS (default 30). Checking right
# after the signal would almost always report a failure, and would make
# `restart` refuse to start because the old process is still shutting down.
# Globals: agent_pid (written), STOP_TIMEOUT_SECONDS (read, optional)
function stop() {
  local pids=()
  local timeout="${STOP_TIMEOUT_SECONDS:-30}"
  local waited=0

  mapfile -t pids < <(find_agent_pids)
  agent_pid="${pids[*]}"
  if [ "${#pids[@]}" -eq 0 ]; then
    pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}并未运行"
    return
  fi

  pretty_print "${NORM_MGT}停止Flume作业 ${NORM_GRN}${CONFIG_NAME}(${agent_pid})"
  # Array expansion passes each PID as its own argument when several match.
  kill -15 "${pids[@]}"

  while [ -n "$(find_agent_pids)" ] && [ "${waited}" -lt "${timeout}" ]; do
    sleep 1
    waited=$((waited + 1))
  done

  agent_pid=$(find_agent_pids)
  if [ -z "${agent_pid}" ]; then
    pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}已停止"
  else
    pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}停止失败"
  fi
}

# Stop every job found by run_for_all_jobs.
function stop-all() {
  run_for_all_jobs stop
}

# Check the current job once per hour and alert when the agent is down or its
# log contains known error patterns. The loop ends after a check performed
# during hour 00.
function monitor() {
  while true; do
    agent_pid=""
    status
    if [ -z "${agent_pid}" ]; then
      send_alert "Flume agent (${CONFIG_NAME}) is not running"
    elif [ ! -f "${current_log_file}" ]; then
      # Running, but no log file was located: nothing to scan.
      pretty_print "${NORM_RED}No log file found for Flume agent ${NORM_GRN}${CONFIG_NAME}${NORM_RED}, skipping log check"
    else
      pretty_print "${NORM_MGT}Monitor Flume agent by read log file ${NORM_GRN}${current_log_file}${NORM_MGT}"
      # NOTE(review): only the FIRST 1000 lines are scanned, so errors logged
      # later are never seen; `tail` may have been intended -- confirm.
      # Matching lines are intentionally echoed to the console (no -q).
      if head -n 1000 "${current_log_file}" |
        grep -E "gz failed|java.io.IOException|org.apache.flume.ChannelException|java.lang.IllegalStateException"; then
        send_alert "Flume agent (${CONFIG_NAME}) may not be running properly, please check log file ${current_log_file} to see what happened"
      else
        pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running properly"
      fi
    fi

    if [ "$(date +%H)" = "00" ]; then
      break
    fi
    pretty_print "${NORM_MGT}Waiting ${NORM_GRN}3600 ${NORM_MGT}seconds for the next check"
    sleep 3600s
  done
}

# Entry point: validate the arguments, then dispatch to the operation.
# Arguments: $1 - operation name
#            $2 - config name (required for single-job operations)
function run() {
  op="${1}"
  if [ -z "${op}" ]; then
    usage 1
  fi

  case "${op}" in
    log | monitor | start | status | stop | restart)
      CONFIG_NAME="${2}"
      pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
      if [ -z "${CONFIG_NAME}" ]; then
        usage 1
      fi
      JOB_CONFIG_FILE_NAME="kafka-hdfs-${CONFIG_NAME}.properties"
      JOB_CONFIG_FILE="${BASE_DIR}/conf/flume/config/${JOB_CONFIG_FILE_NAME}"
      if [ ! -f "${JOB_CONFIG_FILE}" ]; then
        pretty_print "${NORM_RED}Flume作业配置文件 ${NORM_GRN}${JOB_CONFIG_FILE} ${NORM_RED}不存在"
        exit 1
      fi
      ;;
    start-all | stop-all | restart-all) ;;
    -h | -H | --h | --H | --help)
      usage 0
      ;;
    *)
      pretty_print "${NORM_RED}Unsupported operation ${NORM_GRN}${op}"
      usage 1
      ;;
  esac

  case "${op}" in
    log) log ;;
    monitor) monitor ;;
    start) start ;;
    start-all) start-all ;;
    status) status ;;
    stop) stop ;;
    stop-all) stop-all ;;
    restart)
      stop
      start
      ;;
    restart-all)
      stop-all
      start-all
      ;;
  esac
}

run "${@}"