| 1234567891011121314151617181920212223242526272829303132333435 |
- # -*- coding:utf-8 -*-
- from dw_base.datax.datasources.data_source import DataSource
- from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH, ElasticsearchDataSource
- from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE, HBaseDataSource
- from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS, HDFSDataSource
- from dw_base.datax.datasources.kafka_data_source import KafkaDataSource, DS_TYPE_KAFKA
- from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO, MongoDataSource
- from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL, MySQLDataSource
- from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL, PostgreSQLDataSource
- from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE, ClickHouseDataSource
class DataSourceFactory:
    """Factory that selects a data source class from a type identifier."""

    @staticmethod
    def get_data_source(ds_type: str, ds_file_path: str) -> DataSource:
        """Build the data source object that corresponds to ``ds_type``.

        Args:
            ds_type: Type identifier; compared for equality against the
                imported ``DS_TYPE_*`` constants.
            ds_file_path: Passed unchanged as the only constructor argument
                of the selected data source class.

        Returns:
            A new instance of the class paired with the matching constant.

        Raises:
            ValueError: If ``ds_type`` equals none of the known constants.
        """
        # Ordered (type constant, class) pairs. They are scanned with ``==``
        # in this order and the first match wins, which keeps the lookup
        # equivalent to a sequential if/elif comparison chain.
        candidates = (
            (DS_TYPE_ELASTICSEARCH, ElasticsearchDataSource),
            (DS_TYPE_HBASE, HBaseDataSource),
            (DS_TYPE_HDFS, HDFSDataSource),
            (DS_TYPE_KAFKA, KafkaDataSource),
            (DS_TYPE_MONGO, MongoDataSource),
            (DS_TYPE_MYSQL, MySQLDataSource),
            (DS_TYPE_POSTGRE_SQL, PostgreSQLDataSource),
            (DS_TYPE_CLICK_HOUSE, ClickHouseDataSource),
        )
        for known_type, source_cls in candidates:
            if ds_type == known_type:
                return source_cls(ds_file_path)
        # Nothing matched: report both the requested type and the file it came from.
        raise ValueError('DataSource type %s defined in %s is not supported yet' % (ds_type, ds_file_path))
|