import sys
import os
import re

# Make the repository root importable no matter where this script sits
# inside the tendata-warehouse checkout.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.hive_sql import HiveSQL
from dw_base.utils.config_utils import parse_args


def _partition_spec(partition_path):
    """Convert a ``show partitions`` row path into a Hive partition spec.

    Example: ``dt=20240101/src=cn`` -> ``dt='20240101',src='cn'``.

    Uses ``split('=', 1)`` so a partition *value* that itself contains
    ``=`` is kept intact (the previous ``split('=')[1]`` truncated it).
    """
    parts = []
    for kv in partition_path.split('/'):
        key, value = kv.split('=', 1)
        parts.append(f"{key}='{value}'")
    return ','.join(parts)


def merge_hdfs_file(group, db, dt):
    """Compact small HDFS files via ``alter table ... concatenate``.

    :param group: table-name prefix selecting which tables to compact
        (tables starting with ``{group}_``).
    :param db: Hive database name (e.g. ``dwd``).
    :param dt: partition date; only consulted on the cts fast path.

    Fast path: for ``group='cts'``, ``db='dwd'`` with an explicit ``dt``,
    only the given ``dt`` partition of the cts import/export tables is
    compacted. Otherwise every partition of every ``{group}_`` table is
    compacted.
    """
    # NOTE(review): Hive host/port/user are hard-coded; acceptable for an
    # internal scheduler job, but worth moving into config eventually.
    hive = HiveSQL('192.168.30.3', 10000, 'alvis', None, db)

    # Both branches need the table list; query it once.
    tbl_list = hive.query("show tables")

    if group == 'cts' and db == 'dwd' and dt is not None:
        # Fast path: compact only the requested dt partition of the cts
        # import (_im) / export (_ex) tables, skipping mirror_country ones.
        for tbl in tbl_list:
            tbl_name = tbl[0]
            if (tbl_name.startswith('cts_')
                    and tbl_name.endswith(('_ex', '_im'))
                    and 'mirror_country' not in tbl_name):
                # Identifiers/values are interpolated directly into SQL;
                # inputs come from trusted job config, not end users.
                sql = f"alter table {db}.{tbl_name} partition (dt='{dt}') concatenate"
                hive.execute(sql)
    else:
        # General path: compact every partition of every {group}_ table.
        for tbl in tbl_list:
            tbl_name = tbl[0]
            if not tbl_name.startswith(f'{group}_'):
                continue
            partition_list = hive.query(f"show partitions {db}.{tbl_name}")
            if not partition_list:
                # Unpartitioned or empty table: nothing to concatenate.
                continue
            for partition in partition_list:
                pt_str = _partition_spec(partition[0])
                hive.execute(f"alter table {db}.{tbl_name} partition ({pt_str}) concatenate")


if __name__ == '__main__':
    CONFIG, _ = parse_args(sys.argv[1:])
    group = CONFIG.get('group')
    db = CONFIG.get('db')
    dt = CONFIG.get('dt')
    merge_hdfs_file(group, db, dt)