data_source_factory.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435
  1. # -*- coding:utf-8 -*-
  2. from dw_base.datax.datasources.data_source import DataSource
  3. from dw_base.datax.datasources.elasticseach_data_source import DS_TYPE_ELASTICSEARCH, ElasticsearchDataSource
  4. from dw_base.datax.datasources.hbase_data_source import DS_TYPE_HBASE, HBaseDataSource
  5. from dw_base.datax.datasources.hdfs_data_source import DS_TYPE_HDFS, HDFSDataSource
  6. from dw_base.datax.datasources.kafka_data_source import KafkaDataSource, DS_TYPE_KAFKA
  7. from dw_base.datax.datasources.mongo_data_source import DS_TYPE_MONGO, MongoDataSource
  8. from dw_base.datax.datasources.mysql_data_source import DS_TYPE_MYSQL, MySQLDataSource
  9. from dw_base.datax.datasources.postgresql_data_source import DS_TYPE_POSTGRE_SQL, PostgreSQLDataSource
  10. from dw_base.datax.datasources.clickhouse_data_source import DS_TYPE_CLICK_HOUSE, ClickHouseDataSource
  11. class DataSourceFactory:
  12. @staticmethod
  13. def get_data_source(ds_type: str, ds_file_path: str) -> DataSource:
  14. if ds_type == DS_TYPE_ELASTICSEARCH:
  15. return ElasticsearchDataSource(ds_file_path)
  16. elif ds_type == DS_TYPE_HBASE:
  17. return HBaseDataSource(ds_file_path)
  18. elif ds_type == DS_TYPE_HDFS:
  19. return HDFSDataSource(ds_file_path)
  20. elif ds_type == DS_TYPE_KAFKA:
  21. return KafkaDataSource(ds_file_path)
  22. elif ds_type == DS_TYPE_MONGO:
  23. return MongoDataSource(ds_file_path)
  24. elif ds_type == DS_TYPE_MYSQL:
  25. return MySQLDataSource(ds_file_path)
  26. elif ds_type == DS_TYPE_POSTGRE_SQL:
  27. return PostgreSQLDataSource(ds_file_path)
  28. elif ds_type == DS_TYPE_CLICK_HOUSE:
  29. return ClickHouseDataSource(ds_file_path)
  30. else:
  31. raise ValueError('DataSource type %s defined in %s is not supported yet' % (ds_type, ds_file_path))