Python 协程与异步 IO¶
python 一直在进行并发编程的优化, 比较熟知的是使用 thread 模块多线程和 multiprocessing 多进程,后来慢慢引入基于 yield 关键字的协程。
对于 IO 密集型任务我们有一种选择就是协程。协程,又称微线程,英文名 Coroutine,是运行在单线程中的“并发”,协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。Python 中的异步 IO 模块 asyncio 就是基本的协程模块。
Python 中的协程经历了很长的一段发展历程。最初的生成器 yield 和 send() 语法,然后在Python3.4 中加入了 asyncio 模块,引入 @asyncio.coroutine 装饰器和 yield from 语法,在 Python3.5 上又提供了 async/await 语法,目前正式发布的 Python3.6 中 asynico 也由临时版改为了稳定版。
yield 是迭代器,yield + send 是协程。此时 yield 语句不再只是 yield xxxx 的形式,还可以是 var = yield xxxx 的赋值形式。它同时具备两个功能,一是暂停并返回函数,二是用 var 接收外部 send() 方法发送过来的值,重新激活函数。
协程¶
协程的切换不同于线程切换,是由程序自身控制的,没有切换的开销。协程不需要多线程的锁机制,因为都是在同一个线程中运行,所以没有同时访问数据的问题,执行效率比多线程高很多。
因为协程是单线程执行,那怎么利用多核 CPU 呢?最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
进程/线程:操作系统提供的一种并发处理任务的能力。
协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内的任务调度技巧。
多进程和多线程体现的是操作系统的能力,而协程体现的是程序员的流程控制能力。
因为 send() 方法的参数会成为暂停的 yield 表达式的值,所以,仅当协程处于暂停状态时才能调用 send() 方法,例如 my_coro.send(10)。不过,如果协程还没激活(状态是’GEN_CREATED’),就立即把 None 之外的值发给它,会出现 TypeError。因此,始终要先调用next(my_coro) 激活协程(也可以调用 my_coro.send(None)),这一过程被称作预激活。
@asyncio.coroutine 与 yield from¶
yield from range(10)
等价于 for i in range(10): yield i
yield from
其实就是等待另外一个协程的返回
import asyncio
import datetime
@asyncio.coroutine # 声明一个协程
def display_date(num, loop):
end_time = loop.time() + 10.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
yield from asyncio.sleep(2) # 阻塞直到协程 sleep(2) 返回结果
loop = asyncio.get_event_loop() # 获取一个event_loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks)) # "阻塞"直到所有的 tasks 完成
loop.close()
async 和 await¶
推荐使用 Python3.5 中对协程提供了更直接的支持,引入了 async/await 关键字。上面的代码可以这样改写:使用 async 代替@asyncio.coroutine,使用 await 代替 yield from,代码变得更加简洁可读。从 Python 设计的角度来说,async/await 让协程独立于生成器而存在,不再使用 yield 语法。
import asyncio
import datetime
async def display_date(num, loop): # 注意这一行的写法
end_time = loop.time() + 10.0
while True:
print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(2) # 阻塞直到协程 sleep(2) 返回结果
loop = asyncio.get_event_loop() # 获取一个 event_loop
tasks = [display_date(1, loop), display_date(2, loop)]
loop.run_until_complete(asyncio.gather(*tasks)) # "阻塞"直到所有的 tasks 完成
asyncio 模块¶
asyncio 的使用可分三步走
创建事件循环
指定循环模式并运行
关闭循环
通常我们使用 asyncio.get_event_loop() 方法创建一个循环。
运行循环有两种方法:一是调用 run_until_complete() 方法,二是调用 run_forever() 方法。run_until_complete() 内置 add_done_callback 回调函数,run_forever() 则可以自定义 add_done_callback(),具体差异请看下面两个例子。
使用 run_until_complete() 方法:
import asyncio
async def func(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(func(future))
print(loop.is_running()) # 查看当前状态时循环是否已经启动
loop.run_until_complete(future)
print(future.result())
loop.close()
使用 run_forever() 方法:
import asyncio
async def func(future):
await asyncio.sleep(1)
future.set_result('Future is done!')
def call_result(future):
print(future.result())
loop.stop()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(func(future))
future.add_done_callback(call_result) # 注意这行
try:
loop.run_forever()
finally:
loop.close()