| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- #!/usr/bin/env /usr/bin/python3
- # -*- coding:utf-8 -*-
- """
- DataX hdfs-export 入口:源=HDFS(Hive 表数据),目标=外部系统(ES/Mongo/MySQL/Kafka 等),
- 对应 jobs/ads/ 场景。
- 老参数平迁集(见 kb/90 §2.6;无 -skip-partition / -t):
- -ini / -inis / -start-date / -stop-date / -host / -random / -parallel / -skip-datax / -skip-check
- HDFS 源存在性 check 默认开启,missing/empty → 任务失败(与老 silent skip 语义反转,
- 配合 DS 告警"没数据=异常"场景);手动跑批知道分区空可加 -skip-check 跳过。
- """
- import argparse
- import os
- import sys
- from datetime import date, timedelta
- project_root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(project_root_dir)
- from dw_base.datax.entry import DataxExport
- def main():
- parser = argparse.ArgumentParser(
- prog='datax-hdfs-export-starter',
- description='DataX hdfs-export 入口:Hive/HDFS → 外部系统 (ads 层)',
- )
- parser.add_argument('-ini', action='append', default=[], metavar='FILE',
- help='DataX ini 单文件(可多次)')
- parser.add_argument('-inis', action='append', default=[], metavar='DIR',
- help='DataX ini 目录,非递归扫 *.ini(可多次)')
- parser.add_argument('-start-date', default=None, metavar='YYYYMMDD',
- help='默认昨天(对齐老 sh 行为)')
- parser.add_argument('-stop-date', default=None, metavar='YYYYMMDD',
- help='默认今天(对齐老 sh 行为)')
- parser.add_argument('-host', default=None, metavar='HOSTNAME',
- help='显式指定 worker(优先于 -random)')
- parser.add_argument('-random', action='store_true', dest='use_random',
- help='从 conf/workers.ini 加权随机选 worker')
- parser.add_argument('-parallel', action='store_true',
- help='并行执行(默认串行)')
- parser.add_argument('-skip-datax', action='store_true',
- help='只生成 json 不执行 datax.py')
- parser.add_argument('-skip-check', action='store_true',
- help='跳过 HDFS 源路径 check(默认开启 check,missing/empty 算失败;显式关闭后不检查直接交 datax)')
- # L3 speed 覆盖(L1 conf/datax-tuning.conf < L2 ini [speed] 段 < L3 本参数)
- parser.add_argument('-channel', type=int, default=None,
- help='L3 speed.channel 覆盖(不传则走 L2 ini / L1 conf)')
- parser.add_argument('-byte', type=int, default=None,
- help='L3 speed.byte 覆盖(单位 bytes)')
- parser.add_argument('-record', type=int, default=None,
- help='L3 speed.record 覆盖')
- args = parser.parse_args()
- # 默认日期:昨天 → 今天(对齐老 datax-single-job-starter.sh:207-219 行为)
- if not args.start_date:
- args.start_date = (date.today() - timedelta(days=1)).strftime('%Y%m%d')
- if not args.stop_date:
- args.stop_date = date.today().strftime('%Y%m%d')
- print('{script} 收到参数: {argv}'.format(
- script=os.path.basename(__file__), argv=' '.join(sys.argv[1:]),
- ))
- print(' start_date={s} stop_date={e}'.format(s=args.start_date, e=args.stop_date))
- exporter = DataxExport(
- base_dir=project_root_dir,
- workers_ini_path=os.path.join(project_root_dir, 'conf', 'workers.ini'),
- release_user=os.environ['RELEASE_USER'],
- release_root_dir=os.environ['RELEASE_ROOT_DIR'],
- python3_path=os.environ['PYTHON3_PATH'],
- datax_home=os.environ['DATAX_HOME'],
- log_root_dir=os.environ['LOG_ROOT_DIR'],
- )
- speed_overrides = {'channel': args.channel, 'byte': args.byte, 'record': args.record}
- failed = exporter.run(
- inis=args.ini,
- inis_dirs=args.inis,
- start_date=args.start_date,
- stop_date=args.stop_date,
- host=args.host,
- use_random=args.use_random,
- parallel=args.parallel,
- skip_datax=args.skip_datax,
- skip_check=args.skip_check,
- speed_overrides=speed_overrides,
- )
- sys.exit(failed)
- if __name__ == '__main__':
- main()
|