hdfs_writer.py

# -*- coding:utf-8 -*-
from configparser import ConfigParser
from datetime import datetime, timedelta
from typing import Dict, List

from dw_base.datax.plugins.writer.writer import Writer

# hdfs writer
HDFS_WRITER_NAME = 'hdfswriter'
HDFS_WRITER_PARAMETER_COMPRESS = 'compress'
HDFS_WRITER_PARAMETER_ENCODING = 'encoding'
HDFS_WRITER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
HDFS_WRITER_PARAMETER_FILE_TYPE = 'fileType'
HDFS_WRITER_PARAMETER_PATH = 'path'
HDFS_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
HDFS_WRITER_PARAMETER_FILE_NAME = 'fileName'


class HDFSWriter(Writer):
    """
    HDFSWriter carries four date fields:
    1. start_date: start date of the data, passed in from the Reader; for a full load, pass 19700101
    2. stop_date: end date of the data, passed in from the Reader
    3. dt: the partition date, equal to stop_date - 1 day
    4. biz_date: the file-name prefix; when start_date + 1 day == stop_date it equals start_date,
       otherwise ${start_date}-${stop_date - 1 day}
    """

    def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
        super(HDFSWriter, self).__init__(base_dir, config_parser, start_date, stop_date)
        self.plugin_name = HDFS_WRITER_NAME

    def load_others(self):
        path = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_PATH)
        self.check_config(HDFS_WRITER_PARAMETER_PATH, path)
        if '${dt}' in path:
            # Resolve the ${dt} placeholder to the partition date: stop_date - 1 day
            stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
            dt = (stop_at - timedelta(days=1)).strftime('%Y%m%d')
            path = path.replace('${dt}', dt)
        self.parameter[HDFS_WRITER_PARAMETER_PATH] = path
        # Optional settings fall back to defaults. fallback='' keeps the `or`
        # defaults working when an option is absent; without it,
        # ConfigParser.get raises NoOptionError instead of returning a falsy value.
        self.parameter[HDFS_WRITER_PARAMETER_FILE_TYPE] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_TYPE, fallback='') or 'text'
        if self.config_parser.has_option(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS):
            self.parameter[HDFS_WRITER_PARAMETER_COMPRESS] = \
                self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS)
        self.parameter[HDFS_WRITER_PARAMETER_ENCODING] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_ENCODING, fallback='') or 'UTF-8'
        self.parameter[HDFS_WRITER_PARAMETER_WRITE_MODE] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_WRITE_MODE, fallback='') or 'append'
        self.parameter[HDFS_WRITER_PARAMETER_FIELD_DELIMITER] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FIELD_DELIMITER, fallback='').replace('\\t', '\t') or '\t'
        self.parameter['kerberosPrincipal'] = 'hdfs@LIXIAOYUN.COM'
        self.get_file_name()
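
    # A minimal config section load_others can parse might look like this
    # (a sketch; the section name comes from self.plugin_type and all values
    # below are assumptions, not from the original source):
    #
    #   [writer]
    #   path = /user/hive/warehouse/ods.db/orders/dt=${dt}
    #   fileType = orc
    #   fieldDelimiter = \t
    #   fileName = orders_${biz_date}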

    def get_file_name(self):
        start_at = datetime.strptime(self.start_date, '%Y%m%d')
        stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
        biz_date = self.start_date
        if stop_at - timedelta(days=1) > start_at:
            biz_date = f'{self.start_date}-{(stop_at - timedelta(days=1)).strftime("%Y%m%d")}'
        self.parameter[HDFS_WRITER_PARAMETER_FILE_NAME] = \
            self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_NAME).replace('${biz_date}', biz_date)
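
    # For example, with fileName = orders_${biz_date}, start_date=20240101 and
    # stop_date=20240102 (a one-day window), the resolved file name is
    # orders_20240101 (illustrative values, not from the original source).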

    @staticmethod
    def generate_definition(hdfs_ds_name: str, hdfs_path: str,
                            hive_database: str, hive_table_name: str, partitioned: bool,
                            column_names: List[str], column_types: Dict[str, str]) -> str:
        if partitioned:
            # Partitioned table: keep the ${dt} placeholder for load_others to resolve
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}' + '/dt=${dt}'
        else:
            # Non-partitioned table
            path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
        column = []
        column_type = []
        for col_name in column_names:
            column.append(col_name)
            if col_name in column_types:
                column_type.append(f'{col_name}:{column_types[col_name]}')
        column_type = ','.join(column_type)
        definition = [
            '[writer]',
            f'dataSource = {hdfs_ds_name}',
            f'path = {path}',
            f'column = {",".join(column)}',
            f'columnType = {column_type}',
            'fileType = orc',
            # 'fileName = ${biz_date}',
            f'fileName = {hive_table_name}',
            'compress = NONE',
            'encoding = utf-8',
            ';writeMode supports append, nonConflict and truncate',
            'writeMode = truncate',
            r'fieldDelimiter = \t',
        ]
        return '\n'.join(definition)
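

# Illustrative smoke test for generate_definition (a sketch; the names and
# values below are assumptions, not from the original source):
if __name__ == '__main__':
    print(HDFSWriter.generate_definition(
        hdfs_ds_name='hdfs_prod',
        hdfs_path='/user/hive/warehouse',
        hive_database='ods',
        hive_table_name='orders',
        partitioned=True,
        column_names=['order_id', 'amount'],
        column_types={'order_id': 'bigint', 'amount': 'double'},
    ))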