# -*- coding:utf-8 -*- from typing import List, Dict from dw_base.hive.hive_constants import COLUMN_NAME_TYPE_DICT, COLUMN_NAME_COMMENT_DICT def get_hive_database_name(project: str, layer: str, env: str) -> str: """ 获取Hive数据库名称 Args: project: 所属项目 layer: 所属层次 env: 所属环境 Returns: Hive数据库名称 """ if project and project != '': if layer and layer != '': database = f'{project}_{layer}' else: database = f'{project}_ods' elif layer and layer != '': database = layer else: database = 'tmp' if database != 'tmp' and env and env != '': database = f'{database}_{env}' return database def get_hive_table_prefix(project: str, layer: str, version: str) -> str: """ 获取表名前缀 Args: project: 所属项目 layer: 所属层次 version: 所属版本 Returns: 表名前缀 """ if layer and layer != '': if project and project != '': prefix = f'{layer}_{project}' else: prefix = layer if version and version != '': prefix = f'{prefix}_{version}' elif project and project != '': prefix = project else: prefix = 'tmp' return prefix def get_hive_create_table_ddl(database: str, table: str, columns: List[str], columns_with_types: Dict, comment: str = '', is_external: bool = False, is_partitioned: bool = False): """ 生成Hive建表语句 Args: database: 数据库名称 table: 表名称 columns: 字段 comment: 表注释 is_external: 是否是外部表 is_partitioned: 是否是分区表 Returns: """ ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n\tCOMMENT '{4}'\n{5}\tSTORED AS ORC\n;" if is_external: argument1 = 'EXTERNAL ' else: argument1 = '' if database is None: table_with_database = table else: table_with_database = f'{database}.{table}' max_column_length = max(map(len, columns)) column_defs = [] for col in columns: padded_col = col.ljust(max_column_length, ' ') if columns_with_types.__contains__(col): col_type = columns_with_types.get(col) else: col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING') col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '') column_defs.append(f" {padded_col} {col_type} COMMENT '{col_comment}'") argument3 = ',\n'.join(column_defs) if is_partitioned: argument5 = '\tPARTITIONED BY (dt STRING)\n' else: argument5 = '' return ddl.format(table_with_database, argument1, table_with_database, argument3, comment, argument5) def get_hive_create_table_ddl_sop(database: str, table: str, columns: List[str], columns_with_types: Dict, comment: str = '', is_external: bool = False, is_partitioned: bool = False): """ 生成Hive建表语句 Args: database: 数据库名称 table: 表名称 columns: 字段 comment: 表注释 is_external: 是否是外部表 is_partitioned: 是否是分区表 Returns: """ ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n COMMENT '{4}'\n{5} STORED AS ORC\n;" if is_external: argument1 = 'EXTERNAL ' else: argument1 = '' if database is None: table_with_database = table else: table_with_database = f'{database}.{table}' max_column_length = max(map(len, columns)) column_defs = [] for col in columns: padded_col = col.ljust(max_column_length, ' ') if columns_with_types.__contains__(col): col_type = columns_with_types.get(col) else: col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING') col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '') column_defs.append(f" {padded_col} {col_type} COMMENT '{col_comment}'") argument3 = ',\n'.join(column_defs) if is_partitioned: argument5 = ' PARTITIONED BY (dt STRING)\n' else: argument5 = '' return ddl.format(table_with_database, argument1, table_with_database, argument3, comment, argument5)