# -*- coding:utf-8 -*-
from dw_base.datax.datasources.data_source import DataSource
from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH, ElasticsearchDataSource
from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE, HBaseDataSource
from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS, HDFSDataSource
from dw_base.datax.datasources.kafka_data_source import KafkaDataSource, DS_TYPE_KAFKA
from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO, MongoDataSource
from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL, MySQLDataSource
from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL, PostgreSQLDataSource
from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE, ClickHouseDataSource


class DataSourceFactory:
    """Factory that builds the data source object matching a type identifier."""

    @staticmethod
    def get_data_source(ds_type: str, ds_file_path: str) -> DataSource:
        """Create the data source registered for ``ds_type``.

        ``ds_type`` is compared with ``==`` against each known ``DS_TYPE_*``
        constant, in the order listed below; the class paired with the first
        match is instantiated with ``ds_file_path`` as its only argument.

        :param ds_type: data source type identifier to look up
        :param ds_file_path: value handed to the selected class's constructor
        :return: a new instance of the matching data source class
        :raises ValueError: if ``ds_type`` equals none of the known constants
        """
        # Ordered (type constant, class) pairs. A linear scan with ``==`` is
        # used rather than a dict lookup so that matching does not depend on
        # ``ds_type`` being hashable.
        known_sources = (
            (DS_TYPE_ELASTICSEARCH, ElasticsearchDataSource),
            (DS_TYPE_HBASE, HBaseDataSource),
            (DS_TYPE_HDFS, HDFSDataSource),
            (DS_TYPE_KAFKA, KafkaDataSource),
            (DS_TYPE_MONGO, MongoDataSource),
            (DS_TYPE_MYSQL, MySQLDataSource),
            (DS_TYPE_POSTGRE_SQL, PostgreSQLDataSource),
            (DS_TYPE_CLICK_HOUSE, ClickHouseDataSource),
        )
        for type_id, source_cls in known_sources:
            if ds_type == type_id:
                return source_cls(ds_file_path)
        raise ValueError('DataSource type %s defined in %s is not supported yet' % (ds_type, ds_file_path))