最近更新时间:2024-03-19 19:20:44
使用KS3时,所有发送的非匿名请求都需要携带签名,KS3服务器端收到消息后,进行身份验证,验证成功则接受并执行请求,否则将会返回403错误信息并丢弃此请求。本文主要介绍V2签名算法在Python语言中的实现方式。
add_auth_header函数是Python SDK中auth.py文件提供的函数,该函数会计算签名并把结果填入传入的请求头中。
如下所示为该函数对应的参数信息:
参数名称 | 描述 |
---|---|
access_key_id | 您的AccessKeyID,必填。 |
secret_access_key | 您的SecretAccessKey,必填。 |
headers | 请求的HTTP头部。 |
method | 用于请求的HTTP方法(GET,POST,PUT,DELETE等)。 |
bucket | 在KS3上的存储空间名称。 |
key | 存储空间内对象的名称。 |
query_args | 与存储桶或对象关联的子资源,用于执行特定的操作,如:acl、crr、retention等。 |
如下所示为签名计算函数的具体实现:
def add_auth_header(access_key_id, secret_access_key, headers, method, bucket, key, query_args):
if not access_key_id:
return
if 'Date' not in headers:
from email.utils import formatdate
headers['Date'] = formatdate(time.time(), usegmt=True)
c_string = canonical_string(method, bucket, key, query_args, headers)
headers['Authorization'] = \
"%s %s:%s" % ("KSS", access_key_id, encode(secret_access_key, c_string))
如下所示为使用add_auth_header计算v2签名的代码示例:
注:实际使用过程中,请按照相关API文档填写请求头、子资源等信息。
from ks3.auth import add_auth_header
import ks3.utils
import time
ak = 'Access-Key-ID'
sk = 'Secret-Access-Key'
bucket_name = 'your-bucket-name'
key_name = 'your-key-name'
def common_demo():
headers = {}
sub_resource = ''
method = '' # 请求方法:GET/PUT/POST/HEAD/DELETE 等
headers['Content-Type'] = 'text/xml'
# 可手动获取时间填入请求头,否则add_auth_header将会自动获取Date并设置
from email.utils import formatdate
headers['Date'] = formatdate(time.time(), usegmt=True)
# 按需准备请求数据
data = 'data'
# 根据API文档要求,将数据转换为合适的格式,如XML
format_data = data.to_xml()
if not isinstance(format_data, bytes):
format_data = format_data.encode('utf-8')
# 参考API文档,按需计算MD5
md5 = ks3.utils.compute_base64_md5_digest(format_data)
headers['Content-MD5'] = md5
# 参考API文档,按需提供其他请求头部
headers['x-kss-xxxxx'] = 'xxxx'
headers['x-kss-xxxxx'] = 'xxxx'
headers['xxx'] = 'xxxx'
add_auth_header(ak, sk, headers, method, bucket_name, key_name, sub_resource)
print('sign: {}'.format(headers['Authorization']))
from ks3.auth import add_auth_header
import ks3.utils
import time
ak = 'Access-Key-ID'
sk = 'Secret-Access-Key'
bucket_name = 'your-bucket-name'
key_name = 'your-key-name'
def main():
# 计算列举桶的签名(不需要提供资源和子资源名)
calculate_list_buckets_sign()
# 计算列举对象的签名(提供桶名称)
calculate_list_object_sign()
# 计算删除对象的签名(提供桶和对象名称)
calculate_delete_object_sign()
# 计算获取对象ACL的签名(提供桶、对象和子资源)
calculate_get_object_acl_sign()
# 计算上传对象的签名(提供额外请求头、桶和对象)
calculate_put_object_sign()
# 计算设置复制规则的签名(提供请求体及其MD5值,以及其它必要信息)
calculate_put_bucket_replication_sign()
def calculate_list_buckets_sign():
headers = {}
add_auth_header(ak, sk, headers, 'GET', '', '', '')
print('sign: {}'.format(headers['Authorization']))
def calculate_list_object_sign():
headers = {}
add_auth_header(ak, sk, headers, 'GET', bucket_name, '', '')
print('sign: {}'.format(headers['Authorization']))
def calculate_delete_object_sign():
headers = {}
add_auth_header(ak, sk, headers, 'DELETE', bucket_name, key_name, '')
print('sign: {}'.format(headers['Authorization']))
def calculate_get_object_acl_sign():
headers = {}
sub_resource = 'acl'
add_auth_header(ak, sk, headers, 'GET', bucket_name, key_name, sub_resource)
print('sign: {}'.format(headers['Authorization']))
def calculate_put_object_sign():
headers = {
'Content-Type': 'text/plain',
'x-kss-storage-class': 'STANDARD'
}
add_auth_header(ak, sk, headers, 'PUT', bucket_name, key_name, '')
print('sign: {}'.format(headers['Authorization']))
def calculate_put_bucket_replication_sign():
from ks3.xmlParsers.bucketCrossReplicate import BucketCrossReplicate
target_bucket = 'target_bucket_name'
sub_resource = 'crr'
headers = {}
replicate = BucketCrossReplicate(target_bucket, 'Disabled', None)
replicate_xml = replicate.to_xml()
if not isinstance(replicate_xml, bytes):
replicate_xml = replicate_xml.encode('utf-8')
md5 = ks3.utils.compute_base64_md5_digest(replicate_xml)
# Content-Type和Content-MD5会参与到签名计算中
headers['Content-Type'] = 'text/xml'
headers['Content-MD5'] = md5
add_auth_header(ak, sk, headers, 'PUT', bucket_name, '', sub_resource)
print('sign: {}'.format(headers['Authorization']))
if __name__ == "__main__":
main()
纯净模式