"""MinIO helper and smoke-test script.

Importing this module builds a shared ``minio_client`` and exposes
``get_full_url`` for turning stored relative paths into absolute URLs.
Running it as a script additionally prints a sample URL, makes sure the
target bucket exists and uploads one test image.
"""
import os

from minio import Minio

# --- Connection settings ---------------------------------------------------
# The access/secret keys may be overridden with the MINIO_ACCESS_KEY and
# MINIO_SECRET_KEY environment variables.
# NOTE(review): the fallback credentials below are committed in source;
# rotate them and drop the defaults once every deployment sets the
# environment variables.
MINIO_ENDPOINT = "192.168.77.249:9000"
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "pZEwCGnpNN05KPnmC2Yh")
MINIO_SECRET_KEY = os.environ.get(
    "MINIO_SECRET_KEY",
    "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4",  # replace with your own secret key
)
MINIO_SECURE = False  # whether to talk to the server over HTTPS
MINIO_BUCKET = "grading"
MINIO_BASE_PREFIX = "monitor_video_data"

# Keep the public URL scheme in step with the client's ``secure`` flag so the
# two cannot drift apart when MINIO_SECURE is flipped.
_URL_SCHEME = "https" if MINIO_SECURE else "http"
DATA_HOST_URL = f"{_URL_SCHEME}://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{MINIO_BASE_PREFIX}"

# Prefixes that mark a path as already being an absolute URL.
_ABSOLUTE_URL_PREFIXES = ("http://", "https://")

minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)


def get_full_url(path: str) -> str:
    """Convert a relative path into an absolute URL under ``DATA_HOST_URL``.

    Args:
        path: A path relative to ``MINIO_BASE_PREFIX`` inside the bucket, or
            an already absolute ``http://`` / ``https://`` URL.

    Returns:
        ``path`` unchanged when it is falsy (``None``, ``""``) or already an
        absolute URL; otherwise ``DATA_HOST_URL`` joined with ``path``.
    """
    if not path:
        return path

    text = str(path)
    # Only a real scheme prefix counts as absolute; a relative key that merely
    # starts with the letters "http" (e.g. "http_logs/a.jpg") must still be
    # prefixed with the host URL.
    if text.startswith(_ABSOLUTE_URL_PREFIXES):
        return path

    # Strip leading slashes/backslashes to avoid a double slash in the result
    # (e.g. "/Data/xxx" -> "Data/xxx").
    clean_path = text.lstrip("/\\")
    return f"{DATA_HOST_URL}/{clean_path}"


def main() -> None:
    """Print a sample URL, ensure the bucket exists and upload a test image.

    Any failure while talking to the server or reading the local file is
    reported on stdout rather than raised, as in the original script.
    """
    print(get_full_url("rsp_test.jpg"))

    object_name = "img_test.jpg"
    img_path = "./31_front_coaxial_1_0.jpg"
    # Object keys always use "/" as separator regardless of the host OS, so
    # join with "/" directly instead of going through os.path.join.
    cloud_path = f"{MINIO_BASE_PREFIX}/{object_name}"

    try:
        # Create the bucket on first use.
        if minio_client.bucket_exists(MINIO_BUCKET):
            print(f"存储桶 {MINIO_BUCKET} 已存在")
        else:
            minio_client.make_bucket(MINIO_BUCKET)
            print(f"存储桶 {MINIO_BUCKET} 已创建")

        minio_client.fput_object(
            MINIO_BUCKET,
            cloud_path,
            img_path,
            content_type="image/jpeg",
        )
        print(f"成功上传 {object_name} 到 {MINIO_BUCKET}")
    except Exception as e:
        # Top-level boundary of the script: report the failure and exit
        # normally.
        print(f"发生错误: {e}")


if __name__ == "__main__":
    main()