# hbase_writer.py
  1. # -*- coding:utf-8 -*-
  2. import re
  3. from configparser import ConfigParser
  4. from typing import Dict, List
  5. from dw_base.datax.plugins.writer.writer import Writer
  6. # hbase writer
# DataX plugin identifier for the HBase API writer.
HBASE_WRITER_NAME = 'hbaseapiwriter'
# Keys of the writer "parameter" JSON object consumed by the DataX plugin.
HBASE_WRITER_PARAMETER_COLUMN_FAMILY = 'columnFamily'    # HBase column family for all columns
HBASE_WRITER_PARAMETER_CONF = 'conf'                     # HBase connection settings (from datasource)
HBASE_WRITER_PARAMETER_END_KEY = 'endKey'                # pre-split region end key
HBASE_WRITER_PARAMETER_NAMESPACE = 'namespace'           # HBase namespace of the target table
HBASE_WRITER_PARAMETER_REGION_NUMBER = 'regionNumber'    # number of pre-split regions
HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN = 'rowKeyColumn'   # row-key composition rules
HBASE_WRITER_PARAMETER_START_KEY = 'startKey'            # pre-split region start key
HBASE_WRITER_PARAMETER_TABLE = 'table'                   # target table name
HBASE_WRITER_PARAMETER_TRUNCATE = 'truncate'             # whether to truncate the table before writing
HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE = 'writeBatchSize'  # rows per batched put
  18. class HBaseWriter(Writer):
  19. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  20. super(HBaseWriter, self).__init__(base_dir, config_parser, start_date, stop_date)
  21. self.plugin_name = HBASE_WRITER_NAME
  22. def load_data_source(self):
  23. ds_dict: Dict[str, str] = self.datasource.parse()
  24. conf = {}
  25. for key, value in ds_dict.items():
  26. conf[key] = value
  27. self.parameter[HBASE_WRITER_PARAMETER_CONF] = conf
  28. def load_others(self):
  29. # 放在datasource 部分
  30. # conf = self.config_parser.get(self.plugin_type, 'conf')
  31. # if conf:
  32. # self.parameter[HBASE_WRITER_PARAMETER_CONF] = json.loads(conf)
  33. # self.parameter[HBASE_WRITER_PARAMETER_CONF] = 'conf'
  34. end_key = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_END_KEY)
  35. if end_key:
  36. self.parameter[HBASE_WRITER_PARAMETER_END_KEY] = end_key
  37. start_key = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_START_KEY)
  38. if start_key:
  39. self.parameter[HBASE_WRITER_PARAMETER_START_KEY] = start_key
  40. self.parameter[HBASE_WRITER_PARAMETER_REGION_NUMBER] = self.config_parser.get(self.plugin_type,
  41. HBASE_WRITER_PARAMETER_REGION_NUMBER) or 3
  42. namespace = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_NAMESPACE)
  43. self.parameter[HBASE_WRITER_PARAMETER_NAMESPACE] = namespace
  44. table = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_TABLE)
  45. self.check_config(self.plugin_type, table)
  46. if table:
  47. self.parameter[HBASE_WRITER_PARAMETER_TABLE] = table
  48. truncate = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_TRUNCATE)
  49. self.parameter[HBASE_WRITER_PARAMETER_TRUNCATE] = truncate and truncate.lower() == 'true'
  50. batch_size = self.config_parser.get(self.plugin_type, HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE)
  51. try:
  52. self.parameter[HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE] = int(batch_size)
  53. except:
  54. self.parameter[HBASE_WRITER_PARAMETER_WRITE_BATCH_SIZE] = 10000
  55. def load_column(self):
  56. super(HBaseWriter, self).load_column()
  57. self.parameter[HBASE_WRITER_PARAMETER_COLUMN_FAMILY] = self.config_parser.get(self.plugin_type,
  58. HBASE_WRITER_PARAMETER_COLUMN_FAMILY)
  59. row_key_columns = []
  60. row_key_column_definition = self.config_parser.get(self.plugin_type,
  61. HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN) \
  62. .split(',') # type: [str]
  63. separator_pattern = 'separator\((.+?)\)'
  64. reverse_pattern = 'reverse\((.+?)\)'
  65. for row_key_column in row_key_column_definition:
  66. separator_matcher = re.search(separator_pattern, row_key_column)
  67. reverse_matcher = re.match(reverse_pattern, row_key_column)
  68. if separator_matcher:
  69. separator = separator_matcher.group(1)
  70. row_key_columns.append(
  71. {
  72. "index": -1,
  73. "value": separator,
  74. "type": "string"
  75. }
  76. )
  77. continue
  78. elif reverse_matcher:
  79. row_key_column = reverse_matcher.group(1)
  80. row_key_index = -1
  81. for index in range(len(self.columns)):
  82. col = self.columns[index] # type: str
  83. if col.endswith(':' + row_key_column):
  84. row_key_index = index
  85. break
  86. if row_key_index != -1:
  87. row_key_columns.append(
  88. {
  89. "index": row_key_index,
  90. "reverse": True,
  91. "type": "string"
  92. }
  93. )
  94. else:
  95. raise Exception('specified row key column %s not found in columns' % row_key_column)
  96. else:
  97. row_key_index = -1
  98. for index in range(len(self.columns)):
  99. col = self.columns[index] # type: str
  100. if col.endswith(':' + row_key_column):
  101. row_key_index = index
  102. break
  103. if row_key_index != -1:
  104. row_key_columns.append(
  105. {
  106. "index": row_key_index,
  107. "type": "string"
  108. }
  109. )
  110. else:
  111. raise Exception('specified row key column %s not found in columns' % row_key_column)
  112. self.parameter[HBASE_WRITER_PARAMETER_ROW_KEY_COLUMN] = row_key_columns
  113. @staticmethod
  114. def generate_definition(hbase_ds_name: str,
  115. hbase_namespace: str,
  116. hbase_table_name: str,
  117. hive_table_name: str,
  118. hive_table_comment: str,
  119. column_family: str,
  120. column_names: List[str],
  121. column_types: Dict[str, str],
  122. row_key_columns: List[str]) -> str:
  123. column = []
  124. column_type = []
  125. for col_name in column_names:
  126. column.append(col_name)
  127. if column_types.__contains__(col_name):
  128. column_type.append(f'{col_name}:{column_types.get(col_name)}')
  129. column_type = ','.join(column_type)
  130. definition = [
  131. '[writer]',
  132. f'# {hive_table_name}: {hive_table_comment}',
  133. 'dataSource = %s' % hbase_ds_name,
  134. 'namespace = %s' % hbase_namespace,
  135. 'table = %s' % hbase_table_name,
  136. f'columnFamily = {column_family}',
  137. f'column = {",".join(column)}',
  138. f'columnType = {column_type}',
  139. f'rowKeyColumn = {",".join(row_key_columns)}',
  140. 'truncate = false',
  141. 'startKey = 00',
  142. 'endKey = 99',
  143. 'regionNumber = 101',
  144. 'writeBatchSize = 100000',
  145. ]
  146. return '\n'.join(definition)