#!/usr/bin/env /usr/bin/python3 # -*- coding:utf-8 -*- """ DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等), 对应 jobs/ads/ 场景。 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t): -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax 注:源 HDFS 路径存在性 check(老 check_data_exists 行为)本版本暂未搬迁, 后续按需补;当前源路径不存在时 datax.py 直接报错。 """ import argparse import os import sys from datetime import date, timedelta project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root_dir) from dw_base.datax.entry import DataxExport def main(): parser = argparse.ArgumentParser( prog='datax-hdfs-export-starter', description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)', ) parser.add_argument('-ini', action='append', default=[], metavar='FILE', help='DataX ini 单文件(可多次)') parser.add_argument('-inis', action='append', default=[], metavar='DIR', help='DataX ini 目录,非递归扫 *.ini(可多次)') parser.add_argument('-start-date', default=None, metavar='YYYYMMDD', help='默认昨天(对齐老 sh 行为)') parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD', help='默认今天(对齐老 sh 行为)') parser.add_argument('-host', default=None, metavar='HOSTNAME', help='显式指定 worker(优先于 -random)') parser.add_argument('-random', action='store_true', dest='use_random', help='从 conf/workers.ini 加权随机选 worker') parser.add_argument('-parallel', action='store_true', help='并行执行(默认串行)') parser.add_argument('-skip-datax', action='store_true', help='只生成 json 不执行 datax.py') args = parser.parse_args() # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为) if not args.start_date: args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d') if not args.stop_date: args.stop_date = date.today().strftime('%Y%m%d') print('{script} 收到参数: {argv}'.format( script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]), )) print(' start_date={s} stop_date={e}'.format(s=args.start_date, e=args.stop_date)) exporter = DataxExport( base_dir=project_root_dir, workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'), release_user=os.environ['RELEASE_USER'], release_root_dir=os.environ['RELEASE_ROOT_DIR'], python3_path=os.environ['PYTHON3_PATH'], datax_home=os.environ['DATAX_HOME'], log_root_dir=os.environ['LOG_ROOT_DIR'], ) failed = exporter.run( inis=args.ini, inis_dirs=args.inis, start_date=args.start_date, stop_date=args.stop_date, host=args.host, use_random=args.use_random, parallel=args.parallel, skip_datax=args.skip_datax, ) sys.exit(failed) if __name__ == '__main__': main()