
toy reimplementation of an event loop in Python [Translation]

2022/05/09 · Python · Translation · Concurrency
Word count: 6,277 | Reading time: 31min

A tale of event loops

Translated from: https://github.com/AndreLouisCaron/a-tale-of-event-loops

I was recently struck by the remarkable insights on asyncio from Nathaniel J. Smith, a researcher at UC Berkeley. After using asyncio heavily of late, his article *Some thoughts on asynchronous API design in a post-async/await world* left me with (somewhat) mixed feelings about it.

Although this is more or less the point of his article, he argues that curio's implementation is much simpler than asyncio's, because it presupposes a Python version that implements PEP 492 (the `async` and `await` keywords).

Intrigued, I took a look at curio's source code… Since it has gained a lot of features (which will hopefully bring it closer to being a production-ready library), I think it now has enough functionality that the core essence has been watered down. That is by no means a bad thing, but if, like me, you want to understand what Nathaniel is talking about and pick up some neat Python tricks along the way, curio's source code is hardly a good starting point.

Python core developer Brett Cannon's post *How the heck does async/await work in Python 3.5?* offers some very good insights into how to use coroutine objects (e.g. if you are implementing an event loop). His focus is on implementation details, however, and at least for me, a small piece was still missing to fully grasp coroutines.

TL;DR: this is my attempt to capture the core fundamentals on which curio is based. Hopefully it can serve as a good stepping stone toward getting familiar with curio's source code.

Let's get started 😃

The essence of coroutines

First, let's take a look at what happens when you call a coroutine.

Hint: you get a "coroutine object", which has a `.send()` and a `.throw()` method, just like a generator object does.

>>> async def hello(name):
...     print('Hello, %s!' % (name,))
>>>
>>> coro = hello('world')
>>> type(coro)
<class 'coroutine'>
>>> type(coro.send)
<class 'builtin_function_or_method'>
>>> type(coro.throw)
<class 'builtin_function_or_method'>
>>>

Of course, you do not normally use these methods yourself, since they are hidden behind `await coro` and `asyncio.get_event_loop().run_until_complete()`, but since we want to study how this works… 😃

Note that the code above never prints our "Hello, world!" message. That is because we never actually executed any statement in the coroutine function's body; we only created the coroutine object. In fact, if you run this code in the interpreter, you will get a warning stating that our hello coroutine was never awaited.

If you want the coroutine function's body to run, you need to schedule it somehow. To do that, we call the `.send()` method.

>>> async def hello(name):
...     print('Hello, %s!' % (name,))
>>>
>>> task = hello('world')
>>> try:
...     task.send(None)
... except StopIteration:
...     pass
Hello, world!

Just like with generator objects, a coroutine object's `.send()` method raises StopIteration when the coroutine completes.

As we will see later, we can use the `.send()` method to pass information into the coroutine when the second, third, or Nth call to `.send()` resumes it.
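To see that value-passing in action, here is a small illustrative example (the `read_value` helper is mine, not from the original post): the value handed to the second `.send()` becomes the result of the suspended `yield` expression.

```python
from types import coroutine

@coroutine
def read_value():
    # Suspend; whatever the driver passes to .send() becomes the
    # value of this yield expression when we resume.
    value = yield
    return value

async def main():
    value = await read_value()
    print('received: %r' % (value,))

task = main()
task.send(None)      # run until the yield inside read_value()
try:
    task.send(42)    # resume, delivering 42 into the coroutine
except StopIteration:
    pass
# prints: received: 42
```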

If the coroutine does not return normally but raises an exception instead, that exception propagates back out through `.send()`.

>>> class HelloException(Exception):
...     def __init__(self, name):
...         self._name = name
...     def __str__(self):
...         return 'Hello, %s!' % (self._name,)
>>>
>>> async def hello(name):
...     raise HelloException(name)
>>>
>>> task = hello('world')
>>> try:
...     task.send(None)
... except Exception as error:
...     # NEW: exception will be propagated here!
...     print(error)
Hello, world!

I also mentioned a `.throw()` method. Like `.send()`, it resumes the coroutine, but instead of passing a value in, it raises an exception inside the coroutine at its suspension point.

>>> class Hello(Exception):
...     def __init__(self, name):
...         self._name = name
...     def __str__(self):
...         return 'Hello, %s!' % (self._name,)
>>>
>>> async def hello():
...     pass
>>>
>>> task = hello()
>>> try:
...     # NEW: inject exception.
...     task.throw(Hello('world'))
... except Exception as error:
...     print(error)
Hello, world!

At this point, you should be comfortable with the fact that coroutine objects are very, very similar to generator objects, which have been around since Python 2.2 (PEP 255) and have had `.send()` and `.throw()` methods since Python 2.5 (PEP 342).
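As a quick reminder of that similarity, the same `.send()`-driven dance works on a plain generator (this mini example is mine, added for comparison):

```python
def greeter():
    # A plain generator: .send() resumes it, and the sent value
    # becomes the result of the paused yield expression.
    name = yield 'who?'
    yield 'Hello, %s!' % (name,)

gen = greeter()
print(gen.send(None))     # advance to the first yield -> who?
print(gen.send('world'))  # resume with a value -> Hello, world!
```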

Talking to the event loop

If you look closely (or try it out), you will notice that, in contrast to generator functions, coroutine functions cannot contain `yield` expressions. This raises (rather than begs) the question: how on earth does a coroutine function hand control back to the code calling `.send()`?

The answer is to use `await` on an awaitable object. For an object to be awaitable, it must implement the special `__await__()` method, which returns an iterator. In practice, this is a bit awkward, so the standard library provides the `@types.coroutine` decorator, which lets you create awaitable objects in a style similar to `@contextlib.contextmanager`.
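For contrast, here is what the "awkward" route looks like: a class whose `__await__()` returns an iterator directly (writing it as a generator function is the easiest way, since generators are iterators). The class name is mine:

```python
class Nice:
    # The raw awaitable protocol: __await__ must return an iterator.
    def __await__(self):
        yield  # suspend once; .send() on the coroutine returns here

async def hello(name):
    await Nice()
    print('Hello, %s!' % (name,))

task = hello('world')
task.send(None)       # runs until the yield inside __await__
try:
    task.send(None)   # resumes and finishes
except StopIteration:
    pass
# prints: Hello, world!
```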

>>> from types import coroutine
>>>
>>> # NEW: this is an awaitable object!
>>> @coroutine
... def nice():
...     yield
>>>
>>> async def hello(name):
...     # NEW: this makes ``.send()`` return!
...     await nice()
...     print('Hello, %s!' % (name,))
>>>
>>> task = hello('world')
>>> # NEW: call send twice!
>>> task.send(None)
>>> try:
...     task.send(None)
... except StopIteration:
...     pass
Hello, world!

Of course, our `nice()` object is pretty useless. Don't worry, we will do something more useful soon.

Looping

Our previous example called `.send()` exactly twice because it knew that `hello()` yields control exactly once. When we do not know (the common case), we need to call it in a loop.

>>> from types import coroutine
>>>
>>> @coroutine
... def nice():
...     yield
>>>
>>> async def hello(name):
...     # NEW: yield many times!
...     for i in range(5):
...         await nice()
...     print('Hello, %s!' % (name,))
>>>
>>> task = hello('world')
>>> try:
...     # NEW: loop!
...     while True:
...         task.send(None)
... except StopIteration:
...     pass
Hello, world!

We are gradually arriving at a minimal implementation of something resembling `asyncio.get_event_loop().run_until_complete()`. So let's make it syntactically more similar.

>>> from types import coroutine
>>>
>>> @coroutine
... def nice():
...     yield
>>>
>>> async def hello(name):
...     for i in range(5):
...         await nice()
...     print('Hello, %s!' % (name,))
>>>
>>> # NEW: now a reusable function!
>>> def run_until_complete(task):
...     try:
...         while True:
...             task.send(None)
...     except StopIteration:
...         pass
>>>
>>> # NEW: call it as a function!
>>> run_until_complete(hello('world'))
Hello, world!

Spawning child tasks

Now that we have an event loop capable of running a single task to completion, we probably want to start doing something useful with it. There are many things we could want, but since this is about concurrency, let's start by allowing the creation of child tasks.

The main thing we need here is a new primitive, `spawn()`, for scheduling a new child task. Once the task is scheduled, we want to return control to the parent task so it can keep moving.

Note: this example is intentionally incomplete. We will see later how to join tasks.

>>> from inspect import iscoroutine
>>> from types import coroutine
>>>
>>> @coroutine
... def nice():
...     yield
>>>
>>> # NEW: awaitable object that sends a request to launch a child task!
>>> @coroutine
... def spawn(task):
...     yield task
>>>
>>> async def hello(name):
...     await nice()
...     print('Hello, %s!' % (name,))
>>>
>>> # NEW: parent task!
>>> async def main():
...     # NEW: create a child task!
...     await spawn(hello('world'))
>>>
>>> def run_until_complete(task):
...     # NEW: schedule the "root" task.
...     tasks = [(task, None)]
...     while tasks:
...         # NEW: round-robin between a set of tasks (we may now have
...         # more than one and we'd like to be as "fair" as possible).
...         queue, tasks = tasks, []
...         for task, data in queue:
...             # NEW: resume the task *once*.
...             try:
...                 data = task.send(data)
...             except StopIteration:
...                 pass
...             except Exception as error:
...                 # NEW: prevent crashed task from ending the loop.
...                 print(error)
...             else:
...                 # NEW: schedule the child task.
...                 if iscoroutine(data):
...                     tasks.append((data, None))
...                 # NEW: reschedule the parent task.
...                 tasks.append((task, None))
>>>
>>> run_until_complete(main())
Hello, world!

Whoa! That is quite a bit more complex than our previous version of `run_until_complete()`. Where did all of this come from?

Well… now that we can run multiple tasks, the things we need to worry about include:

  1. waiting until all child tasks have completed (recursively), even when some tasks fail;
  2. alternating between tasks so that they all get to make progress concurrently.

Note that we now have a nested loop:

  • the outer loop is responsible for checking whether we are done;
  • the inner loop is responsible for handling one scheduler "tick".

There are other ways to do this, and it may not be obvious why we chose this one, but it matters because the event loop is still missing two key pieces: timers and I/O. When we add support for those later, we will also need to schedule the internal checks in a "fair" way. The outer loop gives us a convenient place to check timers and to poll the status of I/O operations:

  1. check timers, and resume sleeping tasks whose delay has elapsed;
  2. check I/O operations, and schedule tasks whose pending I/O has completed;
  3. run one scheduler "tick" to resume all the tasks we just scheduled.

In a nutshell, that is the gist of a coroutine-based scheduler loop.

However, before we move on to the more complex timers and I/O… remember when I mentioned that the example above was intentionally incomplete? We know how to spawn new child tasks, but we do not yet know how to wait for them to complete. This is a good opportunity to learn how to extend the vocabulary of "requests" that our awaitable objects send to the event loop.

>>> from collections import defaultdict
>>> from types import coroutine
>>>
>>> @coroutine
... def nice():
...     yield
>>>
>>> @coroutine
... def spawn(task):
...     # NEW: recover the child task handle to pass it back to the parent.
...     child = yield ('spawn', task)
...     return child
>>>
>>> # NEW: awaitable object that sends a request to be notified when a
>>> # concurrent task completes.
>>> @coroutine
... def join(task):
...     yield ('join', task)
>>>
>>> async def hello(name):
...     await nice()
...     print('Hello, %s!' % (name,))
>>>
>>> async def main():
...     # NEW: recover the child task handle.
...     child = await spawn(hello('world'))
...     # NEW: wait for the child task to complete.
...     await join(child)
...     print('(after join)')
>>>
>>> def run_until_complete(task):
...     tasks = [(task, None)]
...     # NEW: keep track of tasks to resume when a task completes.
...     watch = defaultdict(list)
...     while tasks:
...         queue, tasks = tasks, []
...         for task, data in queue:
...             try:
...                 data = task.send(data)
...             except StopIteration:
...                 # NEW: wake up tasks waiting on this one.
...                 tasks.extend((t, None) for t in watch.pop(task, []))
...             else:
...                 # NEW: dispatch request sent by awaitable object since
...                 # we now have 3 different types of requests.
...                 if data and data[0] == 'spawn':
...                     tasks.append((data[1], None))
...                     tasks.append((task, data[1]))
...                 elif data and data[0] == 'join':
...                     watch[data[1]].append(task)
...                 else:
...                     tasks.append((task, None))
>>>
>>> run_until_complete(main())
Hello, world!
(after join)

For practical reasons, we would probably want some kind of Task wrapper around coroutine objects. It would come in handy for exposing a cancellation API and for handling some race conditions, such as a child task completing before the parent attempts to `join()` it (can you spot the bug?).

Passing the child task's return value back as the result of `await join()`, and propagating the exception that crashed a child task, are left as exercises for the reader.
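For the curious, here is one possible sketch of the first exercise that also handles the finished-before-join race by remembering completed tasks and their results. The `finished` table and the overall design are mine, not from the original post:

```python
from collections import defaultdict
from types import coroutine

@coroutine
def spawn(task):
    child = yield ('spawn', task)
    return child

@coroutine
def join(task):
    # The scheduler sends the child's return value back to us.
    result = yield ('join', task)
    return result

async def child():
    return 42   # finishes immediately, before the parent joins

async def main():
    task = await spawn(child())
    result = await join(task)   # must not hang even though child is done
    print('child returned:', result)

def run_until_complete(task):
    tasks = [(task, None)]
    watch = defaultdict(list)
    finished = {}               # task -> return value
    while tasks:
        queue, tasks = tasks, []
        for task, data in queue:
            try:
                data = task.send(data)
            except StopIteration as stop:
                # Record the result and wake watchers with it.
                finished[task] = stop.value
                tasks.extend((t, stop.value) for t in watch.pop(task, []))
            else:
                if data and data[0] == 'spawn':
                    tasks.append((data[1], None))
                    tasks.append((task, data[1]))
                elif data and data[0] == 'join':
                    if data[1] in finished:
                        # Child already done: resume the parent right away.
                        tasks.append((task, finished[data[1]]))
                    else:
                        watch[data[1]].append(task)
                else:
                    tasks.append((task, None))

run_until_complete(main())
# prints: child returned: 42
```

Note that `finished` grows without bound here; a real implementation would prune entries once every joiner has been resumed.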

Sleeping & timers

Now that we are in control of task scheduling, we can start handling more advanced things like timers and I/O. I/O is the end goal, but it involves a lot of new material, so let's look at timers first.

If you need to sleep, you cannot just call `time.sleep()`: that would block all tasks, not only the one you want to pause.

You have probably spotted the pattern by now. We will add two things:

  1. a new request type;
  2. a piece of scheduling code based on the value returned by `task.send()`.

We will also add some bookkeeping to keep track of suspended tasks. Remember, `tasks` is the list of coroutines scheduled to run on the next tick, but a sleeping task may skip one or more ticks before it is ready to run again.

Keep in mind that sleeping tasks are unlikely to be rescheduled in first-in first-out order, so we need something a bit more evolved. The most practical way to maintain timers (until you allow cancelling them) is a priority queue, which the standard library's `heapq` module makes super easy.
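A quick look at the `heapq` behavior we rely on: the entry with the smallest deadline always sits at index 0 (the deadlines and labels below are made up for illustration):

```python
from heapq import heappush, heappop

timers = []
heappush(timers, (5.0, 'slow'))
heappush(timers, (1.0, 'soon'))
heappush(timers, (3.0, 'later'))

# The nearest deadline is always at the front of the heap.
print(timers[0])                               # prints: (1.0, 'soon')
print([heappop(timers)[1] for _ in range(3)])  # prints: ['soon', 'later', 'slow']

# Caveat: if the second element were a coroutine (as in the loop below),
# two entries with *equal* deadlines could not be compared; the toy loop
# simply ignores that edge case.
```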

>>> from heapq import heappop, heappush
>>> from time import sleep as _sleep
>>> from timeit import default_timer
>>> from types import coroutine
>>>
>>> # NEW: we need to keep track of elapsed time.
>>> clock = default_timer
>>>
>>> # NEW: request that the event loop reschedule us "later".
>>> @coroutine
... def sleep(seconds):
...     yield ('sleep', seconds)
>>>
>>> # NEW: verify elapsed time matches our request.
>>> async def hello(name):
...     ref = clock()
...     await sleep(3.0)
...     now = clock()
...     assert (now - ref) >= 3.0
...     print('Hello, %s!' % (name,))
>>>
>>> def run_until_complete(task):
...     tasks = [(task, None)]
...
...     # NEW: keep track of tasks that are sleeping.
...     timers = []
...
...     # NEW: watch out, all tasks might be suspended at the same time.
...     while tasks or timers:
...
...         # NEW: if we have nothing to do for now, don't spin.
...         if not tasks:
...             _sleep(max(0.0, timers[0][0] - clock()))
...
...         # NEW: schedule tasks when their timer has elapsed.
...         while timers and timers[0][0] < clock():
...             _, task = heappop(timers)
...             tasks.append((task, None))
...
...         queue, tasks = tasks, []
...         for task, data in queue:
...             try:
...                 data = task.send(data)
...             except StopIteration:
...                 pass
...             else:
...                 # NEW: set a timer and don't reschedule right away.
...                 if data and data[0] == 'sleep':
...                     heappush(timers, (clock() + data[1], task))
...                 else:
...                     tasks.append((task, None))
>>>
>>> run_until_complete(hello('world'))
Hello, world!

Whoa, we are really getting the hang of this! Maybe this async stuff is not so hard after all?

Let's see what we can do for I/O!

Handling I/O

Now that we have been through everything else, it is time for the final showdown: I/O.

Scalable I/O is usually implemented using native C APIs for I/O multiplexing. This is typically the hardest part of an I/O library, but thankfully Python's `selectors` module makes it very easy to implement.

As with all the other operations we have added so far, we will add some new I/O requests and the corresponding request handlers to the event loop. And, like timers, we will need to do some internal checks at the start of each scheduler tick.
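Before wiring it into the loop, here is the `selectors` API in isolation: register a socket with a piece of opaque user data, and `select()` reports it back once the socket becomes readable (the sockets and the `'resume-me'` data value here are illustrative; the toy loop stores the suspended task in that slot):

```python
from selectors import DefaultSelector, EVENT_READ
from socket import socketpair

lhs, rhs = socketpair()
selector = DefaultSelector()
# The third argument is opaque user data, returned as key.data.
selector.register(rhs, EVENT_READ, data='resume-me')

lhs.send(b'x')                             # makes rhs readable
events = selector.select(timeout=1.0)
key, mask = events[0]
print(key.data, bool(mask & EVENT_READ))   # prints: resume-me True

selector.unregister(rhs)
lhs.close()
rhs.close()
```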

>>> from selectors import (
...     DefaultSelector,
...     EVENT_READ,
...     EVENT_WRITE,
... )
>>> from socket import socketpair as _socketpair
>>> from types import coroutine
>>>
>>> # NEW: request that the event loop tell us when we can read.
>>> @coroutine
... def recv(stream, size):
...     yield (EVENT_READ, stream)
...     return stream.recv(size)
>>>
>>> # NEW: request that the event loop tell us when we can write.
>>> @coroutine
... def send(stream, data):
...     while data:
...         yield (EVENT_WRITE, stream)
...         size = stream.send(data)
...         data = data[size:]
>>>
>>> # NEW: connect sockets, make sure they never, ever block the loop.
>>> @coroutine
... def socketpair():
...     lhs, rhs = _socketpair()
...     lhs.setblocking(False)
...     rhs.setblocking(False)
...     yield
...     return lhs, rhs
>>>
>>> # NEW: send a message through the socket pair.
>>> async def hello(name):
...     lhs, rhs = await socketpair()
...     await send(lhs, 'Hello, world!'.encode('utf-8'))
...     data = await recv(rhs, 1024)
...     print(data.decode('utf-8'))
...     lhs.close()
...     rhs.close()
>>>
>>> def run_until_complete(task):
...     tasks = [(task, None)]
...
...     # NEW: prepare for I/O multiplexing.
...     selector = DefaultSelector()
...
...     # NEW: watch out, all tasks might be suspended at the same time.
...     while tasks or selector.get_map():
...
...         # NEW: poll I/O operation status and resume tasks when ready.
...         timeout = 0.0 if tasks else None
...         for key, events in selector.select(timeout):
...             tasks.append((key.data, None))
...             selector.unregister(key.fileobj)
...
...         queue, tasks = tasks, []
...         for task, data in queue:
...             try:
...                 data = task.send(data)
...             except StopIteration:
...                 pass
...             else:
...                 # NEW: register for I/O and suspend the task.
...                 if data and data[0] == EVENT_READ:
...                     selector.register(data[1], EVENT_READ, task)
...                 elif data and data[0] == EVENT_WRITE:
...                     selector.register(data[1], EVENT_WRITE, task)
...                 else:
...                     tasks.append((task, None))
>>>
>>> run_until_complete(hello('world'))
Hello, world!

Cancelling tasks

The last, and most important, piece of the puzzle is cancelling tasks. This is where we put the coroutine object's `.throw()` method to use.

Since cancellation involves a race condition (a task may complete concurrently with the attempt to cancel it), we need to keep track of all running tasks in order to know their "state".

Other than that, it is a straightforward extension of task spawning and joining.

Note: this implementation is intentionally incomplete. It does not correctly handle the possibility of cancelling a task that is already scheduled to run on the next tick.

>>> from collections import defaultdict
>>> from inspect import iscoroutine
>>> from types import coroutine
>>>
>>> @coroutine
... def spawn(task):
...     task = yield ('spawn', task)
...     return task
>>>
>>> @coroutine
... def join(task):
...     yield ('join', task)
>>>
>>> # NEW: exception to be raised inside tasks when they are cancelled.
>>> class CancelledError(Exception):
...     pass
>>>
>>> # NEW: request that CancelledError be raised inside the task.
>>> @coroutine
... def cancel(task):
...     cancelled = yield ('cancel', task, CancelledError())
...     assert cancelled is True
>>>
>>> # NEW: pause the task without plans to reschedule it (this is simply to
>>> # guarantee execution order in this demo).
>>> @coroutine
... def suspend():
...     yield ('suspend',)
>>>
>>> async def hello(name):
...     try:
...         await suspend()
...     except CancelledError:
...         print('Hello, %s!' % (name,))
...         raise
>>>
>>> # NEW: spawn a task and then cancel it.
>>> async def main():
...     child = await spawn(hello('world'))
...     await cancel(child)
...     await join(child)
>>>
>>> def run_until_complete(task):
...     tasks = [(task, None)]
...     watch = defaultdict(list)
...
...     # NEW: keep track of all tasks in the tree.
...     tree = {task}
...
...     while tasks:
...         queue, tasks = tasks, []
...         for task, data in queue:
...             try:
...                 # NEW: we may need to pass data or inject an exception.
...                 if isinstance(data, Exception):
...                     data = task.throw(data)
...                 else:
...                     data = task.send(data)
...             except (StopIteration, CancelledError):
...                 tasks.extend((t, None) for t in watch.pop(task, []))
...                 # NEW: update bookkeeping.
...                 tree.discard(task)
...             else:
...                 if data and data[0] == 'spawn':
...                     tasks.append((data[1], None))
...                     tasks.append((task, data[1]))
...                     # NEW: update bookkeeping.
...                     tree.add(data[1])
...                 elif data and data[0] == 'join':
...                     watch[data[1]].append(task)
...                 elif data and data[0] == 'cancel':
...                     # NEW: schedule to raise the exception in the task.
...                     if data[1] in tree:
...                         tasks.append((data[1], data[2]))
...                         tasks.append((task, True))
...                     else:
...                         tasks.append((task, False))
...                 elif data and data[0] == 'suspend':
...                     pass
...                 else:
...                     tasks.append((task, None))
>>>
>>> run_until_complete(main())
Hello, world!

License

This document is copyright Andre Caron <andre.l.caron@gmail.com> and is made available to you under the Creative Commons CC-BY-SA license.

Personal takeaways

An asynchronous framework generally consists of an event loop, an event queue, polling, and a timer queue; all asynchronous frameworks share this structure, asyncio included. The toy event loop in this article is no exception, and these are the parts to focus on when implementing one yourself.

The experiments in this article all show that a coroutine's `send()` and `throw()`, driven through `await`, behave very much like a generator's `send()`, `throw()`, and `yield`.

Personal experiments

For the code in the "Talking to the event loop" section, I added some extra output to observe what happens:

from types import coroutine


# NEW: this is an awaitable object!
@coroutine
def nice():
    print('before yield')
    yield
    print("after yield")


async def hello(name):
    # NEW: this makes ``.send()`` return!
    await nice()
    print('Hello, %s!' % (name,))


task = hello('world')

# NEW: call send twice!
task.send(None)
print("after first send")
try:
    task.send(None)
except StopIteration:
    pass
"""
before yield
after first send
after yield
Hello, world!
"""

From the output we can see that `task.send()` stops at the `yield` inside `nice()`. The `@coroutine` decorator makes the `nice()` function an awaitable object, which is what allows the `await nice()` call.

For the "Spawning child tasks" section, notice that the `main` coroutine finishes earlier than the `hello` coroutine.

from inspect import iscoroutine
from types import coroutine


@coroutine
def nice():
    print("before nice")
    yield
    print("after nice")


# NEW: awaitable object that sends a request to launch a child task!
@coroutine
def spawn(task):
    print("before spawn")
    yield task
    print("after spawn")


async def hello(name):
    print("hello: before await")
    await nice()
    print('Hello, %s!' % (name,))


# NEW: parent task!
async def main():
    print("main: before")
    # NEW: create a child task!
    await spawn(hello('world'))
    print("main: after")


def run_until_complete(task):
    # NEW: schedule the "root" task.
    tasks = [(task, None)]
    while tasks:
        # NEW: round-robin between a set of tasks (we may now have more
        # than one and we'd like to be as "fair" as possible).
        queue, tasks = tasks, []
        for task, data in queue:
            # NEW: resume the task *once*.
            try:
                data = task.send(data)
            except StopIteration:
                pass
            except Exception as error:
                # NEW: prevent crashed task from ending the loop.
                print(error)
            else:
                # NEW: schedule the child task.
                if iscoroutine(data):
                    tasks.append((data, None))
                # NEW: reschedule the parent task.
                tasks.append((task, None))


run_until_complete(main())
"""
main: before
before spawn
hello: before await
before nice
after spawn
main: after
after nice
Hello, world!
"""

Waiting for a child task with join:

from collections import defaultdict
from types import coroutine


@coroutine
def nice():
    print("nice: before yield")
    yield
    print("nice: after yield")


@coroutine
def spawn(task):
    # NEW: recover the child task handle to pass it back to the parent.
    print("spawn: before yield")
    child = yield ('spawn', task)
    print("spawn: after yield")
    return child


# NEW: awaitable object that sends a request to be notified when a
# concurrent task completes.
@coroutine
def join(task):
    print("join: before yield")
    yield ('join', task)
    print("join: after yield")


async def hello(name):
    print("hello: before nice()")
    await nice()
    print('Hello, %s!' % (name,))


async def main():
    # NEW: recover the child task handle.
    child = await spawn(hello('world'))
    print("main: after spawn and before join()")
    # NEW: wait for the child task to complete.
    await join(child)
    print('(after join)')


def run_until_complete(task):
    tasks = [(task, None)]
    # NEW: keep track of tasks to resume when a task completes.
    watch = defaultdict(list)
    while tasks:
        queue, tasks = tasks, []
        for task, data in queue:
            try:
                data = task.send(data)
            except StopIteration:
                # NEW: wake up tasks waiting on this one.
                # When a child coroutine exits, resume any parent blocked
                # in join(); watch.pop returns [] when nobody is waiting.
                tasks.extend((t, None) for t in watch.pop(task, []))
            else:
                # NEW: dispatch the request sent by the awaitable object,
                # since we now have 3 different types of requests.
                if data and data[0] == 'spawn':
                    # Schedule the new child coroutine...
                    tasks.append((data[1], None))
                    # ...and reschedule the parent with the child handle,
                    # so that ``child = yield ...`` inside spawn() receives
                    # it on the next task.send(data).
                    tasks.append((task, data[1]))
                elif data and data[0] == 'join':
                    # The parent got here before the child finished:
                    # record that it is waiting on the child.
                    watch[data[1]].append(task)
                else:
                    # Ordinary suspension: just reschedule the task.
                    tasks.append((task, None))


run_until_complete(main())

"""
spawn: before yield
hello: before nice()
nice: before yield
spawn: after yield
main: after spawn and before join()
--- main has advanced as far as it can; join waits for hello/nice to finish
join: before yield
nice: after yield
--- nice finished
Hello, world!
--- hello finished
join: after yield
--- join resumed
(after join)
"""

Two things were added:

  • new request types:
    • spawn
    • join
  • scheduling code based on the value returned by `task.send()`.

For the [Sleeping & timers] section:

from heapq import heappop, heappush
from time import sleep as _sleep
from timeit import default_timer
from types import coroutine

# NEW: we need to keep track of elapsed time.
clock = default_timer

# NEW: request that the event loop reschedule us "later".
@coroutine
def sleep(seconds):
    yield ('sleep', seconds)

# NEW: verify elapsed time matches our request.
async def hello(name):
    ref = clock()
    await sleep(3.0)
    now = clock()
    assert (now - ref) >= 3.0
    print('Hello, %s!' % (name,))

def run_until_complete(task):
    tasks = [(task, None)]

    # NEW: keep track of tasks that are sleeping.
    timers = []

    # NEW: watch out, all tasks might be suspended at the same time.
    while tasks or timers:

        # NEW: if we have nothing to do for now, don't spin.
        if not tasks:
            _sleep(max(0.0, timers[0][0] - clock()))

        # NEW: schedule tasks when their timer has elapsed.
        # If the nearest deadline in the timer queue has passed,
        # pop those tasks and make them runnable again.
        while timers and timers[0][0] < clock():
            _, task = heappop(timers)
            tasks.append((task, None))

        queue, tasks = tasks, []
        for task, data in queue:
            try:
                data = task.send(data)
            except StopIteration:
                pass
            else:
                # NEW: set a timer and don't reschedule right away.
                if data and data[0] == 'sleep':
                    # Push onto the heap; the entry with the nearest
                    # deadline always sits on top (min-heap).
                    heappush(timers, (clock() + data[1], task))
                else:
                    # Reschedule the task for the next tick.
                    tasks.append((task, None))

run_until_complete(hello('world'))
For the [Handling I/O] section:

from selectors import (
    DefaultSelector,
    EVENT_READ,
    EVENT_WRITE,
)
from socket import socketpair as _socketpair
from types import coroutine


# NEW: request that the event loop tell us when we can read.
@coroutine
def recv(stream, size):
    yield (EVENT_READ, stream)
    return stream.recv(size)


# NEW: request that the event loop tell us when we can write.
@coroutine
def send(stream, data):
    while data:
        yield (EVENT_WRITE, stream)
        size = stream.send(data)
        data = data[size:]


# NEW: connect sockets, make sure they never, ever block the loop.
@coroutine
def socketpair():
    lhs, rhs = _socketpair()
    lhs.setblocking(False)
    rhs.setblocking(False)
    yield
    return lhs, rhs


# NEW: send a message through the socket pair.
async def hello(name):
    lhs, rhs = await socketpair()
    await send(lhs, 'Hello, world!'.encode('utf-8'))
    data = await recv(rhs, 1024)
    print(data.decode('utf-8'))
    lhs.close()
    rhs.close()


def run_until_complete(task):
    tasks = [(task, None)]

    # NEW: prepare for I/O multiplexing.
    selector = DefaultSelector()

    # NEW: watch out, all tasks might be suspended at the same time.
    while tasks or selector.get_map():

        # NEW: poll I/O operation status and resume tasks when ready.
        timeout = 0.0 if tasks else None
        for key, events in selector.select(timeout):
            tasks.append((key.data, None))
            selector.unregister(key.fileobj)

        queue, tasks = tasks, []
        for task, data in queue:
            try:
                data = task.send(data)
            except StopIteration:
                pass
            else:
                # NEW: register for I/O and suspend the task.
                if data and data[0] == EVENT_READ:
                    selector.register(data[1], EVENT_READ, task)
                elif data and data[0] == EVENT_WRITE:
                    selector.register(data[1], EVENT_WRITE, task)
                else:
                    tasks.append((task, None))


run_until_complete(hello('world'))

Q: What is socketpair?

  • `socket.socketpair()` simply returns two already-connected socket objects; its parameters work the same way as those of `socket.socket()`.
  • You can think of it as creating two sockets, one playing the role of a server socket and one of a client socket, that are already in the connected state.
  • It is full duplex, i.e. each socket can both send and receive: server.send → client.recv and client.send → server.recv.
  • By default it creates Unix domain sockets.
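These properties are easy to check directly:

```python
import socket

lhs, rhs = socket.socketpair()   # two already-connected sockets
lhs.sendall(b'ping')
print(rhs.recv(4))               # prints: b'ping'
rhs.sendall(b'pong')             # full duplex: either side can send
print(lhs.recv(4))               # prints: b'pong'
lhs.close()
rhs.close()
```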

Summary:

  • Calling a function defined with `async def` produces a coroutine object.
  • A generator function decorated with `@types.coroutine` becomes an awaitable function.

Author: Mrli

Link: https://nymrli.top/2022/05/08/toy-reimplementation-of-an-event-loop-in-Python-翻译/

Copyright: All articles in this blog are licensed under CC BY-NC-SA 3.0 unless stating additionally.
