全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

使用外链分块上传(Python)

最近更新时间:2026-06-03 21:32:11

生成初始化分块的外链

from ks3.connection import Connection

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

# 生成分块上传的外链
init_mpu_url = b.presign(60, method="POST", key_name='<YOUR_KEY_NAME>', params={'uploads': None})

更多初始化分块的详情,参考 Initiate Multipart Upload

生成列举分块任务的外链

from ks3.connection import Connection

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

# list分块上传任务
list_mpu_params = {'key-marker': '<YOUR_KEY_MARKER>', 'upload-id-marker': '', 'uploads': None, 'max-uploads': '2'}
list_mpu_url = b.presign(60, method="GET", params=list_mpu_params)

更多列举分块的详情,参考 List Multipart Uploads

生成上传分块的外链

from ks3.connection import Connection

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

upload_mpu_url = b.presign(60, method="PUT", key_name='<YOUR_KEY_NAME>', params={'partNumber': '1', 'uploadId': '<YOUR_UPLOAD_ID>'})

更多上传分块的详情,参考 Upload Part

生成列举分块的外链

from ks3.connection import Connection

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

# list分块
list_parts_params = {'uploadId': '<YOUR_UPLOAD_ID>'}
list_parts_url = b.presign(60, method="GET", params=list_parts_params, key_name='<YOUR_KEY_NAME>')

更多列举分块的详情,参考 List Parts

生成完成分块任务的外链

from ks3.connection import Connection
import requests

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

# 分块上传合并
complete_data = '''<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUpload>
  <Part>
    <PartNumber>1</PartNumber>
    <ChecksumCRC64ECMA>{CRC64}</ChecksumCRC64ECMA>
    <ETag>{ETag}</ETag>
  </Part>
</CompleteMultipartUpload>'''
# 生成url不需要body数据
complete_mpu_url = b.presign(60, method="POST", key_name='<YOUR_KEY_NAME>', params={'uploadId':'<YOUR_UPLOAD_ID>'})
# 使用生成的url,完成分块任务
result = requests.post(url=complete_mpu_url, data=complete_data)
print('request complete_mpu_url, status_code: ', result.status_code)
print('request complete_mpu_url, request_id: ', result.headers['x-kss-request-id'])

更多完成分块任务的详情,参考 Complete Multipart Upload

生成终止分块任务的外链

from ks3.connection import Connection

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

# 终止分块上传
abort_mpu_url = b.presign(60, method="DELETE", key_name='<YOUR_KEY_NAME>', params={'uploadId': '<YOUR_UPLOAD_ID>'})

更多终止分块任务的详情,参考 Abort Multipart Upload

适用于预签名链接分块上传文件的示例

from ks3.connection import Connection
import requests
from xml.etree import ElementTree

# 金山云主账号 AccessKey 拥有所有API的访问权限,风险很高。
# 强烈建议您创建并使用子账号进行 API 访问或日常运维,请登录 https://uc.console.ksyun.com/pro/iam/#/user/list 创建子账号。
# 通过指定 host(Endpoint),您可以在指定的地域使用存储空间。
c = Connection('<YOUR_ACCESS_KEY>', '<YOUR_SECRET_KEY>', host='<YOUR_REGION_ENDPOINT>')

# 获取存储空间实例
b = c.get_bucket('<YOUR_BUCKET_NAME>')

key_name = 'test-multipart-upload'

# 生成初始化分块上传的外链
init_mpu_url = b.presign(60, method="POST", key_name=key_name, params={'uploads': None})
result = requests.post(url=init_mpu_url)
print('request init_mpu_url, request_id: ', result.headers['x-kss-request-id'])
# xml序列化content,然后取UploadId
root = ElementTree.fromstring(result.content)
# 提取命名空间
namespace = {'ns': 'http://s3.amazonaws.com/doc/2006-03-01/'}
upload_id = root.find('ns:UploadId', namespace).text
print(upload_id)

# 生成分块上传的外链
upload_mpu_url = b.presign(60, method="PUT", key_name=key_name, params={'partNumber': '1', 'uploadId': upload_id})
result = requests.put(url=upload_mpu_url, data='hello')
print('request upload_mpu_url, status_code: ', result.status_code)
print('request upload_mpu_url, request_id: ', result.headers['x-kss-request-id'])

# 完成分块合并
# 拼写完成分块合并的xml,具体参考https://docs.ksyun.com/documents/42469
complete_data = '''<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUpload>
  <Part>
    <PartNumber>1</PartNumber>
    <ETag>{ETag}</ETag>
  </Part>
</CompleteMultipartUpload>'''
# 生成完成分块上传的外链
complete_mpu_url = b.presign(60, method="POST", key_name=key_name, params={'uploadId':upload_id})
result = requests.post(url=complete_mpu_url, data=complete_data)
print('request complete_mpu_url, status_code: ', result.status_code)
print('request complete_mpu_url, request_id: ', result.headers['x-kss-request-id'])
文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈