设为首页 加入收藏

TOP

乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Python3.10实现。(二)
2023-07-25 21:29:40 】 浏览:145
Tags:何将同 步阻塞 sync 方库包 步非阻 async 模式 Python3.10 实现
gress_handler=None, fname=None, hostscache_dir=None, metadata=None): """上传二进制流到七牛 Args: up_token: 上传凭证 key: 上传文件名 data: 上传二进制流 params: 自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar mime_type: 上传数据的mimeType check_crc: 是否校验crc32 progress_handler: 上传进度 hostscache_dir: host请求 缓存文件保存位置 metadata: 元数据 Returns: 一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"} 一个ResponseInfo对象 """ final_data = b'' if hasattr(data, 'read'): while True: tmp_data = data.read(config._BLOCK_SIZE) if len(tmp_data) == 0: break else: final_data += tmp_data else: final_data = data crc = crc32(final_data) return _form_put(up_token, key, final_data, params, mime_type, crc, hostscache_dir, progress_handler, fname, metadata=metadata) def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None, modify_time=None, keep_last_modified=False, metadata=None): fields = {} if params: for k, v in params.items(): fields[k] = str(v) if crc: fields['crc32'] = crc if key is not None: fields['key'] = key fields['token'] = up_token if config.get_default('default_zone').up_host: url = config.get_default('default_zone').up_host else: url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir) # name = key if key else file_name fname = file_name if not fname or not fname.strip(): fname = 'file_name' # last modify time if modify_time and keep_last_modified: fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time) if metadata: for k, v in metadata.items(): if k.startswith('x-qn-meta-'): fields[k] = str(v) r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)}) if r is None and info.need_retry(): if info.connect_failed: if config.get_default('default_zone').up_host_backup: url = config.get_default('default_zone').up_host_backup else: url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir) if hasattr(data, 'read') is False: pass elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()): data.seek(0) else: return r, info r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)}) return r, info

这里官方使用两个方法,先试用put_data方法将字符串转换为二进制文件流,随后调用_form_put进行同步上传操作,这里_form_put这个私有方法是可复用的,既兼容文件流也兼容文件实体,写法上非常值得我们借鉴,弄明白了官方原版的流程后,让我们撰写文件流传输的异步版本:

# 上传文件流  
    async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  
        data.encode('utf-8')  
          
        fields = {}  
        if params:  
            for k, v in params.items():  
                fields[k] = str(v)  
  
        if key is not None:  
            fields['key'] = key  
        fields['token'] = up_token  
  
        fname = file_name  
        if not fname or not fname.strip():  
            fname = 'file_name'  
  
        async with httpx.AsyncClient() as client:  
  
            # 调用异步使用await关键字  
            res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})  
  
            print(res.text)

这里我们声明异步方法upload_data,通过encode直接转换文件流,并使用异步httpx.AsyncClient()对象将文件流推送到官网接口地址:up-z1.qiniup.com

随后进行测试:

import asyncio
q = qiniu_async.Qiniu("accesskey","accesssecret")  
  
t
首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇anaconda peompt 、labalimg 数据.. 下一篇【脚本项目源码】Python制作提升..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目