Преглед изворни кода

chore: 删 bin/flume-control.sh

tianyu.chu пре 2 недеља
родитељ
комит
281c625bb3
3 измењених фајлова са 2 додато и 194 уклоњено
  1. 0 194
      bin/flume-control.sh
  2. 1 0
      kb/90-重构路线.md
  3. 1 0
      kb/92-重构进度.md

+ 0 - 194
bin/flume-control.sh

@@ -1,194 +0,0 @@
-#!/bin/bash
-#--------------------------------------------------------------------------------------------------
-# 启动单个flume-kafka-hdfs作业
-# 1. 要求配置文件`conf/flume/kafka-hdfs-${  }.properties`必须存在
-# 2. 可在console上查看到作业是否启动成功
-# 3. 可通过查看日志`${LOG_ROOT_DIR}/flume-agent/${TODAY}/${JOB_NAME}.log`来确定作业运行情况
-#--------------------------------------------------------------------------------------------------
-BASE_DIR=$(
-  cd "$(dirname "$(realpath "$0")")/.." || exit
-  pwd
-)
-. "${BASE_DIR}"/bin/common/init.sh
-LOG_ROOT_DIR="/opt/data/log"
-
-function usage() {
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_CYN}\t[-h/-H/--h/--H/--help]                                                 打印脚本使用方法${DO_RESET}"
-  echo -e "${NORM_MGT}Usage: $0
-  ${NORM_GRN}\t<log|monitor|start|start-all|status|stop|stop-all|restart|restart-all> 程序操作:log、monitor、start、start-all、status、stop、stop-all、restart、restart-all
-  ${NORM_GRN}\t<config name>                                                          配置名称,配置文件名称要求为kafka-hdfs-<config>.properties
-  ${DO_RESET}"
-  exit "$1"
-}
-
-function log() {
-  status
-  if [ -z "${current_log_file}" ]; then
-    pretty_print "${NORM_RED}未找到任何有效的日志文件"
-    exit 1
-  else
-    tail -100f "${current_log_file}"
-  fi
-}
-
-function start() {
-  if [ "$(uname)" = "Linux" ]; then
-    TODAY=$(date +%Y%m%d)
-  else
-    TODAY=$(date +%Y%m%d)
-  fi
-  LOG_DIR="${LOG_ROOT_DIR}/flume-agent/${TODAY}"
-  LOG_FILE_PATH="${LOG_DIR}/${CONFIG_NAME}.log"
-  if [ ! -d "${LOG_DIR}" ]; then
-    mkdir -p "${LOG_DIR}"
-    pretty_print "${NORM_MGT}创建日志目录 ${NORM_GRN}${LOG_DIR}"
-  fi
-  count=$(ps -axo command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | wc -l)
-  if [ "${count}" -gt 0 ]; then
-    pretty_print "${NORM_RED}使用配置文件 ${NORM_GRN}${JOB_CONFIG_FILE_NAME} ${NORM_RED}的Flume作业已在运行中"
-  else
-    pretty_print "${NORM_MGT}使用作业名称 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}启动Flume作业"
-    flume-ng agent \
-      -Xms256m -Xmx4g \
-      --conf /etc/flume-ng/conf/ \
-      --conf-file "${JOB_CONFIG_FILE}" \
-      --name a1 \
-      -Dflume.root.logger=INFO,console >>"${LOG_FILE_PATH}" 2>&1 &
-    FLUME_APPLICATION_PID=$!
-    pretty_print "${NORM_MGT}Flume作业已启动,pid为 ${NORM_GRN}${FLUME_APPLICATION_PID}${NORM_MGT},日志文件为 ${NORM_GRN}${LOG_FILE_PATH}"
-  fi
-}
-
-function start-all() {
-  for JOB_CONFIG_FILE in ${BASE_DIR}/conf/flume/*.properties; do
-    JOB_CONFIG_FILE_NAME=$(basename ${JOB_CONFIG_FILE})
-    CONFIG_FULL_NAME=$(basename ${JOB_CONFIG_FILE_NAME} .properties)
-    CONFIG_NAME=$(echo "${CONFIG_FULL_NAME}" | sed "s/kafka-hdfs-//g")
-    start
-  done
-}
-
-function status() {
-  agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
-  if [ -n "${agent_pid}" ]; then
-    mapfile -t log_files < <(find "${LOG_ROOT_DIR}"/flume-agent -name "*${CONFIG_NAME}.log" | sort -r)
-    if [ "${#log_files[@]}" -gt 0 ]; then
-      current_log_file="${log_files[0]}"
-    fi
-    pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running at pid ${NORM_GRN}${agent_pid}"
-  else
-    pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is not running"
-  fi
-}
-
-function stop() {
-  agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
-  if [ -z "${agent_pid}" ]; then
-    pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}并未运行"
-    return
-  else
-    pretty_print "${NORM_MGT}停止Flume作业 ${NORM_GRN}${CONFIG_NAME}(${agent_pid})"
-    kill -15 "${agent_pid}"
-  fi
-  agent_pid=$(ps -axo pid,command | grep "${JOB_CONFIG_FILE_NAME}" | grep -v grep | awk -F ' ' '{print $1}')
-  if [ -z "${agent_pid}" ]; then
-    pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}已停止"
-  else
-    pretty_print "${NORM_MGT}Flume作业 ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}停止失败"
-  fi
-}
-
-function stop-all() {
-  for JOB_CONFIG_FILE in ${BASE_DIR}/conf/flume/*.properties; do
-    JOB_CONFIG_FILE_NAME=$(basename ${JOB_CONFIG_FILE})
-    CONFIG_FULL_NAME=$(basename ${JOB_CONFIG_FILE_NAME} .properties)
-    CONFIG_NAME=$(echo "${CONFIG_FULL_NAME}" | sed "s/kafka-hdfs-//g")
-    stop
-  done
-}
-
-function monitor() {
-  while true; do
-    agent_pid=""
-    status
-    if [ -z "${agent_pid}" ]; then
-      "${BASE_DIR}"/bin/wechat-work-alert.sh \
-        -key="${SKB_LITTLE_CUTE}" \
-        -at=13917467529 \
-        -msg="$(date +'%Y-%m-%d %H:%M:%S') Flume agent (${CONFIG_NAME}) is not running"
-    else
-      pretty_print "${NORM_MGT}Monitor Flume agent by read log file ${NORM_GRN}${current_log_file}${NORM_MGT}"
-      if head -n 1000 "${current_log_file}" | grep -E "gz failed|java.io.IOException|org.apache.flume.ChannelException|java.lang.IllegalStateException"; then
-        "${BASE_DIR}"/bin/wechat-work-alert.sh \
-          -key="${SKB_LITTLE_CUTE}" \
-          -at=13917467529 \
-          -msg="$(date +'%Y-%m-%d %H:%M:%S') Flume agent (${CONFIG_NAME}) may not be running properly, please check log file ${current_log_file} to see what happened"
-      else
-        pretty_print "${NORM_MGT}Flume agent ${NORM_GRN}${CONFIG_NAME} ${NORM_MGT}is running properly"
-      fi
-    fi
-    if [ "$(date +%H)" = "00" ]; then
-      break
-    fi
-    pretty_print "${NORM_MGT}Waiting ${NORM_GRN}3600 ${NORM_MGT}seconds for the next check"
-    sleep 3600s
-  done
-}
-
-function pretty_print() {
-    # 设置文本颜色和格式
-    NORM_RED='\033[0;31m'  # 红色
-    NORM_GRN='\033[0;32m'  # 绿色
-    NORM_CYN='\033[0;36m'  # 青色
-    NORM_MGT='\033[0m'   # 重置颜色和格式
-    # 打印带颜色和格式的消息
-    echo -e "${1}"
-}
-
-function run() {
-  op="${1}"
-  if [ -z "${op}" ]; then
-    usage 1
-  fi
-  case ${op} in
-  log | monitor | start | status | stop | restart)
-    CONFIG_NAME="${2}"
-    pretty_print "${NORM_MGT}${0} 收到参数:${NORM_GRN}${*}"
-    if [ -z "${CONFIG_NAME}" ]; then
-      usage 1
-    fi
-    JOB_CONFIG_FILE_NAME="kafka-hdfs-${CONFIG_NAME}.properties"
-    JOB_CONFIG_FILE="${BASE_DIR}/conf/flume/config/${JOB_CONFIG_FILE_NAME}"
-    if [ ! -f "${JOB_CONFIG_FILE}" ]; then
-      pretty_print "${NORM_RED}Flume作业配置文件 ${NORM_GRN}${JOB_CONFIG_FILE} ${NORM_RED}不存在"
-      exit 1
-    fi
-    ;;
-  start-all | stop-all | restart-all) ;;
-  -h | -H | --h | --H | --help) usage 0 ;;
-  *)
-    pretty_print "${NORM_RED}Unsupported operation ${NORM_GRN}${op}"
-    usage 1
-    ;;
-  esac
-  case $op in
-  log) log ;;
-  monitor) monitor ;;
-  start) start ;;
-  start-all) start-all ;;
-  status) status ;;
-  stop) stop ;;
-  stop-all) stop-all ;;
-  restart)
-    stop
-    start
-    ;;
-  restart-all)
-    stop-all
-    start-all
-    ;;
-  esac
-}
-
-run "${@}"

+ 1 - 0
kb/90-重构路线.md

@@ -629,6 +629,7 @@ sql = "... WHERE TABLE_SCHEMA='%s' ..." % (database, table_name)
 | `dw_base/{validation,ml,flink,elasticsearch}/` 四空壳模块 | 整目录删(仅 56 字节空 `__init__.py`,零引用) | 2026-04-20,见 `92-进度.md` |
 | `dw_base/database/mongodb_utils.py` | 整文件删(184 行 = 19 行 MongoClient 薄包装 + 165 行注释,零外部引用,需要时 `MongoClient(uri)` 一行重写) | 2026-04-20,见 `92-进度.md` |
 | `conf/datax/` 下老项目遗留 ini / datasource 样例 | 整批挪入 `conf/bak/datax/{config,datasource}/`,由 `.gitignore:6 conf/bak` 拦截不入库 | 项目初始化 `8d2ade5`(2026-04-17) |
+| `bin/flume-control.sh` | 整文件删(194 行)。职责:管理单个 Flume-Kafka-HDFS 作业生命周期(log/monitor/start/status/stop/restart/start-all/stop-all/restart-all 一组子命令)。实际状态:顶部 shebang 损坏(`ho#!/bin/bash`)、依赖已删除的 `bin/wechat-work-alert.sh`、L64 与 L162 `conf/flume/*.properties` vs `conf/flume/config/*.properties` 路径自相矛盾,整体已不可跑。按需重建时参考:`conf/flume/config/*.properties` 为作业配置约定、`flume-ng agent --conf-file` 为启动命令、启动前需 `mkdir -p ${LOG_ROOT_DIR}/flume-agent/${TODAY}`、监控循环 `head -n 1000 log | grep -E "gz failed\|java.io.IOException\|org.apache.flume.ChannelException\|java.lang.IllegalStateException"` 命中则发企微告警。Kafka→HDFS 接入通道重建时按新 `conf/alerter.ini` 告警外配重写,不沿用老 SKB_LITTLE_CUTE / 手机号硬编码 | 2026-04-21,见 `92-进度.md` |
 
 > 代码里残留的 `conf/datax/config/` replace 死逻辑 + `conf/datax/generated` 默认值,属于 §2.x 路径硬编码清理范畴(改名为 `conf/datax-json/` + 删 replace),不在本节。
 

+ 1 - 0
kb/92-重构进度.md

@@ -187,3 +187,4 @@
 | 2026-04-21 | **kb/31 表头定稿 + 合并 common/business + 取消 UDF 准入规则**:(a) kb/31 表头改为 `函数编号 \| 函数名 \| 分类 \| 入参 \| 返回 \| 描述 \| 示例 \| 补注释状态`,函数编号由 Codex 在 `spark_common_udf.py` 以顺序注释注入(不依赖行号,避免改动污染其他函数);(b) business UDF 纳入 kb/31 第二段(原"business 在子目录 README 维护,不走此表"规划取消);(c) UDF 准入规则取消(无执行主体,规则无意义)。联动 `kb/90 §2.12`:删"业务 UDF 进 common 准入标准缺失"问题项、同步新表头、删"business 不走此表"表述、删"注册准入规则"整点、关系节去"准入规则"字样;`kb/92` L181 历史条目 (c) 改软标注指向本条。另 `kb/30 §5` 删"jobs/ 里没有 CREATE TABLE 可改"冗余句与"归档策略"防御性条款 | — |
 | 2026-04-21 | **聚簇 A.1 环境变量外配到 `conf/env.sh`(bash + Python 单源)**:新建 `conf/env.sh` 统一 5 vars —— `RELEASE_USER="bigdata"`(替换 `"alvis"`)/ `RELEASE_ROOT_DIR="/home/bigdata/release"` / `PYTHON3_PATH` / `DATAX_HOME`(保持原有 `${DATAX_HOME:-/opt/datax}` 条件赋值)/ `LOG_ROOT_DIR="${HOME}/log"`。Python 侧新建 `dw_base/utils/env_loader.bootstrap_env()` 通过 `subprocess.run(['bash', '-c', '. env.sh && env -0'])` 让 bash 自己解析 env.sh,再 `os.environ.setdefault(...)` 注入(避免重写一套解析器导致 bash/py 双解析漂移;setdefault 允许 shell 侧已 export 的值优先)。`dw_base/__init__.py` 顶部加 `bootstrap_env()` 调用 + 读 `os.environ['RELEASE_USER' / 'RELEASE_ROOT_DIR' / 'LOG_ROOT_DIR']`,删 `/opt/data/log` vs `${HOME}/data/log` 的 whoami 分流(对齐 kb/90 §7.2.1 的单值决策)。`bin/common/init.sh` 顶部加 `. conf/env.sh` + 删对应硬编码变量 + 删 whoami 分流 + 删 `export LOG_ROOT_DIR / PYTHON3_PATH / RELEASE_ROOT_DIR / DATAX_HOME`(env.sh 已 export);`bin/publish.sh` 加 `. conf/env.sh` + 删 L15 `RELEASE_ROOT_DIR="/home/alvis/release"` 硬编码。联动 kb/90 §2.1 删 5 行已完成硬编码(DATAX_HOME / PYTHON3_PATH / RELEASE_USER / RELEASE_ROOT_DIR / LOG_ROOT_DIR)+ §2.2 config 结构删 `env.py`(单源不做双份)+ §7.2.1 大幅瘦身(去掉现状/为什么/代码改动三段,仅保留日志路径 `{module}/{dt}/{file}.log` 约定)+ 本文阶段 2 checklist 4 处 [x] | — |
 | 2026-04-21 | **`dw_base/common/__init__.py` 撤销删除(反转先前空壳判断)**:早先 `ls` 只看到 `__init__.py` 0 字节就判定"空壳"、按 CLAUDE.md "空 __init__.py + 无 README 下次清理直接删"规则挂入 A.1 执行清单第 9 项。`grep` 查证发现 `dw_base/common/` 下 `config_constants.py` / `container.py` / `template_constants.py` 分别被 `bin/spark-sql-starter.py` / `bin/datax-job-config-generator.py` / `dw_base/utils/log_utils.py` / `dw_base/datax/plugins/reader/mysql_reader.py` import,`__init__.py` 0 字节是 Python 正常的 package 标识(`dw_base/__init__.py` 含 bootstrap 逻辑,保持 regular package 一致性优于切 namespace package)。结论:不删。教训:空壳判断必须 `ls` 整个目录 + grep 所有子模块引用,不能只看 `__init__.py` 字节数 | — |
+| 2026-04-21 | **删除 `bin/flume-control.sh`(194 行;事实不可用)**:脚本顶部 shebang 损坏(`ho#!/bin/bash`)+ 依赖已在 2026-04-20 删除的 `bin/wechat-work-alert.sh` + L64 `conf/flume/*.properties` 与 L162 `conf/flume/config/*.properties` 路径自相矛盾,实际已跑不起来。决定:整文件删 + Kafka→HDFS 接入通道的设计理念归档到 kb/90 §5.2 历史档案,按需重建时以该档为参考(不沿用老 `SKB_LITTLE_CUTE` / 手机号硬编码告警,重建时按 `conf/alerter.ini` 外配走) | — |