最近更新时间:2026-06-03 21:32:11
from ks3.connection import Connection
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# 生成分块上传的外链
init_mpu_url = b.presign(60, method="POST", key_name='<YOUR_KEY_NAME>', params={'uploads': None})
更多初始化分块的详情,参考 Initiate Multipart Upload
from ks3.connection import Connection
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# list分块上传任务
list_mpu_params = {'key-marker': '<YOUR_KEY_MARKER>', 'upload-id-marker': '', 'uploads': None, 'max-uploads': '2'}
list_mpu_url = b.presign(60, method="GET", params=list_mpu_params)
更多列举分块的详情,参考 List Multipart Uploads
from ks3.connection import Connection
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
upload_mpu_url = b.presign(60, method="PUT", key_name='<YOUR_KEY_NAME>', params={'partNumber': '1', 'uploadId': '<YOUR_UPLOAD_ID>'})
更多上传分块的详情,参考 Upload Part
from ks3.connection import Connection
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# list分块
list_parts_params = {'uploadId': '<YOUR_UPLOAD_ID>'}
list_parts_url = b.presign(60, method="GET", params=list_parts_params, key_name='<YOUR_KEY_NAME>')
更多列举分块的详情,参考 List Parts
from ks3.connection import Connection
import requests
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# 分块上传合并
complete_data = '''<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUpload>
<Part>
<PartNumber>1</PartNumber>
<ChecksumCRC64ECMA>{CRC64}</ChecksumCRC64ECMA>
<ETag>{ETag}</ETag>
</Part>
</CompleteMultipartUpload>'''
# 生成url不需要body数据
complete_mpu_url = b.presign(60, method="POST", key_name='<YOUR_KEY_NAME>', params={'uploadId':'<YOUR_UPLOAD_ID>'})
# 使用生成的url,完成分块任务
result = requests.post(url=complete_mpu_url, data=complete_data)
print('request complete_mpu_url, status_code: ', result.status_code)
print('request complete_mpu_url, request_id: ', result.headers['x-kss-request-id'])
更多完成分块任务的详情,参考 Complete Multipart Upload
from ks3.connection import Connection
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
# 终止分块上传
abort_mpu_url = b.presign(60, method="DELETE", key_name='<YOUR_KEY_NAME>', params={'uploadId': '<YOUR_UPLOAD_ID>'})
更多终止分块任务的详情,参考 Abort Multipart Upload
from ks3.connection import Connection
import requests
from xml.etree import ElementTree
# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')
# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')
key_name = 'test-multipart-upload'
# 生成初始化分块上传的外链
init_mpu_url = b.presign(60, method="POST", key_name=key_name, params={'uploads': None})
result = requests.post(url=init_mpu_url)
print('request init_mpu_url, request_id: ', result.headers['x-kss-request-id'])
# xml序列化content,然后取UploadId
root = ElementTree.fromstring(result.content)
# 提取命名空间
namespace = {'ns': 'http://s3.amazonaws.com/doc/2006-03-01/'}
upload_id = root.find('ns:UploadId', namespace).text
print(upload_id)
# 生成分块上传的外链
upload_mpu_url = b.presign(60, method="PUT", key_name=key_name, params={'partNumber': '1', 'uploadId': upload_id})
result = requests.put(url=upload_mpu_url, data='hello')
print('request upload_mpu_url, status_code: ', result.status_code)
print('request upload_mpu_url, request_id: ', result.headers['x-kss-request-id'])
# 完成分块合并
# 拼写完成分块合并的xml,具体参考https://docs.ksyun.com/documents/42469
complete_data = '''<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUpload>
<Part>
<PartNumber>1</PartNumber>
<ETag>{ETag}</ETag>
</Part>
</CompleteMultipartUpload>'''
# 生成完成分块上传的外链
complete_mpu_url = b.presign(60, method="POST", key_name=key_name, params={'uploadId':upload_id})
result = requests.post(url=complete_mpu_url, data=complete_data)
print('request complete_mpu_url, status_code: ', result.status_code)
print('request complete_mpu_url, request_id: ', result.headers['x-kss-request-id'])
纯净模式
