minio测试.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from minio import Minio
  2. import os
  3. MINIO_ENDPOINT = "192.168.77.249:9000"
  4. MINIO_ACCESS_KEY = "pZEwCGnpNN05KPnmC2Yh"
  5. MINIO_SECRET_KEY = "KfJRuWiv9pVxhIMcFqbkv8hZT9SnNTZ6LPx592D4" # 替换为你的 Secret Key
  6. MINIO_SECURE = False # 是否使用 https
  7. MINIO_BUCKET = "grading"
  8. MINIO_BASE_PREFIX = "raspi_img_data"
  9. DATA_HOST_URL = f"http://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{MINIO_BASE_PREFIX}"
  10. minio_client = Minio(
  11. MINIO_ENDPOINT,
  12. access_key=MINIO_ACCESS_KEY,
  13. secret_key=MINIO_SECRET_KEY,
  14. secure=MINIO_SECURE
  15. )
  16. def get_full_url(path: str) -> str:
  17. """将相对路径转换为可以直接打开的 MinIO 绝对 URL"""
  18. if not path:
  19. return path
  20. if str(path).startswith("http"):
  21. return path
  22. # 移除开头的斜杠防止双斜杠 (如: /Data/xxx -> Data/xxx)
  23. clean_path = str(path).lstrip("/\\")
  24. return f"{DATA_HOST_URL}/{clean_path}"
  25. print(get_full_url("rsp_test.jpg"))
  26. try:
  27. # 2. 检查存储桶是否存在
  28. found = minio_client.bucket_exists(MINIO_BUCKET)
  29. if not found:
  30. minio_client.make_bucket(MINIO_BUCKET)
  31. print(f"存储桶 {MINIO_BUCKET} 已创建")
  32. else:
  33. print(f"存储桶 {MINIO_BUCKET} 已存在")
  34. object_name = "img_test.jpg"
  35. cloud_path = os.path.join(MINIO_BASE_PREFIX, object_name).replace("\\", "/")
  36. minio_client.fput_object(MINIO_BUCKET, cloud_path, img_path, content_type="image/jpeg")
  37. print(f"成功上传 {object_name} 到 {MINIO_BUCKET}")
  38. except Exception as e:
  39. print(f"发生错误: {e}")