"""Readers for YAML/JSON files and for the ES / Redis connection settings."""
import json
import os
import re
import sys
from configparser import ConfigParser

# Make the project root importable when this file is run directly: cut the
# absolute path of this file right after the first "tendata-warehouse"
# component and append the result to sys.path.
abspath = os.path.abspath(__file__)
root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
sys.path.append(root_path)

import yaml  # noqa: E402  (imported after the sys.path bootstrap, as before)

from dw_base.scheduler.mg2es.path_util import PathUtil  # noqa: E402


class ConfReader:
    """Load structured data files and connection settings from disk."""

    def get_yml_data(self, yml_file_path):
        """Parse the YAML file at ``yml_file_path`` and return its content.

        The file is decoded as UTF-8 explicitly so the result does not depend
        on the platform's default locale encoding.
        """
        with open(yml_file_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    def get_json_data(self, json_file_path):
        """Parse the JSON file at ``json_file_path`` and return its content.

        The file is decoded as UTF-8 explicitly so the result does not depend
        on the platform's default locale encoding.
        """
        with open(json_file_path, encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _load_ini(path):
        """Print ``path`` and return a ``ConfigParser`` populated from it.

        ``ConfigParser.read`` silently skips files it cannot open, so a
        missing file surfaces later as ``configparser.NoSectionError`` when a
        value is looked up.
        """
        print(path)
        parser = ConfigParser()
        parser.read(path)
        return parser

    def get_es_conf(self):
        """Return ``(host, port)`` from the ``[base]`` section of the ES config.

        The config file location comes from ``PathUtil.get_es_conn_path()``;
        ``port`` is converted to ``int``.
        """
        parser = self._load_ini(PathUtil.get_es_conn_path())
        host = parser.get('base', 'host')
        port = parser.getint('base', 'port')
        return host, port

    def get_redis_conf(self):
        """Return ``(host, port, db, password)`` from the Redis config.

        The config file location comes from
        ``PathUtil.get_redis_conn_path()``; values are read from the
        ``[base]`` section. ``port`` and ``db`` are converted to ``int``, and
        an empty ``password`` is returned as ``None``.
        """
        parser = self._load_ini(PathUtil.get_redis_conn_path())
        host = parser.get('base', 'host')
        port = parser.getint('base', 'port')
        db = parser.getint('base', 'db')
        # Convert an empty-string password to None.
        password = parser.get('base', 'password') or None
        return host, port, db, password


if __name__ == '__main__':
    # Manual smoke check: print both connection settings.
    cf = ConfReader()
    print(cf.get_es_conf())
    print(cf.get_redis_conf())