最近更新时间:2025-06-11 14:15:49
以下代码为分块上传完整流程示例:
import math, os
from ks3.connection import Connection
from filechunkio import FileChunkIO
# 金山云主账号AccessKey拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号账号进行API访问或日常运维,请登录https://uc.console.ksyun.com/pro/iam/#/user/list创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域创建新的存储空间。host(Endpoint) 以北京为例,其它Region请按实际情况填写。
conn = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = conn.get_bucket('<yourBucketName>')
# 源文件路径
source_path = '<yourSourceFilePath>'
# 源文件大小
source_size = os.stat(source_path).st_size
# 初始化分块。获取初始化的uploadId,之后的操作中将会用到
# 如需在初始化分块时设置文件存储类型,请在 initiate_multipart_upload 中设置相关headers
# x-kss-storage-class有效值为"STANDARD"、"STANDARD_IA"。"STANDARD"表示标准存储,"STANDARD_IA"表示低频存储,如果不指定,默认为标准存储。
headers = {"x-kss-storage-class": "STANDARD"}
mp = b.initiate_multipart_upload('<yourKeyName>', headers=headers)
print('initiate_multipart_upload, requestid: ', mp.response_metadata.request_id)
# 举例以50 MiB为分块大小
chunk_size = 52428800
chunk_count = int(math.ceil(source_size*1.0 / chunk_size*1.0))
# 通过FileChunkIO将文件分块
for i in range(chunk_count):
offset = chunk_size * i
bytes = min(chunk_size, source_size - offset)
with FileChunkIO(source_path, 'r', offset=offset, bytes=bytes) as fp:
# 逐个上传分块
ret = mp.upload_part_from_file(fp, part_num=i + 1)
print('upload_part_from_file, requestid: ', ret.response_metadata.request_id)
# 发送请求,合并分片,完成分片上传
cmp = mp.complete_upload()
print('complete_upload, requestid: ', cmp.response_metadata.request_id)
if cmp and cmp.status == 200:
print("上传成功")
有关分块上传的更多详情,请参见分块上传简介。
以下代码用于取消分块上传:
from ks3.connection import Connection
# 金山云主账号AccessKey拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号账号进行API访问或日常运维,请登录https://uc.console.ksyun.com/pro/iam/#/user/list创建子账号。
# 通过指定host(Endpoint),您可以在指定的地域创建新的存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# 取一个分块任务对象
uploads = b.get_all_multipart_uploads(max_uploads=1)
some_mp = list(uploads)[0]
# 取消一个分块上传任务,已上传的分块会被删除
ret = some_mp.cancel_upload()
print('cancel_upload, id: %s, status: %s, request_id: %s' % (some_mp.id, ret.status, ret.response_metadata.request_id))
if ret and ret.status == 204:
print('取消分块上传成功')
取消分块上传操作相关API,请参见Abort Multipart Upload。
以下代码用于列举已上传的分块信息:
from ks3.connection import Connection
# 金山云主账号AccessKey拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号账号进行API访问或日常运维,请登录https://uc.console.ksyun.com/pro/iam/#/user/list创建子账号。
# 通过指定host(Endpoint),您可以在指定的地域创建新的存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# 取一个分块任务对象
uploads = b.get_all_multipart_uploads(max_uploads=1)
some_mp = list(uploads)[0]
# 列出指定上传任务中已上传的分块信息,单次请求最多返回1000个分块信息
# 方式一:循环调用接口以列出指定任务的全部分块信息
more_results = True
part_number_marker = 0
while more_results:
parts = some_mp.get_all_parts(
max_parts=1000, # max_parts 设置响应体中块的上限数量,默认值为1000
part_number_marker=part_number_marker # part_number_marker 指定应该从哪个分块开始列举,只有比设定值大的分块才会被列举
)
print("get_all_parts, request_id: ", parts.response_metadata.request_id)
for part in parts:
print('part etag:%s' % part.etag)
print('part_number:%s' % part.part_number)
part_number_marker = some_mp.next_part_number_marker
more_results = some_mp.is_truncated
# 方式二:通过直接遍历分块任务对象的方式,自动循环调用接口,获取所有分块信息
for part in some_mp:
print('part etag:%s' % part.etag)
print('part_number:%s' % part.part_number)
列举已上传的分块信息相关API,请参见List Parts。
以下代码用于列举存储空间下的所有分块上传事件:
from ks3.connection import Connection
# 金山云主账号AccessKey拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号账号进行API访问或日常运维,请登录https://uc.console.ksyun.com/pro/iam/#/user/list创建子账号。
# 通过指定host(Endpoint),您可以在指定的地域创建新的存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# 列举Bucket中所有的分块上传事件
key_marker='<yourKey>'
for p in b.list_multipart_uploads(key_marker=key_marker):
print('uploadId:%s,key:%s' % (p.id, p.key_name))
for i in p:
print(i.part_number, i.size, i.etag, i.last_modified)
列举所有分块上传事件相关API,请参见List Multipart Uploads。
纯净模式