# -*- coding:utf-8 -*-
from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE
from dw_base.datax.datasources.data_source import *
from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH
from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE
from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS
from dw_base.datax.datasources.kafka_data_source import DS_TYPE_KAFKA
from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO
from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL
from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL
from dw_base.datax.datax_constants import *
from dw_base.datax.plugins.plugin import Plugin
from dw_base.datax.plugins.reader.clickhouse_reader import ClickHouseReader
from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
from dw_base.datax.plugins.reader.mongo_reader import MongoReader
from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
from dw_base.datax.plugins.reader.postgresql_reader import PostgreSQLReader
from dw_base.datax.plugins.writer.clickhouse_writer import ClickHouseWriter
from dw_base.datax.plugins.writer.elasticsearch_writer import ElasticsearchWriter
from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
from dw_base.datax.plugins.writer.mongo_writer import MongoWriter
from dw_base.datax.plugins.writer.mysql_writer import MySQLWriter
from dw_base.datax.plugins.writer.postgresql_writer import PostgreSQLWriter


class PluginFactory:
    """Factory that resolves a DataX reader/writer plugin from a job config.

    The data-source type is inferred from the data-source file name referenced
    in the config; the matching plugin class is looked up in a dispatch table
    instead of a long if/elif chain.  (The original chain also contained an
    unreachable duplicate ``DS_TYPE_ELASTICSEARCH`` writer branch, removed here.)
    """

    # ds_type -> reader plugin class.
    _READERS = {
        DS_TYPE_HDFS: HDFSReader,
        DS_TYPE_MONGO: MongoReader,
        DS_TYPE_MYSQL: MySQLReader,
        DS_TYPE_POSTGRE_SQL: PostgreSQLReader,
        DS_TYPE_CLICK_HOUSE: ClickHouseReader,
    }

    # ds_type -> writer plugin class.
    _WRITERS = {
        DS_TYPE_ELASTICSEARCH: ElasticsearchWriter,
        DS_TYPE_HBASE: HBaseWriter,
        DS_TYPE_HDFS: HDFSWriter,
        DS_TYPE_KAFKA: KafkaWriter,
        DS_TYPE_MONGO: MongoWriter,
        DS_TYPE_MYSQL: MySQLWriter,
        DS_TYPE_POSTGRE_SQL: PostgreSQLWriter,
        DS_TYPE_CLICK_HOUSE: ClickHouseWriter,
    }

    @staticmethod
    def get_plugin(plugin_type: str, base_dir: str, config_parser: ConfigParser,
                   start_time: str, stop_time: str) -> Plugin:
        """Instantiate the plugin for the given role and job configuration.

        :param plugin_type: plugin role, ``JOB_CONTENT_N_READER`` or
            ``JOB_CONTENT_N_WRITER``.
        :param base_dir: base directory passed through to the plugin.
        :param config_parser: job config; section ``plugin_type`` must contain
            the ``GEN_CONFIG_KEY_DATA_SOURCE`` option (a data-source file path).
        :param start_time: time-window start, passed through to the plugin.
        :param stop_time: time-window end, passed through to the plugin.
        :return: the constructed :class:`Plugin` instance.
        :raises ValueError: if ``plugin_type`` is unknown, or the data-source
            type has no plugin registered for that role.
        """
        ds_file = config_parser.get(plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
        # Data-source type is encoded as the file-name prefix: "<ds_type>-...".
        ds_type = ds_file.split('/')[-1].split('-')[0]

        if plugin_type == JOB_CONTENT_N_READER:
            table, role = PluginFactory._READERS, 'reader'
        elif plugin_type == JOB_CONTENT_N_WRITER:
            table, role = PluginFactory._WRITERS, 'writer'
        else:
            raise ValueError('Unsupported plugin type %s' % plugin_type)

        try:
            plugin_cls = table[ds_type]
        except KeyError:
            raise ValueError('DataSource type %s of %s defined in %s is not supported yet'
                             % (ds_type, role, ds_file)) from None
        return plugin_cls(base_dir, config_parser, start_time, stop_time)