| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- from minio import Minio
- import os
# Connection settings for the MinIO server. Each value can be overridden by an
# environment variable of the same name; the literals below are only fallbacks
# that keep the script working when no override is set.
# NOTE(review): the access/secret key fallbacks look like real credentials
# committed to source -- rotate them and drop the defaults once the
# environment variables are provisioned.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "192.168.77.249:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "pZEwCGnpNN05KPnmC2Yh")
# Replace with your own secret key.
MINIO_SECRET_KEY = os.environ.get(
    "MINIO_SECRET_KEY", "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4"
)
# Whether to use HTTPS; stays False unless MINIO_SECURE is set to a truthy word.
MINIO_SECURE = os.environ.get("MINIO_SECURE", "").strip().lower() in (
    "1",
    "true",
    "yes",
    "on",
)
MINIO_BUCKET = "grading"
MINIO_BASE_PREFIX = "monitor_video_data"

# Base URL under which uploaded objects are addressed. The scheme follows
# MINIO_SECURE so the URL stays consistent with how the client connects.
_MINIO_SCHEME = "https" if MINIO_SECURE else "http"
DATA_HOST_URL = f"{_MINIO_SCHEME}://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{MINIO_BASE_PREFIX}"

# Shared client used by the rest of the script.
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_SECURE,
)
def get_full_url(path: str) -> str:
    """Turn a relative path into an absolute, directly openable MinIO URL.

    Args:
        path: Path relative to ``DATA_HOST_URL``, or an already absolute
            ``http://`` / ``https://`` URL.

    Returns:
        ``path`` unchanged when it is falsy or already an absolute HTTP(S)
        URL; otherwise ``DATA_HOST_URL`` and ``path`` joined by one slash.
    """
    if not path:
        return path
    text = str(path)
    # Require the full scheme prefix: a bare "http" check would also match
    # relative names such as "http_dump/a.jpg" and return them unresolved.
    if text.lower().startswith(("http://", "https://")):
        return path
    # Strip leading slashes/backslashes to avoid a double slash
    # (e.g. /Data/xxx -> Data/xxx).
    clean_path = text.lstrip("/\\")
    return f"{DATA_HOST_URL}/{clean_path}"
# Print the URL that get_full_url() builds for a sample file name.
print(get_full_url("rsp_test.jpg"))

try:
    # 2. Check whether the bucket exists; create it when it does not.
    found = minio_client.bucket_exists(MINIO_BUCKET)
    if found:
        print(f"存储桶 {MINIO_BUCKET} 已存在")
    else:
        minio_client.make_bucket(MINIO_BUCKET)
        print(f"存储桶 {MINIO_BUCKET} 已创建")

    # Upload one local JPEG under the base prefix.
    object_name = "img_test.jpg"
    img_path = "./31_front_coaxial_1_0.jpg"
    # os.path.join yields backslashes on Windows; convert them to "/".
    joined = os.path.join(MINIO_BASE_PREFIX, object_name)
    cloud_path = joined.replace("\\", "/")
    minio_client.fput_object(
        MINIO_BUCKET,
        cloud_path,
        img_path,
        content_type="image/jpeg",
    )
    print(f"成功上传 {object_name} 到 {MINIO_BUCKET}")
except Exception as exc:
    # Report any failure from the bucket check or the upload instead of raising.
    print(f"发生错误: {exc}")
|