gress_handler=None,
fname=None, hostscache_dir=None, metadata=None):
"""上传二进制流到七牛
Args:
up_token: 上传凭证
key: 上传文件名
data: 上传二进制流
params: 自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar
mime_type: 上传数据的mimeType
check_crc: 是否校验crc32
progress_handler: 上传进度
hostscache_dir: host请求 缓存文件保存位置
metadata: 元数据
Returns:
一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}
一个ResponseInfo对象
"""
final_data = b''
if hasattr(data, 'read'):
while True:
tmp_data = data.read(config._BLOCK_SIZE)
if len(tmp_data) == 0:
break
else:
final_data += tmp_data
else:
final_data = data
crc = crc32(final_data)
return _form_put(up_token, key, final_data, params, mime_type,
crc, hostscache_dir, progress_handler, fname, metadata=metadata)
def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,
modify_time=None, keep_last_modified=False, metadata=None):
fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)
if crc:
fields['crc32'] = crc
if key is not None:
fields['key'] = key
fields['token'] = up_token
if config.get_default('default_zone').up_host:
url = config.get_default('default_zone').up_host
else:
url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)
# name = key if key else file_name
fname = file_name
if not fname or not fname.strip():
fname = 'file_name'
# last modify time
if modify_time and keep_last_modified:
fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)
if metadata:
for k, v in metadata.items():
if k.startswith('x-qn-meta-'):
fields[k] = str(v)
r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
if r is None and info.need_retry():
if info.connect_failed:
if config.get_default('default_zone').up_host_backup:
url = config.get_default('default_zone').up_host_backup
else:
url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)
if hasattr(data, 'read') is False:
pass
elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
data.seek(0)
else:
return r, info
r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
return r, info
这里官方使用两个方法,先试用put_data方法将字符串转换为二进制文件流,随后调用_form_put进行同步上传操作,这里_form_put这个私有方法是可复用的,既兼容文件流也兼容文件实体,写法上非常值得我们借鉴,弄明白了官方原版的流程后,让我们撰写文件流传输的异步版本:
# 上传文件流
async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):
data.encode('utf-8')
fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)
if key is not None:
fields['key'] = key
fields['token'] = up_token
fname = file_name
if not fname or not fname.strip():
fname = 'file_name'
async with httpx.AsyncClient() as client:
# 调用异步使用await关键字
res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})
print(res.text)
这里我们声明异步方法upload_data,通过encode直接转换文件流,并使用异步httpx.AsyncClient()对象将文件流推送到官网接口地址:up-z1.qiniup.com
随后进行测试:
import asyncio
q = qiniu_async.Qiniu("accesskey","accesssecret")
t
|