| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- import os
- import oss2
- from dw_base.utils.file_utils import get_abs_path
class Readable:
    """Placeholder for a file-like object, i.e. anything exposing ``read()``.

    The class carries no behaviour of its own; it only gives a name to the
    "object with a read method" shape.
    """

    def read(self):
        """Do nothing and return ``None``."""
        return None
class OssClient:
    """
    Thin convenience wrapper around the ``oss2`` SDK (Aliyun Object Storage Service).

    A client may be created with a default bucket; every bucket-level method
    then accepts an optional ``bucket_name`` that overrides that default for
    a single call.

    OSS glossary:
        Bucket           -- storage space
        Object           -- an object, i.e. a file
        Endpoint         -- OSS access domain name
        Region           -- region or data center
        AccessKey        -- AccessKeyId and AccessKeySecret together; the access credentials
        Put Object       -- simple upload
        Post Object      -- form upload
        Multipart Upload -- multipart (chunked) upload
        Append Object    -- append upload
        Get Object       -- simple download
        Callback         -- callback
        Object Meta      -- file metadata describing the file, e.g. length, type
        Data             -- file data
        Key              -- file name
        ACL (Access Control List) -- permissions of a bucket or a file
    """

    def __init__(self, access_key_id, access_key_secret, endpoint, bucket_name=None):
        """
        Build the authentication, service and (optionally) default bucket handles.

        The environment variables ``OSS_TEST_ACCESS_KEY_ID``,
        ``OSS_TEST_ACCESS_KEY_SECRET`` and ``OSS_TEST_ENDPOINT`` take
        precedence over the corresponding arguments when they are set.

        :param str access_key_id: access key id, used unless overridden by the environment
        :param str access_key_secret: access key secret, used unless overridden by the environment
        :param str endpoint: OSS endpoint, used unless overridden by the environment
        :param str bucket_name: optional name of the default bucket
        :rtype: OssClient
        """
        self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', access_key_id)
        self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', access_key_secret)
        self._endpoint = os.getenv('OSS_TEST_ENDPOINT', endpoint)
        self._auth = oss2.Auth(self._access_key_id, self._access_key_secret)
        self._bucket_name = bucket_name
        # Always define the attribute so it exists even without a default bucket.
        self._bucket = None
        if bucket_name:
            self._bucket = oss2.Bucket(self._auth, self._endpoint, bucket_name)
        # Service handle used for account-level calls such as listing buckets.
        self._service = oss2.Service(self._auth, self._endpoint)

    def list_buckets(self, prefix='', marker='', max_keys=100, params=None):
        """
        List the user's buckets, filtered by name prefix.

        :param str prefix: only buckets whose name starts with this prefix are listed;
            an empty string lists all buckets
        :param str marker: paging marker; pass an empty string on the first call and
            the ``next_marker`` of the previous result afterwards
        :param int max_keys: maximum number of buckets returned per call
        :param dict params: extra list parameters; pass 'tag-key' / 'tag-value' to filter the result
        :return: the listing result
        :rtype: oss2.models.ListBucketsResult
        """
        return self._service.list_buckets(prefix, marker, max_keys, params)

    def get_bucket(self, bucket_name=None):
        """
        Return a bucket handle.

        :param str bucket_name: bucket name; when given, a new handle for that bucket
            is created, otherwise the default bucket of this client is returned
        :return: the bucket handle
        :rtype: oss2.Bucket
        :raises AssertionError: if neither ``bucket_name`` nor a default bucket is available
        """
        assert bucket_name or self._bucket_name, 'need bucket name since default bucket is not provided'
        if bucket_name:
            return oss2.Bucket(self._auth, self._endpoint, bucket_name)
        return self._bucket

    def get_bucket_info(self, bucket_name=None):
        """
        Fetch bucket information such as creation time, access endpoint, owner and ACL.

        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the bucket information
        :rtype: oss2.models.GetBucketInfoResult
        """
        return self.get_bucket(bucket_name).get_bucket_info()

    def get_bucket_status(self, bucket_name=None):
        """
        Fetch bucket statistics: currently the bucket size, the number of objects
        and the number of in-progress multipart uploads, among others.

        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the bucket statistics
        :rtype: oss2.models.GetBucketStatResult
        """
        return self.get_bucket(bucket_name).get_bucket_stat()

    def set_bucket_lifecycle(self, bucket_name=None, lifecycle_rule=None):
        """
        Set the lifecycle configuration of a bucket.

        Example:
            # Objects under the 'docs/' prefix expire 357 days after last modification
            rule = oss2.models.LifecycleRule('lc_for_docs_prefix', 'docs/',
                                             status=oss2.models.LifecycleRule.ENABLED,
                                             expiration=oss2.models.LifecycleExpiration(days=357))
            # Abort unfinished multipart uploads 356 days after last modification
            rule.abort_multipart_upload = oss2.models.AbortMultipartUpload(days=356)
            # Move objects to IA 180 days after last modification
            rule.storage_transitions = [
                oss2.models.StorageTransition(days=180, storage_class=oss2.BUCKET_STORAGE_CLASS_IA)]
            # Move objects to ARCHIVE 356 days after last modification
            rule.storage_transitions.append(
                oss2.models.StorageTransition(days=356, storage_class=oss2.BUCKET_STORAGE_CLASS_ARCHIVE))
            lifecycle = oss2.models.BucketLifecycle([rule])
            client.set_bucket_lifecycle(lifecycle_rule=lifecycle)

        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :param oss2.models.BucketLifecycle lifecycle_rule: lifecycle configuration to apply (required)
        :return: the request result
        :rtype: oss2.models.RequestResult
        :raises ValueError: if ``lifecycle_rule`` is not provided
        """
        # The guard used to be inverted (it rejected every real rule and let
        # None through to the SDK); a rule is required, so reject its absence.
        if lifecycle_rule is None:
            raise ValueError('lifecycle_rule is required')
        return self.get_bucket(bucket_name).put_bucket_lifecycle(lifecycle_rule)

    def upload_object(self, object_name, data, bucket_name=None, headers=None, progress_callback=None):
        """
        Upload in-memory data or a file-like object as a regular object.

        Example:
            1. client.upload_object('my-file1.txt', 'content of my-file1')
            2. client.upload_object('my-file2.txt', b'content of my-file2')
            3. with open(oss2.to_unicode('my-file3.txt'), 'rb') as f:
                   client.upload_object('my-file3.txt', f)

        :param str object_name: name of the object to create
        :param bytes or str or Readable data: data to upload (bytes, string, or a
            file-like object, i.e. an object with a ``read`` method)
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :param dict[str,any] or oss2.CaseInsensitiveDict headers: request headers passed to the SDK
        :param function progress_callback: progress callback passed to the SDK
        :return: the upload result
        :rtype: oss2.models.PutObjectResult
        """
        target_bucket = self.get_bucket(bucket_name)
        return target_bucket.put_object(object_name, data, headers, progress_callback)

    def upload_object_from_file(self, filename, object_name=None, bucket_name=None, headers=None,
                                progress_callback=None):
        """
        Upload a local file as a regular object.

        Example:
            client.upload_object_from_file('/path/to/my-file.txt')   # stored as 'my-file.txt'
            client.upload_object_from_file('/path/to/my-file.txt', 'backup/my-file.txt')

        :param str filename: path of the local file to upload
        :param str object_name: object name on OSS; defaults to the base name of ``filename``
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :param dict[str,any] or oss2.CaseInsensitiveDict headers: request headers passed to the SDK
        :param function progress_callback: progress callback passed to the SDK
        :return: the upload result
        :rtype: oss2.models.PutObjectResult
        """
        if not object_name:
            object_name = os.path.basename(filename)
        target_bucket = self.get_bucket(bucket_name)
        return target_bucket.put_object_from_file(object_name, filename, headers, progress_callback)

    def download_object(self, bucket_object_name, local_object_name=None, bucket_name=None):
        """
        Download an object to a local file.

        :param str bucket_object_name: name of the object in the bucket
        :param str local_object_name: local file name to save to; defaults to
            ``bucket_object_name`` when omitted
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the download result
        :rtype: oss2.models.GetObjectResult
        """
        target_bucket = self.get_bucket(bucket_name)
        # Fall back to the object's own name when no local name is supplied.
        local_path = local_object_name or bucket_object_name
        return target_bucket.get_object_to_file(bucket_object_name, local_path)

    def delete_object(self, object_name, bucket_name=None):
        """
        Delete a single object.

        :param str object_name: object name
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the request result
        :rtype: oss2.models.RequestResult
        """
        return self.get_bucket(bucket_name).delete_object(object_name)

    def delete_objects(self, objects_name, bucket_name=None):
        """
        Delete several objects in one batch request.

        :param list[str] objects_name: list of object names
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the batch delete result
        :rtype: oss2.models.BatchDeleteObjectsResult
        """
        return self.get_bucket(bucket_name).batch_delete_objects(objects_name)

    def get_object(self, object_name, bucket_name=None):
        """
        Fetch an object.

        :param str object_name: object name
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the object result
        :rtype: oss2.models.GetObjectResult
        """
        return self.get_bucket(bucket_name).get_object(object_name)

    def get_object_meta(self, object_name, bucket_name=None):
        """
        Fetch the metadata of an object.

        :param str object_name: object name
        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the object metadata
        :rtype: oss2.models.GetObjectMetaResult
        """
        return self.get_bucket(bucket_name).get_object_meta(object_name)

    def list_objects(self, bucket_name=None):
        """
        List objects in a bucket with a single SDK call using its default arguments.

        No pagination is performed here; callers needing more than one page
        must page on the returned result themselves.

        :param str bucket_name: bucket name (defaults to the client's default bucket)
        :return: the result of the SDK's ``list_objects`` call
        """
        return self.get_bucket(bucket_name).list_objects()
# SECURITY(review): the access key id and secret below are hard-coded in
# source. They should be rotated and loaded from the environment or a secret
# store instead. The values are kept unchanged here because the default
# client depends on them. Note that OssClient.__init__ already prefers the
# OSS_TEST_ACCESS_KEY_ID / OSS_TEST_ACCESS_KEY_SECRET / OSS_TEST_ENDPOINT
# environment variables over these arguments when they are set.
DEFAULT_ACCESS_KEY_ID = 'LTAI5t9oGbXWacakS4PJyQsR'
DEFAULT_ACCESS_KEY_SECRET = 'BeuOP5zsavBtsR8fQ5QmrNyczdCW1Q'
DEFAULT_ENDPOINT = 'oss-cn-qingdao-internal.aliyuncs.com'
DEFAULT_BUCKET_NAME = 'skb-applogo'
# Plain string: the original f-prefix had no placeholders to interpolate.
DEFAULT_DOWNLOAD_ADDRESS = 'https://skb-applogo.oss-cn-qingdao.aliyuncs.com'

# Shared client bound to the default bucket; constructed at import time.
DEFAULT_OSS_CLIENT = OssClient(
    DEFAULT_ACCESS_KEY_ID,
    DEFAULT_ACCESS_KEY_SECRET,
    DEFAULT_ENDPOINT,
    DEFAULT_BUCKET_NAME,
)

if __name__ == '__main__':
    # Manual smoke run: upload a bundled archive to the default bucket under
    # its base file name.
    DEFAULT_OSS_CLIENT.upload_object_from_file(get_abs_path('lib/gzrt-0.8.tar.gz'))
|