# hdfs_reader.py
  1. # -*- coding:utf-8 -*-
  2. from configparser import ConfigParser
  3. from typing import List, Dict
  4. from dw_base.datax.plugins.reader.reader import Reader
# DataX hdfsreader plugin: plugin name and the parameter/INI option keys it reads.
HDFS_READER_NAME = 'hdfsreader'
HDFS_READER_PARAMETER_COMPRESS = 'compress'
HDFS_READER_PARAMETER_ENCODING = 'encoding'
HDFS_READER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
HDFS_READER_PARAMETER_FILE_TYPE = 'fileType'
HDFS_READER_PARAMETER_NULL_FORMAT = "nullFormat"
HDFS_READER_PARAMETER_PATH = 'path'
  13. class HDFSReader(Reader):
  14. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  15. super(HDFSReader, self).__init__(base_dir, config_parser, start_date, stop_date)
  16. self.plugin_name = HDFS_READER_NAME
  17. def load_others(self):
  18. path = self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_PATH)
  19. self.check_config(HDFS_READER_PARAMETER_PATH, path)
  20. if path.__contains__('${start_date}'):
  21. path = path.replace('${start_date}', self.start_date)
  22. if path.__contains__('${start-date}'):
  23. path = path.replace('${start-date}', self.start_date)
  24. if path.__contains__('${dt}'):
  25. path = path.replace('${dt}', self.start_date)
  26. self.parameter[HDFS_READER_PARAMETER_PATH] = path
  27. self.parameter[HDFS_READER_PARAMETER_FILE_TYPE] = \
  28. self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_FILE_TYPE) or 'text'
  29. self.parameter[HDFS_READER_PARAMETER_ENCODING] = \
  30. self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_ENCODING) or 'UTF-8'
  31. self.parameter[HDFS_READER_PARAMETER_COMPRESS] = \
  32. self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_NULL_FORMAT) or ''
  33. self.parameter[HDFS_READER_PARAMETER_NULL_FORMAT] = \
  34. self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_NULL_FORMAT) or ''
  35. self.parameter[HDFS_READER_PARAMETER_FIELD_DELIMITER] = \
  36. self.config_parser.get(self.plugin_type, HDFS_READER_PARAMETER_FIELD_DELIMITER) or '\t'
  37. self.parameter[HDFS_READER_PARAMETER_FIELD_DELIMITER] = \
  38. self.parameter[HDFS_READER_PARAMETER_FIELD_DELIMITER].replace("\\t", "\t")
  39. @staticmethod
  40. def generate_definition(hdfs_ds_name: str, hdfs_path: str,
  41. hive_database: str, hive_table_name: str, hive_table_comment: str, partitioned: bool,
  42. column_names: List[str], column_types: Dict[str, str]) -> str:
  43. if partitioned:
  44. # 分区表
  45. path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=%s' % '${dt}'
  46. else:
  47. # 非分区表
  48. path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
  49. column = []
  50. column_type = []
  51. for col_name in column_names:
  52. column.append(col_name)
  53. if column_types.__contains__(col_name):
  54. curr_type = column_types.get(col_name)
  55. if curr_type.lower() != 'string':
  56. column_type.append(f'{col_name}:{curr_type.upper()}')
  57. column_type = ','.join(column_type)
  58. definition = [
  59. f'# {hive_table_name}: {hive_table_comment}',
  60. '[reader]',
  61. 'dataSource = %s' % hdfs_ds_name,
  62. f'column = {",".join(column)}',
  63. f'columnType = {column_type}',
  64. f'path = {path}',
  65. 'fileType = orc',
  66. 'encoding = UTF-8',
  67. r'fieldDelimiter = \t',
  68. 'nullFormat ='
  69. ]
  70. return '\n'.join(definition)