# Merge small HDFS files for Hive tables by running ALTER TABLE ... CONCATENATE.
import os
import re
import sys

# Path bootstrap: derive the repo root by truncating this file's absolute path
# at the "tendata-warehouse" directory, then make it importable. This must run
# before the dw_base imports below.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

from dw_base.scheduler.mg2es.hive_sql import HiveSQL
from dw_base.utils.config_utils import parse_args
def merge_hdfs_file(group, db, dt):
    """Merge small HDFS files for Hive tables via ALTER TABLE ... CONCATENATE.

    Fast path: when group='cts', db='dwd' and an explicit dt is given, only the
    dt=<dt> partition of tables named cts_*_ex / cts_*_im (excluding any table
    whose name contains 'mirror_country') is concatenated. Otherwise, every
    partition of every table whose name starts with '<group>_' is concatenated.

    Args:
        group: table-name prefix selecting which tables to merge (e.g. 'cts').
        db: Hive database name (e.g. 'dwd').
        dt: partition date string; only used by the cts/dwd fast path.
    """
    hive = HiveSQL('192.168.30.3', 10000, 'alvis', None, db)
    # Hoisted out of both branches — the original ran "show tables" twice.
    tbl_list = hive.query("show tables")
    # NOTE(review): statements are built with f-strings; identifiers come from
    # Hive itself and dt from the internal scheduler CLI, so this is assumed
    # trusted input — confirm before exposing externally.
    if group == 'cts' and db == 'dwd' and dt is not None:
        for tbl in tbl_list:
            tbl_name = tbl[0]
            if (tbl_name.startswith('cts_')
                    and tbl_name.endswith(('_ex', '_im'))
                    and 'mirror_country' not in tbl_name):
                hive.execute(f"alter table {db}.{tbl_name} partition (dt='{dt}') concatenate")
    else:
        for tbl in tbl_list:
            tbl_name = tbl[0]
            if not tbl_name.startswith(f'{group}_'):
                continue
            partition_list = hive.query(f"show partitions {db}.{tbl_name}")
            if not partition_list:
                continue
            for partition in partition_list:
                # "show partitions" rows look like "k1=v1/k2=v2"; rebuild them
                # as "k1='v1',k2='v2'" for the partition spec. split('=', 1)
                # keeps values that themselves contain '=' intact (the original
                # split on every '=' and would truncate such values).
                pt_str = ','.join(
                    "{}='{}'".format(*p.split('=', 1))
                    for p in partition[0].split('/')
                )
                hive.execute(f"alter table {db}.{tbl_name} partition ({pt_str}) concatenate")
if __name__ == '__main__':
    # CLI entry point: parse key=value arguments and kick off the merge.
    # Missing keys come through as None and are handled by merge_hdfs_file.
    config, _ = parse_args(sys.argv[1:])
    merge_hdfs_file(config.get('group'), config.get('db'), config.get('dt'))
|