# hdfs_writer.py
# -*- coding:utf-8 -*-
from configparser import ConfigParser
from datetime import datetime, timedelta
from typing import Dict, List

from dw_base.datax.plugins.writer.writer import Writer
  6. # hdfs writer
  7. HDFS_WRITER_NAME = 'hdfswriter'
  8. HDFS_WRITER_PARAMETER_COMPRESS = 'compress'
  9. HDFS_WRITER_PARAMETER_ENCODING = 'encoding'
  10. HDFS_WRITER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
  11. HDFS_WRITER_PARAMETER_FILE_TYPE = 'fileType'
  12. HDFS_WRITER_PARAMETER_PATH = 'path'
  13. HDFS_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
  14. HDFS_WRITER_PARAMETER_FILE_NAME = 'fileName'
  15. class HDFSWriter(Writer):
  16. """
  17. HDFSWriter有4个时间字段:
  18. 1. start_date:表示数据的起始日期,从Reader处传递而来,全量时传递19700101即可
  19. 2. stop_date:表示数据的终止日期,从Reader传递而来
  20. 3. dt:表示分区日期,值为 start_date(业务日);分区内允许含次日漂移数据
  21. (按 ADR-03 raw 不纠正分区漂移,宽窗 [start, stop) 抓到的全部入此分区)
  22. 4. biz_date:表示文件的前缀,当start_date + 1 day = stop_date时,值为start_date,否则值为${start_date}-${stop_date - 1 day}
  23. """
  24. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  25. super(HDFSWriter, self).__init__(base_dir, config_parser, start_date, stop_date)
  26. self.plugin_name = HDFS_WRITER_NAME
  27. def load_others(self):
  28. path = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_PATH)
  29. self.check_config(HDFS_WRITER_PARAMETER_PATH, path)
  30. if path.__contains__('${dt}'):
  31. path = path.replace('${dt}', self.start_date)
  32. self.parameter[HDFS_WRITER_PARAMETER_PATH] = path
  33. self.parameter[HDFS_WRITER_PARAMETER_FILE_TYPE] = \
  34. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_TYPE) or 'text'
  35. if self.config_parser.has_option(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS):
  36. self.parameter[HDFS_WRITER_PARAMETER_COMPRESS] = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS)
  37. self.parameter[HDFS_WRITER_PARAMETER_ENCODING] = \
  38. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_ENCODING) or 'UTF-8'
  39. self.parameter[HDFS_WRITER_PARAMETER_WRITE_MODE] = \
  40. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_WRITE_MODE) or 'append'
  41. self.parameter[HDFS_WRITER_PARAMETER_FIELD_DELIMITER] = \
  42. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FIELD_DELIMITER).replace("\\t", "\t") or '\t'
  43. self.get_file_name()
  44. def get_file_name(self):
  45. start_at = datetime.strptime(self.start_date, '%Y%m%d')
  46. stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
  47. biz_date = self.start_date
  48. if stop_at - timedelta(days=1) > start_at:
  49. biz_date = f'{self.start_date}-{(stop_at - timedelta(days=1)).strftime("%Y%m%d")}'
  50. self.parameter[HDFS_WRITER_PARAMETER_FILE_NAME] = \
  51. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_NAME).replace('${biz_date}', biz_date)
  52. @staticmethod
  53. def generate_definition(hdfs_ds_name: str, hdfs_path: str,
  54. hive_database: str, hive_table_name: str, partitioned: bool,
  55. column_names: List[str], column_types: Dict[str, str]) -> str:
  56. if partitioned:
  57. # 分区表
  58. path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=%s' % '${dt}'
  59. else:
  60. # 非分区表
  61. path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
  62. column = []
  63. column_type = []
  64. for col_name in column_names:
  65. column.append(col_name)
  66. if column_types.__contains__(col_name):
  67. column_type.append(f'{col_name}:{column_types.get(col_name)}')
  68. column_type = ','.join(column_type)
  69. definition = [
  70. '[writer]',
  71. 'dataSource = %s' % hdfs_ds_name,
  72. f'path = {path}',
  73. f'column = {",".join(column)}',
  74. f'columnType = {column_type}',
  75. 'fileType = orc',
  76. # 'fileName = ${biz_date}',
  77. f'fileName = {hive_table_name}',
  78. 'compress = NONE',
  79. 'encoding = utf-8',
  80. ';writeMode支持append、nonConflict和truncate',
  81. 'writeMode = truncate',
  82. r'fieldDelimiter = \t'
  83. ]
  84. return '\n'.join(definition)