| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import sys
- import os
- import re
- abspath = os.path.abspath(__file__)
- root_path = re.sub(r"tendata-warehouse.*", "tendata-warehouse", abspath)
- sys.path.append(root_path)
- import subprocess
- from datetime import datetime
- from dw_base.scheduler.mg2es.path_util import PathUtil
- class GitHelper:
- # def git_pull(self, working_dir):
- # subprocess.run(["git", "pull"], cwd=working_dir, check=True)
- def git_pull(self, working_dir):
- """
- 从远程仓库拉取最新的更改。
- 参数:
- working_dir (str): Git 仓库的目录。
- 异常:
- subprocess.CalledProcessError: 如果 git pull 命令失败。
- FileNotFoundError: 如果工作目录不存在。
- """
- # 检查工作目录是否存在
- if not os.path.exists(working_dir):
- raise FileNotFoundError(f"指定的目录不存在: {working_dir}")
- try:
- subprocess.run(["git", "pull"], cwd=working_dir, check=True)
- print("成功拉取最新的更改。")
- except subprocess.CalledProcessError as e:
- print(f"git pull 过程中出错: {e}")
- def git_pull_etlconfig(self):
- root_path = PathUtil.get_project_root_path()
- # 调用函数并指定文件路径和工作目录
- working_dir = root_path + '/../mongo2es-customs'
- print(f'【git pull】 working_dir: {working_dir}')
- self.git_pull(working_dir)
- def get_last_commit_date(self, file_path, working_dir):
- try:
- # 构建Git命令
- git_command = ["git", "log", "-1", "--format=%cd", "--", file_path]
- # 执行命令并获取输出
- output = subprocess.check_output(git_command, cwd=working_dir, stderr=subprocess.STDOUT,
- universal_newlines=True)
- date_object = datetime.strptime(output.strip(), '%a %b %d %H:%M:%S %Y %z')
- # 格式化日期时间字符串
- formatted_date = date_object.strftime('%Y-%m-%d %H:%M:%S')
- print("最近一次提交的日期为: "+formatted_date)
- # 返回输出(即最近一次提交的日期)
- return formatted_date
- except subprocess.CalledProcessError as e:
- # 如果命令执行失败,输出错误信息
- print("Error:", e.output)
- return None
- def get_etlconfig_last_uptime(self, catalog, database_name):
- root_path = PathUtil.get_project_root_path()
- # 调用函数并指定文件路径和工作目录
- es_json_path,mg2es_mapping_path = PathUtil.get_conf_path(catalog,database_name)
- working_dir = root_path + '/../mongo2es-customs'
- es_json_date = self.get_last_commit_date(es_json_path, working_dir)
- mg2es_mapping_date = self.get_last_commit_date(mg2es_mapping_path, working_dir)
- if es_json_date and mg2es_mapping_date:
- last_commit_date = max(es_json_date, mg2es_mapping_date)
- print("最近一次提交的日期:", last_commit_date)
- return last_commit_date
- else:
- print("获取最近一次提交的日期时出错。")
- return None
- if __name__ == '__main__':
- git_helper = GitHelper()
- # last_commit_date = git_helper.get_etlconfig_last_uptime('exports', 'kazakhstan')
- git_helper.git_pull_etlconfig()
|