ork(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) start = now() loop = asyncio.get_event_loop() done = loop.run_until_complete(main()) print('TIME: ', now() - start)
协程嵌套
使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来 官网实例:
图解:
1、run_until_complete运行,会注册task(协程:print_sum)并开启事件循环 →
2、print_sum协程中嵌套了子协程,此时print_sum协程暂停(类似委托生成器),转到子协程(协程:compute)中运行代码,期间子协程需sleep1秒钟,直接将结果反馈到event loop中,即将控制权转回调用方,而中间的print_sum暂停不操作 →
3、1秒后,调用方将控制权给到子协程(调用方与子协程直接通信),子协程执行接下来的代码,直到再遇到wait(此实例没有)→
4、 最后执行到return语句,子协程向上级协程(print_sum抛出异常:StopIteration),同时将return返回的值返回给上级协程(print_sum中的result接收值),print_sum继续执行暂时时后续的代码,直到遇到return语句 →
5、向 event loop 抛出StopIteration异常,此时协程任务都已经执行完毕,事件循环执行完成(event loop :the loop is stopped),close事件循环。
调度线程
asyncio.run_coroutine_threadsafe(coro, loop) 等待其他线程返回一个concurrent.futures.Future对象,这是一个线程安全的方法。 这个函数应该从不同的OS线程调用,而不是从事件循环所在的线程调用。
def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x)) def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start)) asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
上述的例子,主线程中创建一个new_loop,然后在另外的子线程中开启一个无限事件循环。主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被block。一共执行的时间大概在6s左右。 run_in_executor
import time import asyncio async def main(): print(f'{time.ctime()} Hello') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye') loop.stop() def blocking():
输出
Fri Jan 4 15:32:03 2019 Hello Fri Jan 4 15:32:04 2019 Hello from a thread! Fri Jan 4 15:32:04 2019 Goodbye
下面对上面的函数的序号进行讲解:
1 这个函数调用了常规的sleep(),这会阻塞主线程并阻止loop运行,我们不能使这个函数变成协程,更糟糕的是不能在主线程运行loop时调用它,解决办法是用一个executor来运行它; 2 注意一点,这个sleep运行时间比协程中的sleep运行时间要短,后文再讨论如果长的话会发生什么; 3 该方法帮助我们在事件loop里用额外的线程或进程执行函数,这个方法的返回值是一个Future对象,意味着可以用await来切换它; 4 挂起的task中不包含前面的阻塞函数,并且这个方法只返回task对象,绝对不会返回Future对象。
绑定回调
绑定回调,在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值。如果回调需要多个参数,可以通过偏函数导入
import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future):
回调函数 |