# -*- coding:utf-8 -*-

from dw_base.datax.datasources.data_source import DataSource
from dw_base.datax.datax_constants import DS_MYSQL_JDBC_URL

# MySQL Data Source
# Type tag stored on every MySQLDataSource instance as ``source_type``.
DS_TYPE_MYSQL = 'mysql'
# Key names stored on every MySQLDataSource instance as ``keys``.
DS_MYSQL_KEYS = [DS_MYSQL_JDBC_URL, 'username', 'password']


class MySQLDataSource(DataSource):
    """Data source flavour for MySQL.

    On top of whatever ``DataSource.__init__`` does, it tags the instance
    with ``source_type = DS_TYPE_MYSQL`` and ``keys = DS_MYSQL_KEYS``.
    """

    def __init__(self, ds_file: str):
        """Initialise the base class with *ds_file*, then set the MySQL tags.

        :param ds_file: forwarded unchanged to ``DataSource.__init__``
            (presumably the path of a data-source definition file --
            confirm against the base class).
        """
        super().__init__(ds_file)
        self.keys = DS_MYSQL_KEYS
        self.source_type = DS_TYPE_MYSQL

    @staticmethod
    def generate_definition(host: str, port: int, username: str, password: str, database: str) -> str:
        """Render the text of a MySQL data-source definition.

        The result consists of four lines joined by ``'\\n'`` with no
        trailing newline: a ``[base]`` section header followed by the
        ``jdbcUrl``, ``username`` and ``password`` entries. Values are
        interpolated verbatim; no escaping or validation is applied.

        :param host: host part of the JDBC URL.
        :param port: port part of the JDBC URL (converted with ``str``).
        :param username: value written to the ``username`` entry.
        :param password: value written to the ``password`` entry.
        :param database: database name appended to the JDBC URL.
        :return: the definition text.
        """
        section_header = '[base]'
        # NOTE(review): the 'jdbcUrl' key is hard-coded here; presumably it
        # matches DS_MYSQL_JDBC_URL -- confirm against datax_constants.
        url_entry = 'jdbcUrl=jdbc:mysql://%s:%s/%s' % (host, str(port), database)
        user_entry = 'username=%s' % username
        password_entry = 'password=%s' % password
        return '\n'.join((section_header, url_entry, user_entry, password_entry))