|
|
@@ -4,11 +4,9 @@ import re
|
|
|
from configparser import ConfigParser
|
|
|
from typing import Dict, List
|
|
|
|
|
|
-from dw_base.common.template_constants import MYSQL_HIVE_CREATE_TABLE_TEMPLATE, MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE
|
|
|
from dw_base.datax.datax_constants import *
|
|
|
from dw_base.datax.plugins.reader.reader import Reader
|
|
|
from dw_base.utils.datetime_utils import local_2_utc, parse_datetime
|
|
|
-from dw_base.utils.file_utils import read_file_content
|
|
|
|
|
|
# mysql reader
|
|
|
MYSQL_READER_NAME = 'mysqlreader'
|
|
|
@@ -190,59 +188,3 @@ class MySQLReader(Reader):
|
|
|
definition.append('utc =')
|
|
|
return '\n'.join(definition)
|
|
|
|
|
|
- @staticmethod
|
|
|
- def generate_hive_ddl(hive_database_name: str,
|
|
|
- hive_table_name: str,
|
|
|
- table_comment: str,
|
|
|
- partitioned: bool,
|
|
|
- columns: List,
|
|
|
- column_types: Dict[str, str]) -> str:
|
|
|
- columns_definition = []
|
|
|
- partition_def = ''
|
|
|
- for column in columns:
|
|
|
- column_name = column.COLUMN_NAME
|
|
|
- column_comment = column.COLUMN_COMMENT
|
|
|
- if MYSQL_KEYWORDS.__contains__(column_name):
|
|
|
- column_name = str(f'`{column_name}`')
|
|
|
- if column_types.__contains__(column_name):
|
|
|
- column_type = str(column_types[column_name]).upper()
|
|
|
- else:
|
|
|
- column_type = "STRING"
|
|
|
- columns_definition.append(f"{column_name} {column_type} COMMENT '{column_comment}'")
|
|
|
- if partitioned is not None and partitioned:
|
|
|
- partition_def = '\nPARTITIONED BY (dt STRING)'
|
|
|
- ddl = read_file_content(MYSQL_HIVE_CREATE_TABLE_TEMPLATE).format(
|
|
|
- hive_database_name, hive_table_name, hive_database_name, hive_table_name,
|
|
|
- ',\n'.join(columns_definition), table_comment, partition_def
|
|
|
- )
|
|
|
- return ddl
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def generate_hive_over_hbase_ddl(hive_database_name: str,
|
|
|
- hive_table_name: str,
|
|
|
- table_comment: str,
|
|
|
- hbase_namespace: str,
|
|
|
- hbase_table_name: str,
|
|
|
- columns: List,
|
|
|
- column_types: Dict[str, str]) -> str:
|
|
|
- columns_definition = []
|
|
|
- hbase_column_mapping_definition = []
|
|
|
- partition_def = ''
|
|
|
- for column in columns:
|
|
|
- column_name = column.COLUMN_NAME
|
|
|
- column_comment = column.COLUMN_COMMENT
|
|
|
- if MYSQL_KEYWORDS.__contains__(column_name):
|
|
|
- column_name = str(f'`{column_name}`')
|
|
|
- if column_types.__contains__(column_name):
|
|
|
- column_type = str(column_types[column_name]).upper()
|
|
|
- else:
|
|
|
- column_type = "STRING"
|
|
|
- columns_definition.append(f"{column_name} {column_type} COMMENT '{column_comment}'")
|
|
|
- hbase_column_mapping_definition.append(f"cf:{column_name}")
|
|
|
- ddl_template = read_file_content(MYSQL_HIVE_HBASE_CREATE_TABLE_TEMPLATE)
|
|
|
- ddl = ddl_template.format(
|
|
|
- hive_database_name, hive_table_name, hive_database_name, hive_table_name,
|
|
|
- ',\n'.join(columns_definition), table_comment, partition_def,
|
|
|
- ',\n'.join(hbase_column_mapping_definition), hbase_namespace, hbase_table_name
|
|
|
- )
|
|
|
- return ddl
|