| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- # -*- coding:utf-8 -*-
- from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE
- from dw_base.datax.datasources.data_source import *
- from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH
- from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE
- from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS
- from dw_base.datax.datasources.kafka_data_source import DS_TYPE_KAFKA
- from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO
- from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL
- from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL
- from dw_base.datax.datax_constants import *
- from dw_base.datax.plugins.plugin import Plugin
- from dw_base.datax.plugins.reader.clickhouse_reader import ClickHouseReader
- from dw_base.datax.plugins.reader.hdfs_reader import HDFSReader
- from dw_base.datax.plugins.reader.mongo_reader import MongoReader
- from dw_base.datax.plugins.reader.mysql_reader import MySQLReader
- from dw_base.datax.plugins.reader.postgresql_reader import PostgreSQLReader
- from dw_base.datax.plugins.writer.clickhouse_writer import ClickHouseWriter
- from dw_base.datax.plugins.writer.elasticsearch_writer import ElasticsearchWriter
- from dw_base.datax.plugins.writer.hbase_writer import HBaseWriter
- from dw_base.datax.plugins.writer.hdfs_writer import HDFSWriter
- from dw_base.datax.plugins.writer.kafka_writer import KafkaWriter
- from dw_base.datax.plugins.writer.mongo_writer import MongoWriter
- from dw_base.datax.plugins.writer.mysql_writer import MySQLWriter
- from dw_base.datax.plugins.writer.postgresql_writer import PostgreSQLWriter
- class PluginFactory:
- @staticmethod
- def get_plugin(plugin_type: str, base_dir: str, config_parser: ConfigParser, start_time: str,
- stop_time: str) -> Plugin:
- ds_file = config_parser.get(plugin_type, GEN_CONFIG_KEY_DATA_SOURCE)
- ds_type = ds_file.split('/')[-1].split('-')[0]
- if plugin_type == JOB_CONTENT_N_READER:
- if ds_type == DS_TYPE_HDFS:
- plugin = HDFSReader(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_MONGO:
- plugin = MongoReader(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_MYSQL:
- plugin = MySQLReader(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_POSTGRE_SQL:
- plugin = PostgreSQLReader(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_CLICK_HOUSE:
- plugin = ClickHouseReader(base_dir, config_parser, start_time, stop_time)
- else:
- raise ValueError('DataSource type %s of reader defined in %s is not supported yet' % (ds_type, ds_file))
- elif plugin_type == JOB_CONTENT_N_WRITER:
- if ds_type == DS_TYPE_ELASTICSEARCH:
- plugin = ElasticsearchWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_HBASE:
- plugin = HBaseWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_HDFS:
- plugin = HDFSWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_KAFKA:
- plugin = KafkaWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_MONGO:
- plugin = MongoWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_MYSQL:
- plugin = MySQLWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_POSTGRE_SQL:
- plugin = PostgreSQLWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_ELASTICSEARCH:
- plugin = ElasticsearchWriter(base_dir, config_parser, start_time, stop_time)
- elif ds_type == DS_TYPE_CLICK_HOUSE:
- plugin = ClickHouseWriter(base_dir, config_parser, start_time, stop_time)
- else:
- raise ValueError('DataSource type %s of writer defined in %s is not supported yet' % (ds_type, ds_file))
- else:
- raise ValueError('Unsupported plugin type %s' % plugin_type)
- return plugin
|