plugin_factory.py 4.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. # -*- coding:utf-8 -*-
  2. from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE
  3. from dw_base.datax.datasources.data_source import *
  4. from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH
  5. from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE
  6. from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS
  7. from dw_base.datax.datasources.kafka_data_source import DS_TYPE_KAFKA
  8. from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO
  9. from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL
  10. from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL
  11. from dw_base.datax.datax_constants import *
  12. from dw_base.datax.plugins.plugin import Plugin
  13. from dw_base.datax.plugins.reader.clickhouse_reader import ClickHouseReader
  14. from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
  15. from dw_base.datax.plugins.reader.mongo_reader import MongoReader
  16. from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
  17. from dw_base.datax.plugins.reader.postgresql_reader import PostgreSQLReader
  18. from dw_base.datax.plugins.writer.clickhouse_writer import ClickHouseWriter
  19. from dw_base.datax.plugins.writer.elasticsearch_writer import ElasticsearchWriter
  20. from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
  21. from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
  22. from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
  23. from dw_base.datax.plugins.writer.mongo_writer import MongoWriter
  24. from dw_base.datax.plugins.writer.mysql_writer import MySQLWriter
  25. from dw_base.datax.plugins.writer.postgresql_writer import PostgreSQLWriter
  26. class PluginFactory:
  27. @staticmethod
  28. def get_plugin(plugin_type: str, base_dir: str, config_parser: ConfigParser, start_time: str,
  29. stop_time: str) -> Plugin:
  30. ds_file = config_parser.get(plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
  31. # ds_file 形如 {db_type}/{env}-{实例简称},首段即 db_type
  32. ds_type = ds_file.split('/')[0]
  33. if plugin_type == JOB_CONTENT_N_READER:
  34. if ds_type == DS_TYPE_HDFS:
  35. plugin = HDFSReader(base_dir, config_parser, start_time, stop_time)
  36. elif ds_type == DS_TYPE_MONGO:
  37. plugin = MongoReader(base_dir, config_parser, start_time, stop_time)
  38. elif ds_type == DS_TYPE_MYSQL:
  39. plugin = MySQLReader(base_dir, config_parser, start_time, stop_time)
  40. elif ds_type == DS_TYPE_POSTGRE_SQL:
  41. plugin = PostgreSQLReader(base_dir, config_parser, start_time, stop_time)
  42. elif ds_type == DS_TYPE_CLICK_HOUSE:
  43. plugin = ClickHouseReader(base_dir, config_parser, start_time, stop_time)
  44. else:
  45. raise ValueError('DataSource type %s of reader defined in %s is not supported yet' % (ds_type, ds_file))
  46. elif plugin_type == JOB_CONTENT_N_WRITER:
  47. if ds_type == DS_TYPE_ELASTICSEARCH:
  48. plugin = ElasticsearchWriter(base_dir, config_parser, start_time, stop_time)
  49. elif ds_type == DS_TYPE_HBASE:
  50. plugin = HBaseWriter(base_dir, config_parser, start_time, stop_time)
  51. elif ds_type == DS_TYPE_HDFS:
  52. plugin = HDFSWriter(base_dir, config_parser, start_time, stop_time)
  53. elif ds_type == DS_TYPE_KAFKA:
  54. plugin = KafkaWriter(base_dir, config_parser, start_time, stop_time)
  55. elif ds_type == DS_TYPE_MONGO:
  56. plugin = MongoWriter(base_dir, config_parser, start_time, stop_time)
  57. elif ds_type == DS_TYPE_MYSQL:
  58. plugin = MySQLWriter(base_dir, config_parser, start_time, stop_time)
  59. elif ds_type == DS_TYPE_POSTGRE_SQL:
  60. plugin = PostgreSQLWriter(base_dir, config_parser, start_time, stop_time)
  61. elif ds_type == DS_TYPE_ELASTICSEARCH:
  62. plugin = ElasticsearchWriter(base_dir, config_parser, start_time, stop_time)
  63. elif ds_type == DS_TYPE_CLICK_HOUSE:
  64. plugin = ClickHouseWriter(base_dir, config_parser, start_time, stop_time)
  65. else:
  66. raise ValueError('DataSource type %s of writer defined in %s is not supported yet' % (ds_type, ds_file))
  67. else:
  68. raise ValueError('Unsupported plugin type %s' % plugin_type)
  69. return plugin