plugin_factory.py 4.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. # -*- coding:utf-8 -*-
  2. from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE
  3. from dw_base.datax.datasources.data_source import *
  4. from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH
  5. from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE
  6. from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS
  7. from dw_base.datax.datasources.kafka_data_source import DS_TYPE_KAFKA
  8. from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO
  9. from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL
  10. from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL
  11. from dw_base.datax.datax_constants import *
  12. from dw_base.datax.plugins.plugin import Plugin
  13. from dw_base.datax.plugins.reader.clickhouse_reader import ClickHouseReader
  14. from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
  15. from dw_base.datax.plugins.reader.mongo_reader import MongoReader
  16. from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
  17. from dw_base.datax.plugins.reader.postgresql_reader import PostgreSQLReader
  18. from dw_base.datax.plugins.writer.clickhouse_writer import ClickHouseWriter
  19. from dw_base.datax.plugins.writer.elasticsearch_writer import ElasticsearchWriter
  20. from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
  21. from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
  22. from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
  23. from dw_base.datax.plugins.writer.mongo_writer import MongoWriter
  24. from dw_base.datax.plugins.writer.mysql_writer import MySQLWriter
  25. from dw_base.datax.plugins.writer.postgresql_writer import PostgreSQLWriter
  26. class PluginFactory:
  27. @staticmethod
  28. def get_plugin(plugin_type: str, base_dir: str, config_parser: ConfigParser, start_time: str,
  29. stop_time: str) -> Plugin:
  30. ds_file = config_parser.get(plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
  31. ds_type = ds_file.split('/')[-1].split('-')[0]
  32. if plugin_type == JOB_CONTENT_N_READER:
  33. if ds_type == DS_TYPE_HDFS:
  34. plugin = HDFSReader(base_dir, config_parser, start_time, stop_time)
  35. elif ds_type == DS_TYPE_MONGO:
  36. plugin = MongoReader(base_dir, config_parser, start_time, stop_time)
  37. elif ds_type == DS_TYPE_MYSQL:
  38. plugin = MySQLReader(base_dir, config_parser, start_time, stop_time)
  39. elif ds_type == DS_TYPE_POSTGRE_SQL:
  40. plugin = PostgreSQLReader(base_dir, config_parser, start_time, stop_time)
  41. elif ds_type == DS_TYPE_CLICK_HOUSE:
  42. plugin = ClickHouseReader(base_dir, config_parser, start_time, stop_time)
  43. else:
  44. raise ValueError('DataSource type %s of reader defined in %s is not supported yet' % (ds_type, ds_file))
  45. elif plugin_type == JOB_CONTENT_N_WRITER:
  46. if ds_type == DS_TYPE_ELASTICSEARCH:
  47. plugin = ElasticsearchWriter(base_dir, config_parser, start_time, stop_time)
  48. elif ds_type == DS_TYPE_HBASE:
  49. plugin = HBaseWriter(base_dir, config_parser, start_time, stop_time)
  50. elif ds_type == DS_TYPE_HDFS:
  51. plugin = HDFSWriter(base_dir, config_parser, start_time, stop_time)
  52. elif ds_type == DS_TYPE_KAFKA:
  53. plugin = KafkaWriter(base_dir, config_parser, start_time, stop_time)
  54. elif ds_type == DS_TYPE_MONGO:
  55. plugin = MongoWriter(base_dir, config_parser, start_time, stop_time)
  56. elif ds_type == DS_TYPE_MYSQL:
  57. plugin = MySQLWriter(base_dir, config_parser, start_time, stop_time)
  58. elif ds_type == DS_TYPE_POSTGRE_SQL:
  59. plugin = PostgreSQLWriter(base_dir, config_parser, start_time, stop_time)
  60. elif ds_type == DS_TYPE_ELASTICSEARCH:
  61. plugin = ElasticsearchWriter(base_dir, config_parser, start_time, stop_time)
  62. elif ds_type == DS_TYPE_CLICK_HOUSE:
  63. plugin = ClickHouseWriter(base_dir, config_parser, start_time, stop_time)
  64. else:
  65. raise ValueError('DataSource type %s of writer defined in %s is not supported yet' % (ds_type, ds_file))
  66. else:
  67. raise ValueError('Unsupported plugin type %s' % plugin_type)
  68. return plugin