# -*- coding:utf-8 -*-
import re
from configparser import ConfigParser
from typing import Dict, List

from dw_base.datax.plugins.writer.writer import Writer

# hbase writer plugin name and parameter keys
HBASE_WRITER_NAME = 'hbaseapiwriter'
HBASE_WRITER_PARAMETER_COLUMN_FAMILY = 'columnFamily'
HBASE_WRITER_PARAMETER_CONF = 'conf'
HBASE_WRITER_PARAMETER_END_KEY = 'endKey'
HBASE_WRITER_PARAMETER_NAMESPACE = 'namespace'
HBASE_WRITER_PARAMETER_REGION_NUMBER = 'regionNumber'
HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN = 'rowKeyColumn'
HBASE_WRITER_PARAMETER_START_KEY = 'startKey'
HBASE_WRITER_PARAMETER_TABLE = 'table'
HBASE_WRITER_PARAMETER_TRUNCATE = 'truncate'
HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE = 'writeBatchSize'

# Patterns of the rowKeyColumn mini-DSL:
#   separator(<literal>) inserts a constant string between row-key parts,
#   reverse(<column>)    marks a column whose value is reversed in the key.
# Compiled once at module level (raw strings fix the non-raw '\(' escapes
# of the original and avoid re-building the pattern on every call).
_SEPARATOR_PATTERN = re.compile(r'separator\((.+?)\)')
_REVERSE_PATTERN = re.compile(r'reverse\((.+?)\)')


class HBaseWriter(Writer):
    """DataX writer plugin that builds an ``hbaseapiwriter`` parameter block.

    Reads its configuration from the ``[writer]`` section of an INI file
    (via the inherited ``config_parser``) and fills ``self.parameter`` with
    the JSON-serializable options the DataX HBase writer expects.
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: str = None, stop_date: str = None):
        super(HBaseWriter, self).__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HBASE_WRITER_NAME

    def load_data_source(self):
        """Copy every key/value of the parsed datasource into the ``conf`` parameter."""
        ds_dict: Dict[str, str] = self.datasource.parse()
        # a plain shallow copy — the original copied key by key in a loop
        self.parameter[HBASE_WRITER_PARAMETER_CONF] = dict(ds_dict)

    def load_others(self):
        """Load the writer-specific options: key range, region count, namespace,
        table, truncate flag and write batch size.

        NOTE: the HBase connection ``conf`` itself is populated by
        :meth:`load_data_source`, not here.
        """
        end_key = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_END_KEY)
        if end_key:
            self.parameter[HBASE_WRITER_PARAMETER_END_KEY] = end_key
        start_key = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_START_KEY)
        if start_key:
            self.parameter[HBASE_WRITER_PARAMETER_START_KEY] = start_key
        # falls back to 3 regions when the option is missing or empty
        self.parameter[HBASE_WRITER_PARAMETER_REGION_NUMBER] = \
            self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_REGION_NUMBER) or 3
        namespace = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_NAMESPACE)
        self.parameter[HBASE_WRITER_PARAMETER_NAMESPACE] = namespace
        table = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_TABLE)
        # table is mandatory — check_config raises/reports on a missing value
        self.check_config(self.plugin_type, table)
        if table:
            self.parameter[HBASE_WRITER_PARAMETER_TABLE] = table
        truncate = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_TRUNCATE)
        # always store a real boolean; the original expression could leave
        # None or '' in the parameter when the option was absent/empty
        self.parameter[HBASE_WRITER_PARAMETER_TRUNCATE] = \
            bool(truncate) and truncate.lower() == 'true'
        batch_size = self.config_parser.get(self.plugin_type,
                                            HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE)
        try:
            self.parameter[HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE] = int(batch_size)
        except (TypeError, ValueError):
            # missing (None) or non-numeric value: use a sane default
            # (narrowed from the original bare `except:`)
            self.parameter[HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE] = 10000

    def _find_row_key_index(self, row_key_column: str) -> int:
        """Return the index in ``self.columns`` whose entry ends with ``:<row_key_column>``.

        Columns are formatted like ``family:qualifier`` — presumably; TODO
        confirm against the base class ``load_column``.  Raises when the
        column is not found, so a bad rowKeyColumn definition fails fast.
        """
        for index, col in enumerate(self.columns):  # col: str
            if col.endswith(':' + row_key_column):
                return index
        raise Exception('specified row key column %s not found in columns' % row_key_column)

    def load_column(self):
        """Load ``column``/``columnFamily`` and translate the rowKeyColumn DSL
        into the list-of-dicts structure the DataX HBase writer expects."""
        super(HBaseWriter, self).load_column()
        self.parameter[HBASE_WRITER_PARAMETER_COLUMN_FAMILY] = \
            self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_COLUMN_FAMILY)
        row_key_columns = []
        row_key_column_definition: List[str] = self.config_parser \
            .get(self.plugin_type, HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN).split(',')
        for row_key_column in row_key_column_definition:
            separator_matcher = _SEPARATOR_PATTERN.search(row_key_column)
            reverse_matcher = _REVERSE_PATTERN.match(row_key_column)
            if separator_matcher:
                # constant separator string, not bound to a real column (index -1)
                row_key_columns.append({
                    "index": -1,
                    "value": separator_matcher.group(1),
                    "type": "string",
                })
            elif reverse_matcher:
                row_key_columns.append({
                    "index": self._find_row_key_index(reverse_matcher.group(1)),
                    "reverse": True,
                    "type": "string",
                })
            else:
                row_key_columns.append({
                    "index": self._find_row_key_index(row_key_column),
                    "type": "string",
                })
        self.parameter[HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN] = row_key_columns

    @staticmethod
    def generate_definition(hbase_ds_name: str, hbase_namespace: str, hbase_table_name: str,
                            hive_table_name: str, hive_table_comment: str, column_family: str,
                            column_names: List[str], column_types: Dict[str, str],
                            row_key_columns: List[str]) -> str:
        """Render a ``[writer]`` INI section describing an HBase writer target.

        :param column_names: ordered column names to write
        :param column_types: optional name -> type map; names missing from it
            are listed in ``column`` but omitted from ``columnType``
        :param row_key_columns: rowKeyColumn DSL tokens, joined with commas
        :return: the INI section as a single newline-joined string
        """
        column = list(column_names)
        # only columns with a known type appear in columnType, as name:type
        column_type = ','.join(
            f'{col_name}:{column_types[col_name]}'
            for col_name in column_names
            if col_name in column_types
        )
        definition = [
            '[writer]',
            f'# {hive_table_name}: {hive_table_comment}',
            'dataSource = %s' % hbase_ds_name,
            'namespace = %s' % hbase_namespace,
            'table = %s' % hbase_table_name,
            f'columnFamily = {column_family}',
            f'column = {",".join(column)}',
            f'columnType = {column_type}',
            f'rowKeyColumn = {",".join(row_key_columns)}',
            'truncate = false',
            'startKey = 00',
            'endKey = 99',
            'regionNumber = 101',
            'writeBatchSize = 100000',
        ]
        return '\n'.join(definition)