众所周知,异步并发编程可以帮助程序更好地处理阻塞操作,比如网络 IO 操作或文件 IO 操作,避免因等待这些操作完成而导致程序卡住的情况。云存储文件传输场景正好包含网络 IO 操作和文件 IO 操作,比如业内相对著名的七牛云存储,官方sdk的默认阻塞传输模式虽然差强人意,但未免有些循规蹈矩,不够锐意创新。在全球同性交友网站Github上找了一圈,也没有找到异步版本,那么本次我们来自己动手将同步阻塞版本改造为异步非阻塞版本,并上传至Python官方库。
异步改造
首先参见七牛云官方接口文档:https://developer.qiniu.com/kodo,新建qiniu_async.py文件:
# @Author:Liu Yue (v3u.cn)
# @Software:Vscode
# @Time:2022/12/30
import base64
import hmac
import time
from hashlib import sha1
import json
import httpx
import aiofiles
class Qiniu:
def __init__(self, access_key, secret_key):
"""初始化"""
self.__checkKey(access_key, secret_key)
self.__access_key = access_key
self.__secret_key = secret_key.encode('utf-8')
def get_access_key(self):
return self.__access_key
def get_secret_key(self):
return self.__secret_key
def __token(self, data):
hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)
return self.urlsafe_base64_encode(hashed.digest())
def token(self, data):
return '{0}:{1}'.format(self.__access_key, self.__token(data))
def token_with_data(self, data):
data = self.urlsafe_base64_encode(data)
return '{0}:{1}:{2}'.format(
self.__access_key, self.__token(data), data)
def urlsafe_base64_encode(self,data):
if isinstance(data, str):
data = data.encode('utf-8')
ret = base64.urlsafe_b64encode(data)
data = ret.decode('utf-8')
return data
@staticmethod
def __checkKey(access_key, secret_key):
if not (access_key and secret_key):
raise ValueError('invalid key')
def upload_token(
self,
bucket,
key=None,
expires=3600,
policy=None,
strict_policy=True):
"""生成上传凭证
Args:
bucket: 上传的空间名
key: 上传的文件名,默认为空
expires: 上传凭证的过期时间,默认为3600s
policy: 上传策略,默认为空
Returns:
上传凭证
"""
if bucket is None or bucket == '':
raise ValueError('invalid bucket name')
scope = bucket
if key is not None:
scope = '{0}:{1}'.format(bucket, key)
args = dict(
scope=scope,
deadline=int(time.time()) + expires,
)
return self.__upload_token(args)
@staticmethod
def up_token_decode(up_token):
up_token_list = up_token.split(':')
ak = up_token_list[0]
sign = base64.urlsafe_b64decode(up_token_list[1])
decode_policy = base64.urlsafe_b64decode(up_token_list[2])
decode_policy = decode_policy.decode('utf-8')
dict_policy = json.loads(decode_policy)
return ak, sign, dict_policy
def __upload_token(self, policy):
data = json.dumps(policy, separators=(',', ':'))
return self.token_with_data(data)
@staticmethod
def __copy_policy(policy, to, strict_policy):
for k, v in policy.items():
if (not strict_policy) or k in _policy_fields:
to[k] = v
这里有两个很关键的异步非阻塞三方库,分别是httpx和aiofiles,对应处理网络IO和文件IO阻塞问题:
pip3 install httpx
pip3 install aiofiles
随后按照文档流程通过加密方法获取文件上传token,这里无须进行异步改造,因为并不涉及IO操作:
q = Qiniu(access_key,access_secret)
token = q.upload_token("空间名称")
print(token)
程序返回:
? mydemo git:(master) ? /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9
接着添加文件流推送方法,先看官方原版逻辑:
def put_data(
up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, pro