# hive_file_merge.py
import sys
import os
import re

# Derive the repository root (everything up to and including the
# "tendata-warehouse" directory) from this file's absolute path, and put it on
# sys.path so the project-local dw_base imports below can resolve when the
# script is launched from an arbitrary working directory.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

# These imports require the sys.path.append above — keep the order as-is.
from dw_base.scheduler.mg2es.hive_sql import HiveSQL
from dw_base.utils.config_utils import parse_args
  9. def merge_hdfs_file(group, db, dt):
  10. hive = HiveSQL('192.168.30.3', 10000, 'alvis', None, db)
  11. if group == 'cts' and db == 'dwd' and dt is not None:
  12. tbl_list = hive.query("show tables")
  13. for tbl in tbl_list:
  14. tbl_name = tbl[0]
  15. if tbl_name.startswith('cts_') and (tbl_name.endswith('_ex') or tbl_name.endswith('_im')) and 'mirror_country' not in tbl_name:
  16. sql = f"alter table {db}.{tbl_name} partition (dt='{dt}') concatenate"
  17. # print(sql)
  18. hive.execute(sql)
  19. else:
  20. tbl_list = hive.query("show tables")
  21. for tbl in tbl_list:
  22. tbl_name = tbl[0]
  23. if tbl_name.startswith(f'{group}_'):
  24. partition_list = hive.query(f"show partitions {db}.{tbl_name}")
  25. if partition_list is None or len(partition_list) == 0:
  26. continue
  27. for partition in partition_list:
  28. pt = partition[0]
  29. pt_arr = pt.split('/')
  30. pt_str = ''
  31. for p in pt_arr:
  32. p_key = p.split('=')[0]
  33. p_value = p.split('=')[1]
  34. pt_str = f"{pt_str}{p_key}='{p_value}',"
  35. pt_str = pt_str[:-1]
  36. hive.execute(f"alter table {db}.{tbl_name} partition ({pt_str}) concatenate")
  37. if __name__ == '__main__':
  38. CONFIG, _ = parse_args(sys.argv[1:])
  39. group = CONFIG.get('group')
  40. db = CONFIG.get('db')
  41. dt = CONFIG.get('dt')
  42. merge_hdfs_file(group, db, dt)