"""Path helpers that locate configuration files relative to the project root.

Importing this module also appends the derived project root to ``sys.path``.
"""
import os
import re
import sys

# Name of the project directory, and the pattern that matches it together with
# everything that follows it in a path. Substituting the pattern with the bare
# name truncates a path right after its first "tendata-warehouse" occurrence.
_PROJECT_DIR_NAME = "tendata-warehouse"
_PROJECT_ROOT_PATTERN = re.compile(r"tendata-warehouse.*")

# Import-time bootstrap: derive the project root from this file's location and
# make it importable. ``abspath`` and ``root_path`` stay module-level names.
abspath = os.path.abspath(__file__)
root_path = _PROJECT_ROOT_PATTERN.sub(_PROJECT_DIR_NAME, abspath)
sys.path.append(root_path)


class PathUtil:
    """Static helpers that build project-relative configuration paths."""

    @staticmethod
    def get_project_root_path() -> str:
        """Return this file's absolute path cut off after "tendata-warehouse".

        If the absolute path does not contain "tendata-warehouse", the
        substitution matches nothing and the file's own absolute path is
        returned unchanged.
        """
        return _PROJECT_ROOT_PATTERN.sub(
            _PROJECT_DIR_NAME, os.path.abspath(__file__)
        )

    @staticmethod
    def get_conf_path(catalog: str, database_name: str) -> tuple:
        """Return the relative ES JSON path and settings.yml path.

        Args:
            catalog: Inserted into the directory and the JSON file name.
            database_name: Inserted into the directory and the JSON file name.

        Returns:
            A 2-tuple ``(es_json_path, mg2es_mapping_path)``, both rooted at
            ``customs/{catalog}/{database_name}``.
        """
        conf_dir = f"customs/{catalog}/{database_name}"
        es_json_path = f"{conf_dir}/customs_{catalog}_{database_name}-es7.json"
        mg2es_mapping_path = f"{conf_dir}/settings.yml"
        return es_json_path, mg2es_mapping_path

    @staticmethod
    def get_conf_abspath(catalog: str, database_name: str) -> tuple:
        """Return the same two paths as ``get_conf_path``, prefixed with the
        ``mongo2es-customs`` directory that sits next to the project root.

        The prefix is built by plain string concatenation, so the result
        still contains the literal ``/../`` segment (it is not normalized).

        Args:
            catalog: Forwarded to ``get_conf_path``.
            database_name: Forwarded to ``get_conf_path``.

        Returns:
            A 2-tuple ``(es_json_path, mg2es_mapping_path)``.
        """
        working_dir = PathUtil.get_project_root_path() + '/../mongo2es-customs'
        # Reuse the relative templates so both methods always stay in sync.
        es_json_rel, mg2es_mapping_rel = PathUtil.get_conf_path(
            catalog, database_name
        )
        return f"{working_dir}/{es_json_rel}", f"{working_dir}/{mg2es_mapping_rel}"

    @staticmethod
    def get_es_conn_path() -> str:
        """Return the path of ``es-prod-cts.ini`` under the sibling
        ``datasource/elasticsearch`` directory (not normalized)."""
        return (
            PathUtil.get_project_root_path()
            + '/../datasource/elasticsearch/es-prod-cts.ini'
        )

    @staticmethod
    def get_redis_conn_path() -> str:
        """Return the path of ``redis-prod-cts.ini`` under the sibling
        ``datasource/redis`` directory (not normalized)."""
        return (
            PathUtil.get_project_root_path()
            + '/../datasource/redis/redis-prod-cts.ini'
        )