Răsfoiți Sursa

feat(bin): 新增 datax-hive-import-starter + datax-hdfs-export-starter 入口

bash 壳 source init.sh 初始化 env 后 exec python3 主入口
py 用 argparse 解析老参数平迁集,构造 DataxImport / DataxExport 调 run
hive-import 含 -skip-partition / -t(对应 import 专有能力)
hdfs-export 无分区管理;源 HDFS check_data_exists 暂未搬迁(后续按需补)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
tianyu.chu 1 săptămână în urmă
părinte
comite
5b1c331852

+ 71 - 0
bin/datax-hdfs-export-starter.py

@@ -0,0 +1,71 @@
+#!/usr/bin/env /usr/bin/python3
+# -*- coding:utf-8 -*-
+"""
+DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等),
+对应 jobs/ads/ 场景。
+
+老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
+  -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax
+
+注:源 HDFS 路径存在性 check(老 check_data_exists 行为)本版本暂未搬迁,
+后续按需补;当前源路径不存在时 datax.py 直接报错。
+"""
+import argparse
+import os
+import sys
+
+project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.append(project_root_dir)
+
+from dw_base.datax.entry import DataxExport
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        prog='datax-hdfs-export-starter',
+        description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)',
+    )
+    parser.add_argument('-ini', action='append', default=[], metavar='FILE',
+                        help='DataX ini 单文件(可多次)')
+    parser.add_argument('-inis', action='append', default=[], metavar='DIR',
+                        help='DataX ini 目录,非递归扫 *.ini(可多次)')
+    parser.add_argument('-start-date', required=True, metavar='YYYYMMDD')
+    parser.add_argument('-stop-date', required=True, metavar='YYYYMMDD')
+    parser.add_argument('-host', default=None, metavar='HOSTNAME',
+                        help='显式指定 worker(优先于 -random)')
+    parser.add_argument('-random', action='store_true', dest='use_random',
+                        help='从 conf/workers.ini 加权随机选 worker')
+    parser.add_argument('-parallel', action='store_true',
+                        help='并行执行(默认串行)')
+    parser.add_argument('-skip-datax', action='store_true',
+                        help='只生成 json 不执行 datax.py')
+    args = parser.parse_args()
+
+    print('{script} 收到参数: {argv}'.format(
+        script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
+    ))
+
+    exporter = DataxExport(
+        base_dir=project_root_dir,
+        workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
+        release_user=os.environ['RELEASE_USER'],
+        release_root_dir=os.environ['RELEASE_ROOT_DIR'],
+        python3_path=os.environ['PYTHON3_PATH'],
+        datax_home=os.environ['DATAX_HOME'],
+        log_root_dir=os.environ['LOG_ROOT_DIR'],
+    )
+    failed = exporter.run(
+        inis=args.ini,
+        inis_dirs=args.inis,
+        start_date=args.start_date,
+        stop_date=args.stop_date,
+        host=args.host,
+        use_random=args.use_random,
+        parallel=args.parallel,
+        skip_datax=args.skip_datax,
+    )
+    sys.exit(failed)
+
+
+if __name__ == '__main__':
+    main()

+ 10 - 0
bin/datax-hdfs-export-starter.sh

@@ -0,0 +1,10 @@
+#!/bin/bash
+# DataX hdfs-export 入口 bash 壳:初始化 env(含 workers.ini 展开)后 exec 到 Python 主入口。
+# 参数集见 bin/datax-hdfs-export-starter.py。
+CURRENT_DIR=$(pwd)
+BASE_DIR=$(
+  cd "$(dirname "$(realpath "$0")")/.." || exit
+  pwd
+)
+. "${BASE_DIR}"/bin/common/init.sh
+exec "${PYTHON3_PATH}" -u "${BASE_DIR}"/bin/datax-hdfs-export-starter.py "$@"

+ 82 - 0
bin/datax-hive-import-starter.py

@@ -0,0 +1,82 @@
+#!/usr/bin/env /usr/bin/python3
+# -*- coding:utf-8 -*-
+"""
+DataX hive-import 入口:目标=Hive(自动预建分区),对应 jobs/raw/ 场景。
+
+老参数平迁集(见 kb/90 §2.6):
+  -ini <file>          单 ini,可多次
+  -inis <dir>          ini 目录(非递归扫 *.ini),可多次
+  -start-date / -stop-date  yyyyMMdd
+  -host <hostname>     显式指定 worker
+  -random              加权随机选 worker
+  -parallel            并行(默认串行)
+  -skip-datax          只生成 json 不执行
+  -skip-partition      跳过 Hive 分区预建
+  -t <db.table>        显式追加需建分区的表,可多次
+"""
+import argparse
+import os
+import sys
+
+project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.append(project_root_dir)
+
+from dw_base.datax.entry import DataxImport
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        prog='datax-hive-import-starter',
+        description='DataX hive-import 入口:PG/其他源 → Hive (raw 层)',
+    )
+    parser.add_argument('-ini', action='append', default=[], metavar='FILE',
+                        help='DataX ini 单文件(可多次)')
+    parser.add_argument('-inis', action='append', default=[], metavar='DIR',
+                        help='DataX ini 目录,非递归扫 *.ini(可多次)')
+    parser.add_argument('-start-date', required=True, metavar='YYYYMMDD')
+    parser.add_argument('-stop-date', required=True, metavar='YYYYMMDD')
+    parser.add_argument('-host', default=None, metavar='HOSTNAME',
+                        help='显式指定 worker(优先于 -random)')
+    parser.add_argument('-random', action='store_true', dest='use_random',
+                        help='从 conf/workers.ini 加权随机选 worker')
+    parser.add_argument('-parallel', action='store_true',
+                        help='并行执行(默认串行)')
+    parser.add_argument('-skip-datax', action='store_true',
+                        help='只生成 json 不执行 datax.py')
+    parser.add_argument('-skip-partition', action='store_true',
+                        help='跳过 Hive 分区预建')
+    parser.add_argument('-t', action='append', default=[], dest='extra_partition_tables',
+                        metavar='DB.TABLE',
+                        help='显式追加需建分区的 Hive 表(可多次)')
+    args = parser.parse_args()
+
+    print('{script} 收到参数: {argv}'.format(
+        script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
+    ))
+
+    importer = DataxImport(
+        base_dir=project_root_dir,
+        workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
+        release_user=os.environ['RELEASE_USER'],
+        release_root_dir=os.environ['RELEASE_ROOT_DIR'],
+        python3_path=os.environ['PYTHON3_PATH'],
+        datax_home=os.environ['DATAX_HOME'],
+        log_root_dir=os.environ['LOG_ROOT_DIR'],
+    )
+    failed = importer.run(
+        inis=args.ini,
+        inis_dirs=args.inis,
+        start_date=args.start_date,
+        stop_date=args.stop_date,
+        host=args.host,
+        use_random=args.use_random,
+        parallel=args.parallel,
+        skip_partition=args.skip_partition,
+        skip_datax=args.skip_datax,
+        extra_partition_tables=args.extra_partition_tables,
+    )
+    sys.exit(failed)
+
+
+if __name__ == '__main__':
+    main()

+ 10 - 0
bin/datax-hive-import-starter.sh

@@ -0,0 +1,10 @@
+#!/bin/bash
+# DataX hive-import 入口 bash 壳:初始化 env(含 workers.ini 展开)后 exec 到 Python 主入口。
+# 参数集见 bin/datax-hive-import-starter.py。
+CURRENT_DIR=$(pwd)
+BASE_DIR=$(
+  cd "$(dirname "$(realpath "$0")")/.." || exit
+  pwd
+)
+. "${BASE_DIR}"/bin/common/init.sh
+exec "${PYTHON3_PATH}" -u "${BASE_DIR}"/bin/datax-hive-import-starter.py "$@"