# hdfs_writer.py
# -*- coding:utf-8 -*-
from configparser import ConfigParser
from datetime import datetime, timedelta
from typing import Dict, List
from dw_base.datax.plugins.writer.writer import Writer
  6. # hdfs writer
  7. HDFS_WRITER_NAME = 'hdfswriter'
  8. HDFS_WRITER_PARAMETER_COMPRESS = 'compress'
  9. HDFS_WRITER_PARAMETER_ENCODING = 'encoding'
  10. HDFS_WRITER_PARAMETER_FIELD_DELIMITER = 'fieldDelimiter'
  11. HDFS_WRITER_PARAMETER_FILE_TYPE = 'fileType'
  12. HDFS_WRITER_PARAMETER_PATH = 'path'
  13. HDFS_WRITER_PARAMETER_WRITE_MODE = 'writeMode'
  14. HDFS_WRITER_PARAMETER_FILE_NAME = 'fileName'
  15. class HDFSWriter(Writer):
  16. """
  17. HDFSWriter有4个时间字段:
  18. 1. start_date:表示数据的起始日期,从Reader处传递而来,全量时传递19700101即可
  19. 2. stop_date:表示数据的终止日期,从Reader传递而来
  20. 3. dt:表示分区日期,值为stop_date - 1 day
  21. 4. biz_date:表示文件的前缀,当start_date + 1 day = stop_date时,值为start_date,否则值为${start_date}-${stop_date - 1 day}
  22. """
  23. def __init__(self, base_dir: str, config_parser: ConfigParser, start_date: str = None, stop_date: str = None):
  24. super(HDFSWriter, self).__init__(base_dir, config_parser, start_date, stop_date)
  25. self.plugin_name = HDFS_WRITER_NAME
  26. def load_others(self):
  27. path = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_PATH)
  28. self.check_config(HDFS_WRITER_PARAMETER_PATH, path)
  29. if path.__contains__('${dt}'):
  30. stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
  31. dt = (stop_at - timedelta(days=1)).strftime('%Y%m%d')
  32. path = path.replace('${dt}', dt)
  33. self.parameter[HDFS_WRITER_PARAMETER_PATH] = path
  34. self.parameter[HDFS_WRITER_PARAMETER_FILE_TYPE] = \
  35. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_TYPE) or 'text'
  36. if self.config_parser.has_option(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS):
  37. self.parameter[HDFS_WRITER_PARAMETER_COMPRESS] = self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_COMPRESS)
  38. self.parameter[HDFS_WRITER_PARAMETER_ENCODING] = \
  39. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_ENCODING) or 'UTF-8'
  40. self.parameter[HDFS_WRITER_PARAMETER_WRITE_MODE] = \
  41. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_WRITE_MODE) or 'append'
  42. self.parameter[HDFS_WRITER_PARAMETER_FIELD_DELIMITER] = \
  43. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FIELD_DELIMITER).replace("\\t", "\t") or '\t'
  44. self.get_file_name()
  45. def get_file_name(self):
  46. start_at = datetime.strptime(self.start_date, '%Y%m%d')
  47. stop_at = datetime.strptime(self.stop_date, '%Y%m%d')
  48. biz_date = self.start_date
  49. if stop_at - timedelta(days=1) > start_at:
  50. biz_date = f'{self.start_date}-{(stop_at - timedelta(days=1)).strftime("%Y%m%d")}'
  51. self.parameter[HDFS_WRITER_PARAMETER_FILE_NAME] = \
  52. self.config_parser.get(self.plugin_type, HDFS_WRITER_PARAMETER_FILE_NAME).replace('${biz_date}', biz_date)
  53. @staticmethod
  54. def generate_definition(hdfs_ds_name: str, hdfs_path: str,
  55. hive_database: str, hive_table_name: str, partitioned: bool,
  56. column_names: List[str], column_types: Dict[str, str]) -> str:
  57. if partitioned:
  58. # 分区表
  59. path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}/dt=%s' % '${dt}'
  60. else:
  61. # 非分区表
  62. path = f'{hdfs_path}/{hive_database}.db/{hive_table_name}'
  63. column = []
  64. column_type = []
  65. for col_name in column_names:
  66. column.append(col_name)
  67. if column_types.__contains__(col_name):
  68. column_type.append(f'{col_name}:{column_types.get(col_name)}')
  69. column_type = ','.join(column_type)
  70. definition = [
  71. '[writer]',
  72. 'dataSource = %s' % hdfs_ds_name,
  73. f'path = {path}',
  74. f'column = {",".join(column)}',
  75. f'columnType = {column_type}',
  76. 'fileType = orc',
  77. # 'fileName = ${biz_date}',
  78. f'fileName = {hive_table_name}',
  79. 'compress = NONE',
  80. 'encoding = utf-8',
  81. ';writeMode支持append、nonConflict和truncate',
  82. 'writeMode = truncate',
  83. r'fieldDelimiter = \t'
  84. ]
  85. return '\n'.join(definition)