Python中的协程

什么是协程

协程是指一个过程,这个过程与调用方协作,产出由调用方提供的值。

协程是程序可以控制的,可以在内部中断。

生成器与协程

从句法上看,协程与生成器类似,都是定义体中包含 yield 关键字的函数。但是在协程中, yield 通常会出现在表达式的右边,例如 value = yeild ,可以选择是否产出值,如果 yield 后面没有表达式,那么生成器产出 None

协程通常包含着协程本身与调用方的数据交互,因此协程可能会从调用方接收数据,不过调用方把数据提供给协程的方式是通过 coroutine.send(value) 方法,而不是 next(coroutinue) 函数。除了 .send(value) 方法之外,还有 .throw(Exception).close() 方法:前者的作用是让调用方抛出异常,在生成器中处理;后者的作用是终止生成器。

Python 中协程的使用方式

本文使用的 Python 环境是 Python3.7.1

有了上面的知识,可以写出我们的第一个使用协程的简单例子。

Example 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> def my_coroutine():
... print('start')
... value = yield # 这里 value 会接收协程调用方使用 `.send()` 发送的值
... print(f'received {value}')
...
>>> my_coro = my_coroutine()
>>> my_coro
<generator object my_coroutine at 0x10165bc78>
>>> next(my_coro) # 预激协程
start
>>> my_coro.send("233") # 将值传给协程
received 233
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

协程的状态

协程具有四个状态,分别是:

  1. GEN_CREATED 等待开始执行。
  2. GEN_RUNNING 解释器正在执行。
  3. GEN_SUSPENDEDyield 表达式处暂停。
  4. GEN_CLOSED 执行结束。

要获取协程的状态可以通过 inspect.getgeneratorstate(coroutine) 方法获取。

一开始的时候,协程还处于未激活状态 GEN_CREATED,这时需要使用 next(coroutine) 或者 coroutine.send(None) 方法激活协程。这一步通常叫做 预激(prime) 协程(即让协程向前执行到第一个 yield 表达式,准备好作为活跃的协程使用)。

Tips:如果没有预激协程,那么会抛出一个异常,如下:

Example 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> def my_coroutine():
... print('start')
... import time
... time.sleep(5)
... x = yield
... print(f'end -> {x}')
...
>>> coro = my_coroutine()
>>> import inspect
>>> print(inspect.getgeneratorstate(coro))
GEN_CREATED
>>> coro.send("233")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator

由于 .send() 方法的参数会成为暂停的 yield 表达式的值,所以仅当协程处于暂停状态时才可以调用 sned 方法,换句话说,调用方在使用 .send() 方法的时候可能会阻塞主程序的运行。例如我们尝试在协程中加上 sleep()

Example 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
>>> def my_coroutine():
... print('start')
... import time
... time.sleep(5) # 这里会阻塞主程序 5s
... print('sleep 5 ok')
... x = yield
... print(f'end -> {x}')
...
>>>
>>> coro = my_coroutine()
>>> print(coro)
<generator object my_coroutine at 0x10a0482a0>
>>> print(inspect.getgeneratorstate(coro))
GEN_CREATED
>>> next(coro)
start
sleep 5 ok # 5s 后输出

Eample 4 产出多个值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
>>> def my_coroutine2(a):
... print(f'start: a = {a}')
... b = yield a
... print(f'b = {b}')
... c = yield a + b
... print(f'c = {c}')
...
>>>
>>> coro = my_coroutine2(1)
>>> print(inspect.getgeneratorstate(coro))
GEN_CREATED
>>> next(coro)
start: a = 1
1
>>> print(inspect.getgeneratorstate(coro))
GEN_SUSPENDED
>>> coro.send(3) # 把数值 3 发给协程,b 被赋值为 3,计算 `a + b`,得到 4, 产出 `a + b` 的值
b = 3
4
>>> coro.send(5)
c = 5
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>>
>>> print(inspect.getgeneratorstate(coro))
GEN_CLOSED

my_coroutine2() 的执行分为三个阶段:

  1. 调用 next(coro),打印第一个消息,然后执行 yield a,产出 1.
  2. 调用 coro.send(3),把值 3 赋予 b,打印第二个消息,然后执行 yield a + b, 产出 4.
  3. 调用 coro.send(5),把值 5 赋予 c,打印第三个消息,协程终止。

执行流程

协程的终止与异常处理

协程的终止可以调用 coroutine.close() 方法。close() 是会让生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。如果生成器没有处理这个异常,或者抛出了 StopIteration 异常(通常指运行到结尾),调用方不会报错。

要在协程中抛出异常可以调用 coroutine.throw(...) 方法。throw() 会使生成器在暂停的 yield 表达式处抛出指定异常。如果生成器处理了抛出的异常,代码会向前执行到下一个 yield 表达式,而产出的值会成为调用 throw() 方法得到的返回值。

Example 5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
>>> class CustomException(Exception):
... pass
...
>>>
>>> def my_coroutine3():
... print('my coroutine3 start...')
... while True:
... try:
... value = yield
... except CustomException:
... print('Catch custom exception...')
... else:
... print(f'coroutine3 received value: {value}')
... print('coroutine3 terminated by unknown exception...')
...
>>>
>>> coro = my_coroutine3()
>>> next(coro)
my coroutine3 start...
>>> coro.send(5)
coroutine3 received value: 5
>>> coro.send(20)
coroutine3 received value: 20
>>> print(inspect.getgeneratorstate(coro))
GEN_SUSPENDED
>>> coro.throw(CustomException())
Catch custom exception...
>>> print(inspect.getgeneratorstate(coro))
GEN_SUSPENDED
>>> coro.close()
>>> print(inspect.getgeneratorstate(coro))
GEN_CLOSED

获取协程的返回值

Example 6 尝试在协程的最后添加 return 语句返回结果。

Example 6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
>>> from collections import namedtuple
>>>
>>> Result = namedtuple('Result', 'count average')
>>>
>>> def averager():
... total = 0.0
... count = 0
... average = None
... while True:
... value = yield
... if value is None:
... break
... total += value
... count += 1
... average = total / count
... return Result(count, average)
...
>>> coro = averager()
>>> next(coro)
>>> coro.send(5)
>>> coro.send(20)
>>> coro.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: Result(count=2, average=12.5)

可以看到协程 return 的值保存在了 StopIterationvalue 属性中。

于是我们可以修改 Example 6 得到 Example 7, 通过捕获异常去获取返回值:

Example 7

1
2
3
4
5
6
7
8
9
10
11
>>> coro = averager()
>>> next(coro)
>>> coro.send(5)
>>> coro.send(20)
>>> try:
... coro.send(None)
... except StopIteration as e:
... result = e.value
...
>>> print(result)
Result(count=2, average=12.5)

这样的程序的缺点很明显,即我们需要添加更多的异常处理。 yield from 可以解决这个问题。

yield from

yield from 会在内部自动捕获 StopIteration 异常,并把异常的 value 属性的值变成 yield from 表达式的值。

举个例子:现在我们有一个动态获取求一组数据的平均结果的需求。

不使用 yield from 的写法如 Example 8 所示。

Example 8 不使用 yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def averager():
total = 0.0
count = 0
average = None
while True:
value = yield average
if value is None:
break
total += value
count += 1
average = total / count
return Result(count, average)


def main():
data = {
"A": [i for i in range(4, 7)],
"B": [i for i in range(3)],
}
results = {}
for key, values in data.items():
avg = averager()
next(avg) # 预激 group 协程
for value in values:
avg.send(value)
try:
avg.send(None)
except StopIteration as e: # catch exception
results[key] = e.value
print(results)

main()

===

▶ python3 test.py
{'A': Result(count=3, average=5.0), 'B': Result(count=3, average=1.0)}

使用 yield from 的代码如 Example 9 所示:

Example 9 使用 yield from

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
total = 0.0
count = 0
average = None
while True:
value = yield # value 的值是调用方 main() 中 send 过来的
if value is None:
break
total += value
count += 1
average = total / count
return Result(count, average)


def grouper(results, key):
while True: # Tag
results[key] = yield from averager()


def main():
data = {
"A": [i for i in range(50)],
"B": [i for i in range(100)],
}
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group) # 预激 group 协程
for value in values:
group.send(value)
group.send(None) # 终止 averager,处理下一个 key 的 values
print(results)


main()

===

▶ python3 test.py
{'A': Result(count=3, average=5.0), 'B': Result(count=3, average=1.0)}

使用 yield from 会涉及到下面的术语:

  1. 委派生成器:包含 yield from <iterable> 表达式的生成器函数。即 grouper()
  2. 子生成器:从 yield from 表达式中 <iterable> 部分获取的生成器。即 averager()
  3. 调用方:调用委派生成器的客户端代码。即 main()

yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器链接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程添加大量处理异常的样板代码。

Example 9 示意图

Qustion: 为什么在 grouper() 里面需要加 while True 呢?

Answer:由于我们在最后 send(None) 的时候,averager() 直接 break 了,这时候没有再执行到 value = yieldyield 处,因此调用方 group.send(None) 拿不到子生成器中 yield 的值,会抛出 StopIteration 异常。我们需要让调用方 group.send(None) 能够拿到 yield 的结果,因此需要再进入子生成器 yield 产出结果给调用方 group.send(None)

当然 yield from 不只处理了 StopIteration 异常,它还会做一些其他操作,这里是 PEP 380 说明的 yield from 的行为:

  1. 子生成器产生的值都返回给委派生成器的调用方。
  2. 任何使用 send() 方法发送给委派生成器的值都直接传给子生成器。如果发送的值是 None ,那么会调用子生成器的 __next__() 方法。如果发送的值不是 None,那么会调用子生成器的 send() 方法。如果调用的方法抛出 StopIteration 异常,那么委派生成器恢复执行,其他任何异常都会向上传递给委派生成器。
  3. 除了 GeneratorExit 异常以外的其他传入委派生成器的异常,都会传给子生成器的 throw() 方法。如果调用 throw() 方法时抛出 StopIteration 异常,委派生成器恢复运行。 StopIteration 以外的异常都会向上传递给委派生成器。
  4. 如果 GeneratorExit 异常被抛给委派生成器,或者委派生成器的 close() 方法被调用,如果子生成器有 close() 的话也将被调用。如果 close() 调用产生异常,异常将传递给委派生成器。否则,委派生成器将抛出 GeneratorExit 异常。
  5. yield from 表达式的值是子生成器终止时传给 StopIteration 异常的第一个参数。
  6. 生成器退出时,生成器(或子生成器)中的 return expr 表达式会触发 StopIteration(expr) 异常抛出。

References

Fluent Python 本文几乎都是基于这本书的内容做的笔记。

Python Developer’s Guide PEP 380