from types import coroutine

# NEW: this is an awaitable object!
@coroutine
def nice():
    print('before yield')
    yield
    print("after yield")
async def hello(name):
    # NEW: this makes ``.send()`` return!
    await nice()
    print('Hello, %s!' % (name,))
task = hello('world')
# NEW: call send twice!
task.send(None)
print("after first send")
try:
    task.send(None)
except StopIteration:
    pass

"""
before yield
after first send
after yield
Hello, world!
"""
def run_until_complete(task):
    # NEW: schedule the "root" task.
    tasks = [(task, None)]
    while tasks:
        # NEW: round-robin between a set of tasks (we may now have more than
        # one and we'd like to be as "fair" as possible).
        queue, tasks = tasks, []
        for task, data in queue:
            # NEW: resume the task *once*.
            try:
                data = task.send(data)
            except StopIteration:
                pass
            except Exception as error:
                # NEW: prevent a crashed task from ending the loop.
                print(error)
            else:
                # NEW: schedule the child task.
                if iscoroutine(data):
                    tasks.append((data, None))
                # NEW: reschedule the parent task.
                tasks.append((task, None))
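The call and transcript below assume `iscoroutine` from the `inspect` module and definitions roughly like the following; the print messages are reconstructed from the transcript, so treat this as a sketch rather than the exact original code.

from inspect import iscoroutine
from types import coroutine

@coroutine
def nice():
    print('before nice')
    yield
    print('after nice')

@coroutine
def spawn(task):
    # Yield the child coroutine so the loop can schedule it.
    print('before spawn')
    yield task
    print('after spawn')

async def hello(name):
    print('hello: before await')
    await nice()
    print('Hello, %s!' % (name,))

async def main():
    print('main: before')
    await spawn(hello('world'))
    print('main: after')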
run_until_complete(main())

"""
main: before
before spawn
hello: before await
before nice
after spawn
main: after
after nice
Hello, world!
"""
from collections import defaultdict
from types import coroutine
@coroutine
def nice():
    print("nice: before yield")
    yield
    print("nice: after yield")
@coroutine
def spawn(task):
    # NEW: recover the child task handle to pass it back to the parent.
    print("spawn: before yield")
    child = yield ('spawn', task)
    print("spawn: after yield")
    return child
# NEW: awaitable object that sends a request to be notified when a
# concurrent task completes.
@coroutine
def join(task):
    print("join: before yield")
    yield ('join', task)
    print("join: after yield")
async def hello(name):
    print("hello: before nice()")
    await nice()
    print('Hello, %s!' % (name,))
async def main():
    # NEW: recover the child task handle.
    child = await spawn(hello('world'))
    print("main: after spawn and before join()")
    # NEW: wait for the child task to complete.
    await join(child)
    print('(after join)')
def run_until_complete(task):
    tasks = [(task, None)]
    # NEW: keep track of tasks to resume when a task completes.
    watch = defaultdict(list)
    while tasks:
        queue, tasks = tasks, []
        for task, data in queue:
            try:
                data = task.send(data)
            except StopIteration:
                # NEW: wake up tasks waiting on this one; ``pop`` returns
                # ``[]`` when nobody is waiting.  This is how a finished
                # child unblocks a parent suspended in ``join``.
                tasks.extend((t, None) for t in watch.pop(task, []))
            else:
                # NEW: dispatch the request sent by the awaitable object,
                # since we now have three different types of requests.
                if data and data[0] == 'spawn':
                    # Schedule the new child task...
                    tasks.append((data[1], None))
                    # ...and resume the parent with the child handle, so the
                    # parent's ``child = yield ...`` evaluates to that handle.
                    tasks.append((task, data[1]))
                elif data and data[0] == 'join':
                    # The parent got here before the child finished: record
                    # who is waiting on whom.
                    watch[data[1]].append(task)
                else:
                    # Plain yield: simply reschedule the task.
                    tasks.append((task, None))
run_until_complete(main())
""" spawn: before yield hello: before nice() nice: before yield spawn: after yield main: after spawn and before join() --- 此时main执行完成, join等待nice()被执行 join: before yield nice: after yield --- nice执行完成 Hello, world! --- hello执行完成 join: after yield --- join结束 (after join) """
# NEW: keep track of tasks that are sleeping.
timers = []
# NEW: watch out, all tasks might be suspended at the same time.
while tasks or timers:
# NEW: if we have nothing to do for now, don't spin.
if not tasks:
    _sleep(max(0.0, timers[0][0] - clock()))
# NEW: schedule tasks when their timer has elapsed (pop every entry in the
# timer heap whose deadline has already passed).
while timers and timers[0][0] < clock():
    _, task = heappop(timers)
    tasks.append((task, None))
queue, tasks = tasks, []
for task, data in queue:
    try:
        data = task.send(data)
    except StopIteration:
        pass
    else:
        # NEW: set a timer and don't reschedule right away.
        if data and data[0] == 'sleep':
            # Push onto the heap; the entry with the nearest deadline
            # stays on top (it's a min-heap).
            heappush(timers, (clock() + data[1], task))
        else:
            # Otherwise reschedule the task for the next round.
            tasks.append((task, None))
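The timer fragments above rely on a few names that are not shown in this excerpt. A minimal sketch of what they could be, assuming `clock` is `time.monotonic`, `_sleep` is `time.sleep`, and `sleep` is an awaitable that sends a `('sleep', seconds)` request to the loop:

from heapq import heappop, heappush
from time import monotonic as clock, sleep as _sleep
from types import coroutine

# Awaitable that asks the loop to resume us after ``seconds`` have elapsed.
@coroutine
def sleep(seconds):
    yield ('sleep', seconds)

async def hello(name):
    await sleep(1.0)
    print('Hello, %s!' % (name,))

# Assumes the fragments above are folded back into run_until_complete().
run_until_complete(hello('world'))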
from selectors import (
    DefaultSelector,
    EVENT_READ,
    EVENT_WRITE,
)
from socket import socketpair as _socketpair
from types import coroutine
# NEW: request that the event loop tell us when we can read.
@coroutine
def recv(stream, size):
    yield (EVENT_READ, stream)
    return stream.recv(size)
# NEW: request that the event loop tell us when we can write.
@coroutine
def send(stream, data):
    while data:
        yield (EVENT_WRITE, stream)
        size = stream.send(data)
        data = data[size:]
# NEW: connect sockets, make sure they never, ever block the loop.
@coroutine
def socketpair():
    lhs, rhs = _socketpair()
    lhs.setblocking(False)
    rhs.setblocking(False)
    yield
    return lhs, rhs
# NEW: send a message through the socket pair.
async def hello(name):
    lhs, rhs = await socketpair()
    await send(lhs, 'Hello, world!'.encode('utf-8'))
    data = await recv(rhs, 1024)
    print(data.decode('utf-8'))
    lhs.close()
    rhs.close()
# NEW: prepare for I/O multiplexing.
selector = DefaultSelector()
# NEW: watch out, all tasks might be suspended at the same time.
while tasks or selector.get_map():
# NEW: poll I/O operation status and resume tasks when ready.
timeout = 0.0 if tasks else None
for key, events in selector.select(timeout):
    tasks.append((key.data, None))
    selector.unregister(key.fileobj)
queue, tasks = tasks, []
for task, data in queue:
    try:
        data = task.send(data)
    except StopIteration:
        pass
    else:
        # NEW: register for I/O and suspend the task.
        if data and data[0] == EVENT_READ:
            selector.register(data[1], EVENT_READ, task)
        elif data and data[0] == EVENT_WRITE:
            selector.register(data[1], EVENT_WRITE, task)
        else:
            tasks.append((task, None))
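With these fragments folded back into run_until_complete() (same scaffolding as the earlier versions), driving the socket-pair example would look roughly like this; the single line of output is what hello() prints after the round trip:

run_until_complete(hello('world'))

"""
Hello, world!
"""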