# minio测试.py (MinIO test script)
import os

from minio import Minio
  3. MINIO_ENDPOINT = "192.168.77.249:9000"
  4. MINIO_ACCESS_KEY = "pZEwCGnpNN05KPnmC2Yh"
  5. MINIO_SECRET_KEY = "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4" # 替换为你的 Secret Key
  6. MINIO_SECURE = False # 是否使用 https
  7. MINIO_BUCKET = "grading"
  8. MINIO_BASE_PREFIX = "raspi_img_data"
  9. DATA_HOST_URL = f"http://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{MINIO_BASE_PREFIX}"
  10. minio_client = Minio(
  11. MINIO_ENDPOINT,
  12. access_key=MINIO_ACCESS_KEY,
  13. secret_key=MINIO_SECRET_KEY,
  14. secure=MINIO_SECURE
  15. )
  16. def get_full_url(path: str) -> str:
  17. """将相对路径转换为可以直接打开的 MinIO 绝对 URL"""
  18. if not path:
  19. return path
  20. if str(path).startswith("http"):
  21. return path
  22. # 移除开头的斜杠防止双斜杠 (如: /Data/xxx -> Data/xxx)
  23. clean_path = str(path).lstrip("/\\")
  24. return f"{DATA_HOST_URL}/{clean_path}"
  25. print(get_full_url("rsp_test.jpg"))
  26. try:
  27. # 2. 检查存储桶是否存在
  28. found = minio_client.bucket_exists(MINIO_BUCKET)
  29. if not found:
  30. minio_client.make_bucket(MINIO_BUCKET)
  31. print(f"存储桶 {MINIO_BUCKET} 已创建")
  32. else:
  33. print(f"存储桶 {MINIO_BUCKET} 已存在")
  34. object_name = "img_test.jpg"
  35. img_path = "./img.jpg"
  36. cloud_path = os.path.join(MINIO_BASE_PREFIX, object_name).replace("\\", "/")
  37. minio_client.fput_object(MINIO_BUCKET, cloud_path, img_path, content_type="image/jpeg")
  38. print(f"成功上传 {object_name} 到 {MINIO_BUCKET}")
  39. except Exception as e:
  40. print(f"发生错误: {e}")