- # -*- coding:utf-8 -*-
- import re
- from configparser import ConfigParser
- from typing import Dict, List
- from dw_base.datax.plugins.writer.writer import Writer
# Plugin name and parameter keys for the DataX "hbaseapiwriter" plugin.
HBASE_WRITER_NAME = 'hbaseapiwriter'
# The keys below become entries of the generated job's "parameter" object
# (see HBaseWriter.load_others / load_column / load_data_source).
HBASE_WRITER_PARAMETER_COLUMN_FAMILY = 'columnFamily'
HBASE_WRITER_PARAMETER_CONF = 'conf'
HBASE_WRITER_PARAMETER_END_KEY = 'endKey'
HBASE_WRITER_PARAMETER_NAMESPACE = 'namespace'
HBASE_WRITER_PARAMETER_REGION_NUMBER = 'regionNumber'
HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN = 'rowKeyColumn'
HBASE_WRITER_PARAMETER_START_KEY = 'startKey'
HBASE_WRITER_PARAMETER_TABLE = 'table'
HBASE_WRITER_PARAMETER_TRUNCATE = 'truncate'
HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE = 'writeBatchSize'
class HBaseWriter(Writer):
    """Builds the DataX ``hbaseapiwriter`` job parameters from an INI config.

    The writer section supplies the datasource connection info, namespace,
    table, key range, truncate flag, batch size and the row-key column
    layout; this class assembles them into ``self.parameter``.
    """

    # Row-key definition markers, compiled once at class load (raw strings
    # avoid the invalid-escape DeprecationWarning of '\(' in plain strings):
    #   separator(<literal>) -> a constant string segment of the row key
    #   reverse(<column>)    -> the column's value is reversed before use
    _SEPARATOR_PATTERN = re.compile(r'separator\((.+?)\)')
    _REVERSE_PATTERN = re.compile(r'reverse\((.+?)\)')

    def __init__(self, base_dir: str, config_parser: ConfigParser,
                 start_date: str = None, stop_date: str = None):
        super().__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HBASE_WRITER_NAME

    def load_data_source(self):
        """Copy every key/value of the parsed datasource into the ``conf`` parameter."""
        ds_dict: Dict[str, str] = self.datasource.parse()
        # Shallow copy so later changes to the datasource's dict cannot leak
        # into the generated job parameter.
        self.parameter[HBASE_WRITER_PARAMETER_CONF] = dict(ds_dict)

    def load_others(self):
        """Load the scalar writer options: key range, region count, namespace,
        table, truncate flag and write batch size."""
        end_key = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_END_KEY)
        if end_key:
            self.parameter[HBASE_WRITER_PARAMETER_END_KEY] = end_key
        start_key = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_START_KEY)
        if start_key:
            self.parameter[HBASE_WRITER_PARAMETER_START_KEY] = start_key
        # NOTE(review): a configured regionNumber stays a string while the
        # fallback is the int 3 — downstream presumably accepts both; confirm
        # before normalizing the type.
        self.parameter[HBASE_WRITER_PARAMETER_REGION_NUMBER] = self.config_parser.get(
            self.plugin_type, HBASE_WRITER_PARAMETER_REGION_NUMBER) or 3
        namespace = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_NAMESPACE)
        self.parameter[HBASE_WRITER_PARAMETER_NAMESPACE] = namespace
        table = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_TABLE)
        self.check_config(self.plugin_type, table)
        if table:
            self.parameter[HBASE_WRITER_PARAMETER_TABLE] = table
        truncate = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_TRUNCATE)
        # Fix: always store a real bool. The original `truncate and ...`
        # stored None/'' when the option was missing, which serializes as
        # null instead of false.
        self.parameter[HBASE_WRITER_PARAMETER_TRUNCATE] = bool(truncate) and truncate.lower() == 'true'
        batch_size = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE)
        try:
            self.parameter[HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE] = int(batch_size)
        except (TypeError, ValueError):
            # Missing (None) or non-numeric value: fall back to the default.
            self.parameter[HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE] = 10000

    def _find_column_index(self, column_name: str) -> int:
        """Return the index of *column_name* within ``self.columns``.

        Columns are stored as ':'-qualified strings, so match on the
        ``:name`` suffix. Raises the original not-found error when absent.
        """
        for index, col in enumerate(self.columns):
            if col.endswith(':' + column_name):
                return index
        raise Exception('specified row key column %s not found in columns' % column_name)

    def load_column(self):
        """Load the column family and expand the rowKeyColumn definition list."""
        super().load_column()
        self.parameter[HBASE_WRITER_PARAMETER_COLUMN_FAMILY] = self.config_parser.get(
            self.plugin_type, HBASE_WRITER_PARAMETER_COLUMN_FAMILY)
        row_key_columns = []
        definitions: List[str] = self.config_parser.get(
            self.plugin_type, HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN).split(',')
        for row_key_column in definitions:
            separator_matcher = self._SEPARATOR_PATTERN.search(row_key_column)
            if separator_matcher:
                # Constant separator segment: no source column (index -1).
                row_key_columns.append(
                    {
                        "index": -1,
                        "value": separator_matcher.group(1),
                        "type": "string"
                    }
                )
                continue
            reverse_matcher = self._REVERSE_PATTERN.match(row_key_column)
            if reverse_matcher:
                row_key_column = reverse_matcher.group(1)
            # Shared lookup replaces the two duplicated search loops of the
            # original reverse/plain branches; key order kept (index,
            # [reverse,] type) so the emitted JSON is unchanged.
            entry = {"index": self._find_column_index(row_key_column)}
            if reverse_matcher:
                entry["reverse"] = True
            entry["type"] = "string"
            row_key_columns.append(entry)
        self.parameter[HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN] = row_key_columns

    @staticmethod
    def generate_definition(hbase_ds_name: str,
                            hbase_namespace: str,
                            hbase_table_name: str,
                            hive_table_name: str,
                            hive_table_comment: str,
                            column_family: str,
                            column_names: List[str],
                            column_types: Dict[str, str],
                            row_key_columns: List[str]) -> str:
        """Render a ``[writer]`` INI section text for an HBase target table.

        ``column_types`` maps column name -> type; names without a mapping
        appear in ``column`` but are omitted from ``columnType``.
        """
        column: List[str] = []
        column_type_parts: List[str] = []
        for col_name in column_names:
            column.append(col_name)
            if col_name in column_types:  # idiomatic `in` over __contains__
                column_type_parts.append(f'{col_name}:{column_types.get(col_name)}')
        definition = [
            '[writer]',
            f'# {hive_table_name}: {hive_table_comment}',
            'dataSource = %s' % hbase_ds_name,
            'namespace = %s' % hbase_namespace,
            'table = %s' % hbase_table_name,
            f'columnFamily = {column_family}',
            f'column = {",".join(column)}',
            f'columnType = {",".join(column_type_parts)}',
            f'rowKeyColumn = {",".join(row_key_columns)}',
            'truncate = false',
            'startKey = 00',
            'endKey = 99',
            'regionNumber = 101',
            'writeBatchSize = 100000',
        ]
        return '\n'.join(definition)