import os import oss2 from dw_base.utils.file_utils import get_abs_path class Readable: def read(self): pass class OssClient: """ OSS术语 English | 中文 Bucket | 存储空间 Object | 对象或者文件 Endpoint | OSS 访问域名 Region | 地域或者数据中心 AccessKey | AccessKeyId 和 AccessKeySecret 的统称,访问密钥 Put Object | 简单上传 Post Object | 表单上传 Multipart Upload | 分片上传 Append Object | 追加上传 Get Object | 简单下载 | 回调 Object Meta | 文件元信息。用来描述文件信息,例如长度,类型等 Data | 文件数据 Key | 文件名 ACL (Access Control List)| 存储空间或者文件的权限 """ def __init__(self, access_key_id, access_key_secret, endpoint, bucket_name=None): """ 提供操作OSS各种方法 :param str access_key_id: :param str access_key_secret: :param str endpoint: :rtype: OssClient """ self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', access_key_id) self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', access_key_secret) self._endpoint = os.getenv('OSS_TEST_ENDPOINT', endpoint) self._auth = oss2.Auth(self._access_key_id, self._access_key_secret) self._bucket_name = bucket_name if bucket_name: self._bucket = oss2.Bucket(self._auth, self._endpoint, bucket_name) # 创建一个Service对象 self._service = oss2.Service(self._auth, self._endpoint) def list_buckets(self, prefix='', marker='', max_keys=100, params=None): """ 根据前缀罗列用户的Bucket :param str prefix: 只罗列Bucket名为该前缀的Bucket,空串表示罗列所有的Bucket :param str marker: 分页标志。首次调用传空串,后续使用返回值中的next_marker :param int max_keys: 每次调用最多返回的Bucket数目 :param dict params: list操作参数,传入'tag-key','tag-value'对结果进行过滤 :return: 罗列的结果 :rtype: oss2.models.ListBucketsResult """ list_buckets_result = self._service.list_buckets(prefix, marker, max_keys, params) return list_buckets_result def get_bucket(self, bucket_name=None): """ 获取bucket :param str bucket_name: bucket名称 :return: :rtype: oss2.Bucket """ assert bucket_name or self._bucket_name, 'need bucket name since default bucket is not provided' if bucket_name: return oss2.Bucket(self._auth, self._endpoint, bucket_name) return self._bucket def get_bucket_info(self, bucket_name=None): """ 获取bucket相关信息,如创建时间,访问Endpoint,Owner与ACL等。 :param str bucket_name: bucket名称 :return: :rtype: oss2.models.GetBucketInfoResult """ return self.get_bucket(bucket_name).get_bucket_info() def get_bucket_status(self, bucket_name=None): """ 查看Bucket的状态,目前包括bucket大小,bucket的object数量,bucket正在上传的Multipart Upload事件个数等。 :param str bucket_name: bucket名称 :return: :rtype: oss2.models.GetBucketStatResult """ return self.get_bucket(bucket_name).get_bucket_stat() def set_bucket_lifecycle(self, bucket_name=None, lifecycle_rule=None): """ 设置bucket生命周期 例: # 设置bucket生命周期, 有'中文/'前缀的对象在最后修改时间之后357天失效 rule = oss2.models.LifecycleRule('lc_for_chinese_prefix', '中文/', status=oss2.models.LifecycleRule.ENABLED, expiration=oss2.models.LifecycleExpiration(days=357)) # 删除相对最后修改时间365天之后的parts rule.abort_multipart_upload = oss2.models.AbortMultipartUpload(days=356) # 对象最后修改时间超过180天后转为IA rule.storage_transitions = [oss2.models.StorageTransition(days=180, storage_class=oss2.BUCKET_STORAGE_CLASS_IA)] # 对象最后修改时间超过356天后转为ARCHIVE rule.storage_transitions.append(oss2.models.StorageTransition(days=356, storage_class=oss2.BUCKET_STORAGE_CLASS_ARCHIVE)) lifecycle = oss2.models.BucketLifecycle([rule]) :param str bucket_name: bucket名称 :param oss2.models.BucketLifecycle lifecycle_rule: 生命周期 :return: :rtype: oss2.models.RequestResult """ assert not lifecycle_rule return self.get_bucket(bucket_name).put_bucket_lifecycle(lifecycle_rule) def upload_object(self, object_name, data, bucket_name=None, headers=None, progress_callback=None): """ 上传一个普通文件 :param str bucket_name: bucket名称 :param str object_name: 要上传的对象名称 :param bytes or str or Readable data: 要上传的数据(字节数组、字符串或file-like object——即含有read方法的对象) :param dict[str,any] or oss2.CaseInsensitiveDict headers: :param function progress_callback: 进度回调函数 :return: :rtype: oss2.models.PutObjectResult """ bucket = self.get_bucket(bucket_name) put_object_result = bucket.put_object(object_name, data, headers, progress_callback) return put_object_result def upload_object_from_file(self, filename, object_name=None, bucket_name=None, headers=None, progress_callback=None): """ 上传一个普通文件 例: 1. upload('my-bucket','my-file1.txt','content of my-file1') 2. upload('my-bucket','my-file2.txt',b'content of my-file2') 3. with open(oss2.to_unicode('my-file3.txt'), 'rb') as f: upload('my-file3.txt', f) :param str bucket_name: bucket名称 :param str filename: 要上传的文件名称 :param str object_name: 上传到OSS后新的对象名称 :param dict[str,any] or oss2.CaseInsensitiveDict headers: :param function progress_callback: 进度回调函数 :return: :rtype: oss2.models.PutObjectResult """ if not object_name: object_name = os.path.basename(filename) bucket = self.get_bucket(bucket_name) put_object_result = bucket.put_object_from_file(object_name, filename, headers, progress_callback) return put_object_result def download_object(self, bucket_object_name, local_object_name=None, bucket_name=None): """ 下载一个文件到本地文件 :param str bucket_name: bucket名称 :param str bucket_object_name: bucket上的对象名称 :param str local_object_name: 下载到本地后保存的对象名称 :return: :rtype: oss2.models.GetObjectResult """ bucket = self.get_bucket(bucket_name) if local_object_name and bucket_object_name != local_object_name: # 下载并修改文件名 get_object_result = bucket.get_object_to_file(bucket_object_name, local_object_name) else: get_object_result = bucket.get_object_to_file(bucket_object_name, bucket_object_name) return get_object_result def delete_object(self, object_name, bucket_name=None): """ 删除单个对象 :param str bucket_name: bucket名称 :param str object_name: 对象名称 :return: :rtype: oss2.models.RequestResult """ bucket = self.get_bucket(bucket_name) return bucket.delete_object(object_name) def delete_objects(self, objects_name, bucket_name=None): """ 批量删除对象 :param str bucket_name: bucket名称 :param list[str] objects_name: 对象名称list :return: :rtype: oss2.models.BatchDeleteObjectsResult """ bucket = self.get_bucket(bucket_name) return bucket.batch_delete_objects(objects_name) def get_object(self, object_name, bucket_name=None): """ 获取一个对象 :param str object_name: 对象名称 :param str bucket_name: bucket名称 :return: :rtype: oss2.models.GetObjectResult """ bucket = self.get_bucket(bucket_name) return bucket.get_object(object_name) def get_object_meta(self, object_name, bucket_name=None): """ 获取一个对象的详细信息 :param str object_name: 对象名称 :param str bucket_name: bucket名称 :return: :rtype: oss2.models.GetObjectMetaResult """ bucket = self.get_bucket(bucket_name) return bucket.get_object_meta(object_name) def list_objects(self, bucket_name=None): bucket = self.get_bucket(bucket_name) return bucket.list_objects() DEFAULT_ACCESS_KEY_ID = 'LTAI5t9oGbXWacakS4PJyQsR' DEFAULT_ACCESS_KEY_SECRET = 'BeuOP5zsavBtsR8fQ5QmrNyczdCW1Q' DEFAULT_ENDPOINT = 'oss-cn-qingdao-internal.aliyuncs.com' DEFAULT_BUCKET_NAME = 'skb-applogo' DEFAULT_DOWNLOAD_ADDRESS = f'https://skb-applogo.oss-cn-qingdao.aliyuncs.com' DEFAULT_OSS_CLIENT = OssClient( DEFAULT_ACCESS_KEY_ID, DEFAULT_ACCESS_KEY_SECRET, DEFAULT_ENDPOINT, DEFAULT_BUCKET_NAME ) if __name__ == '__main__': DEFAULT_OSS_CLIENT.upload_object_from_file(get_abs_path('lib/gzrt-0.8.tar.gz'))