| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- # -*- coding:utf-8 -*-
- from typing import List, Dict
- from dw_base.hive.hive_constants import COLUMN_NAME_TYPE_DICT, COLUMN_NAME_COMMENT_DICT
def get_hive_database_name(project: str, layer: str, env: str) -> str:
    """
    Build the Hive database name from project, layer and environment.

    Args:
        project: owning project (may be empty or None)
        layer: warehouse layer, e.g. ods/dwd (may be empty or None)
        env: environment suffix, e.g. dev/prod (may be empty or None)

    Returns:
        A name like '<project>_<layer>[_<env>]'; falls back to
        '<project>_ods' when the layer is missing, to the bare layer when
        the project is missing, and to 'tmp' when both are missing.
    """
    # Truthiness alone covers both None and '' — the original
    # `project and project != ''` double-check is redundant.
    if project:
        database = f'{project}_{layer}' if layer else f'{project}_ods'
    elif layer:
        database = layer
    else:
        database = 'tmp'
    # The shared 'tmp' database never gets an environment suffix.
    if database != 'tmp' and env:
        database = f'{database}_{env}'
    return database
def get_hive_table_prefix(project: str, layer: str, version: str) -> str:
    """
    Build the Hive table-name prefix from layer, project and version.

    Args:
        project: owning project (may be empty or None)
        layer: warehouse layer (may be empty or None)
        version: version tag (may be empty or None)

    Returns:
        '<layer>[_<project>][_<version>]' when a layer is given; the bare
        project when only the project is given; 'tmp' when neither is.
        Note the version is intentionally ignored unless a layer is set.
    """
    # Truthiness alone covers both None and '' — no separate != '' check.
    if layer:
        prefix = f'{layer}_{project}' if project else layer
        # Version suffix only applies inside the layer branch,
        # preserving the original '<layer>..._<version>' layout.
        if version:
            prefix = f'{prefix}_{version}'
    elif project:
        prefix = project
    else:
        prefix = 'tmp'
    return prefix
def get_hive_create_table_ddl(database: str,
                              table: str,
                              columns: List[str],
                              columns_with_types: Dict,
                              comment: str = '',
                              is_external: bool = False,
                              is_partitioned: bool = False) -> str:
    """
    Generate a Hive CREATE TABLE DDL statement (tab-indented layout).

    Args:
        database: database name; when None, the bare table name is used
        table: table name
        columns: ordered column names
        columns_with_types: per-call overrides mapping column -> Hive type;
            columns not listed fall back to COLUMN_NAME_TYPE_DICT
            (and finally to STRING)
        comment: table-level comment
        is_external: emit CREATE EXTERNAL TABLE when True
        is_partitioned: add a 'PARTITIONED BY (dt STRING)' clause when True

    Returns:
        The full DDL string, prefixed with DROP TABLE IF EXISTS and
        stored as ORC.
    """
    ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n\tCOMMENT '{4}'\n{5}\tSTORED AS ORC\n;"
    external_clause = 'EXTERNAL ' if is_external else ''
    table_with_database = table if database is None else f'{database}.{table}'
    # default=0 keeps an empty column list from raising ValueError in max().
    max_column_length = max(map(len, columns), default=0)
    column_defs = []
    for col in columns:
        # Pad every name to the widest column so types line up in the DDL.
        padded_col = col.ljust(max_column_length, ' ')
        # `in` instead of the non-idiomatic dict.__contains__ call;
        # per-call type overrides win over the shared constant mapping.
        if col in columns_with_types:
            col_type = columns_with_types[col]
        else:
            col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING')
        col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '')
        column_defs.append(f" {padded_col} {col_type} COMMENT '{col_comment}'")
    columns_clause = ',\n'.join(column_defs)
    partition_clause = '\tPARTITIONED BY (dt STRING)\n' if is_partitioned else ''
    return ddl.format(table_with_database, external_clause, table_with_database,
                      columns_clause, comment, partition_clause)
def get_hive_create_table_ddl_sop(database: str,
                                  table: str,
                                  columns: List[str],
                                  columns_with_types: Dict,
                                  comment: str = '',
                                  is_external: bool = False,
                                  is_partitioned: bool = False) -> str:
    """
    Generate a Hive CREATE TABLE DDL statement (space-indented SOP layout).

    Same contract as get_hive_create_table_ddl, but the emitted DDL uses
    spaces instead of tabs for the COMMENT / PARTITIONED BY / STORED AS
    clause indentation.

    Args:
        database: database name; when None, the bare table name is used
        table: table name
        columns: ordered column names
        columns_with_types: per-call overrides mapping column -> Hive type;
            columns not listed fall back to COLUMN_NAME_TYPE_DICT
            (and finally to STRING)
        comment: table-level comment
        is_external: emit CREATE EXTERNAL TABLE when True
        is_partitioned: add a 'PARTITIONED BY (dt STRING)' clause when True

    Returns:
        The full DDL string, prefixed with DROP TABLE IF EXISTS and
        stored as ORC.
    """
    ddl = "DROP TABLE IF EXISTS {0};\nCREATE {1}TABLE IF NOT EXISTS {2}\n(\n{3}\n)\n COMMENT '{4}'\n{5} STORED AS ORC\n;"
    external_clause = 'EXTERNAL ' if is_external else ''
    table_with_database = table if database is None else f'{database}.{table}'
    # default=0 keeps an empty column list from raising ValueError in max().
    max_column_length = max(map(len, columns), default=0)
    column_defs = []
    for col in columns:
        # Pad every name to the widest column so types line up in the DDL.
        padded_col = col.ljust(max_column_length, ' ')
        # `in` instead of the non-idiomatic dict.__contains__ call;
        # per-call type overrides win over the shared constant mapping.
        if col in columns_with_types:
            col_type = columns_with_types[col]
        else:
            col_type = COLUMN_NAME_TYPE_DICT.get(col, 'STRING')
        col_comment = COLUMN_NAME_COMMENT_DICT.get(col, '')
        column_defs.append(f" {padded_col} {col_type} COMMENT '{col_comment}'")
    columns_clause = ',\n'.join(column_defs)
    partition_clause = ' PARTITIONED BY (dt STRING)\n' if is_partitioned else ''
    return ddl.format(table_with_database, external_clause, table_with_database,
                      columns_clause, comment, partition_clause)
|