教程 ¶
欢迎来到 Trio 教程!Trio 是一个现代 Python 库,用于编写异步应用程序——也就是说,想要同时进行多项操作、并行化 I/O 的程序,比如一个并行抓取大量网页的网页蜘蛛,或者处理大量同时下载的网页服务器……诸如此类。在这里,我们将尝试为您提供一个关于使用 Trio 进行异步编程的温和介绍。
我们假设您对 Python 有基本的了解,但请放心——我们并不假设您了解任何关于异步编程或 Python 的新 async/await
特性。
与许多 async/await
教程不同,我们假设你的目标是使用 Trio 编写有趣的程序,因此我们不会深入探讨 async/await
在 Python 解释器内部是如何实现的细节。关于“协程”这个词也从未提及。事实上,除非你想实现像 Trio 这样的库,否则你真的不需要了解这些内容,所以我们将其省略(尽管我们会提供一些链接供那些想深入了解的人参考)。
好的,准备好了吗?我们开始吧。
在开始之前 ¶
确保您使用的是 Python 3.8 或更高版本。
```python3 -m pip install --upgrade trio
(或者在 Windows 上可能是py -3 -m pip install --upgrade trio
- 详细信息) ```
你能import trio
吗?如果可以的话,那你就没问题了!
如果你迷路或感到困惑…
那么,我们想知道!我们有一个友好的聊天频道,您可以在 StackOverflow 上使用“python-trio”标签提问,或者直接提交一个 bug(如果我们的文档让人困惑,那是我们的问题,我们希望修复它!)。
异步函数 ¶
Python 3.5 增加了一个重大新特性:异步函数。使用 Trio 的全部精髓就在于编写异步函数,所以让我们从这里开始。
异步函数的定义与普通函数类似,只是你用 async def
代替 def
来写:
# A regular function
def regular_double(x):
return 2 * x
# An async function
async def async_double(x):
return 2 * x
“Async”是“异步”的缩写;我们有时会将像 regular_double
这样的常规函数称为“同步函数”,以区分它们与异步函数。
从用户的角度来看,异步函数和普通函数之间有两个区别:
""" 调用异步函数时,必须使用await
关键字。所以,你不需要写regular_double(3)
,而是写await async_double(3)
。 """
您不能在常规函数体内使用await
关键字。如果您尝试这样做,将会出现语法错误:def print_double(x): print(await async_double(x)) # <-- SyntaxError here
但异步函数内部,await
是允许的:async def print_double(x): print(await async_double(x)) # <-- OK!
现在,让我们来考虑一下这里的后果:如果你需要 await
来调用异步函数,而只有异步函数才能使用 await
……下面是一个小表格:
|
|
它会发生吗? |
---|---|---|
同步 |
同步 |
✓ |
同步 |
```plaintext async ``` |
NOPE |
```plaintext async ``` |
同步 |
✓ |
```plaintext async ``` |
```plaintext async ``` |
✓ |
总结来说:作为用户,异步函数相较于普通函数的优势在于,异步函数拥有一种超能力:它们可以调用其他异步函数。
这立刻引发两个问题:如何,以及为什么?具体来说:
当您的 Python 程序启动时,它正在运行普通的同步代码。因此,存在一个鸡生蛋的问题:一旦我们运行了异步函数,我们就可以调用其他异步函数,但我们是怎样调用第一个异步函数的呢?
如果写异步函数的唯一理由是它可以调用其他异步函数,那我们为什么还要使用它们呢?我的意思是,作为超级能力来说,这似乎有点毫无意义。难道不是更简单直接的方法就是……根本不用任何异步函数吗?
这里就是异步库如 Trio 发挥作用的地方。它提供了两个功能:
一个运行函数,它是一种特殊的同步函数,用于接收并调用异步函数。在 Trio 中,这表示为trio.run
:
这样回答了“怎么”这部分。
一串实用的异步函数——特别是用于 I/O 操作的函数。这样也就回答了“为什么”:这些函数是异步的,而且很有用,所以如果你想使用它们,就必须编写异步代码。如果你觉得跟踪这些async
和await
东西很烦人,那也没办法——你在这方面没有选择!好吧,好吧,你也可以选择不使用 Trio。这也是一个合法的选择。但后来我们会讨论到,async/await
的东西实际上是一件好事,原因稍后我们会稍微讨论一下。
这里有一个示例函数,它使用了trio.sleep()
。(trio.sleep()
类似于time.sleep()
,但更具有异步性。)import trio async def double_sleep(x): await trio.sleep(2 * x) trio.run(double_sleep, 3) # does nothing for 6 seconds then returns
结果证明,我们的 async_double
函数实际上是一个糟糕的例子。我的意思是,它运行正常,没问题,但它是多余的:它完全可以写成一个普通的函数,那样会更有用。 double_sleep
则是一个更典型的例子:我们必须将其改为异步,因为它调用了另一个异步函数。最终结果是类似于异步三明治的结构,两边是 Trio,中间是我们的代码:
trio.run -> double_sleep -> trio.sleep
这种“三明治”结构在异步代码中很典型;总的来说,它看起来是这样的:
trio.run -> [async function] -> ... -> [async function] -> trio.whatever
在 trio.run()
和 trio.whatever
之间的路径上,正是那些函数需要是异步的。Trio 提供了异步的面包,然后你的代码填充了异步三明治的美味异步馅料。其他函数(例如你在过程中调用的辅助函数)通常应该是常规的、非异步函数。
警告:别忘了 await
!✂️
现在正是打开 Python 提示符,尝试编写简单的异步函数并用 trio.run
运行的好时机。
在某个阶段,你可能会在代码中写出类似这样的内容,尝试调用一个异步函数,但遗漏了 await
:
import time
import trio
async def broken_double_sleep(x):
print("*yawn* Going to sleep")
start_time = time.perf_counter()
# Whoops, we forgot the 'await'!
trio.sleep(2 * x)
sleep_time = time.perf_counter() - start_time
print(f"Woke up after {sleep_time:.2f} seconds, feeling well rested!")
trio.run(broken_double_sleep, 3)
您可能会认为 Python 在这里会抛出一个错误,就像它在我们调用函数时犯其他错误时那样。比如,如果我们忘记传递 trio.sleep()
所需的参数,那么我们会得到一个友好的 TypeError
提示。但不幸的是,如果您忘记了一个 await
,您不会得到这样的提示。实际上,您会得到的是:
>>> trio.run(broken_double_sleep, 3)
*yawn* Going to sleep
Woke up after 0.00 seconds, feeling well rested!
__main__:4: RuntimeWarning: coroutine 'sleep' was never awaited
>>>
这显然是出了问题——0.00 秒的时间根本不足以感到休息充分!然而代码却表现得像是成功了——没有抛出任何异常。唯一的线索是它打印出了 RuntimeWarning: coroutine 'sleep' was never awaited
。而且,警告信息打印的确切位置可能会变化,因为这取决于垃圾回收器的随意性。如果你使用 PyPy,可能直到下一次垃圾回收运行之前都不会收到任何警告:
# On PyPy:
>>> trio.run(broken_double_sleep, 3)
*yawn* Going to sleep
Woke up after 0.00 seconds, feeling well rested!
>>> # what the ... ?? not even a warning!
>>> # but forcing a garbage collection gives us a warning:
>>> import gc
>>> gc.collect()
/home/njs/pypy-3.8-nightly/lib-python/3/importlib/_bootstrap.py:191: RuntimeWarning: coroutine 'sleep' was never awaited
if _module_locks.get(name) is wr: # XXX PyPy fix?
0
(如果您看不到上面的警告,请尝试向右滚动。)
忘记一个 await
就像这样是一个非常常见的错误。你肯定会搞砸的。每个人都会犯这个错误。而且 Python 不会像你希望的那样帮助你 😞。关键是要记住:如果你看到这些魔法词 RuntimeWarning: coroutine '...' was never
awaited
,那么这总是意味着你在某个地方遗漏了 await
,你应该忽略所有其他错误信息,先去修复这个问题,因为其他的问题很可能只是附带损害。我甚至不确定 PyPy 输出中的所有那些垃圾是什么。幸运的是,我并不需要知道,我只需要修复我的函数!
(“我以为你说你不会提到协程!”好吧,是的,我没有提到协程,是 Python 提到了。去找 Guido 理论吧!但说真的,这不幸是一个内部实现细节稍微泄露出来的地方。)
为什么会出现这种情况?在 Trio 中,每次我们使用 await
都是调用异步函数,每次调用异步函数我们都会使用 await
。但是 Python 试图为其他组织得稍微差一点的库保留选择。所以,尽管对我们来说可以将 await trio.sleep(...)
视为一种单一的语法,Python 却将其视为两件事:首先是一个返回这种奇怪的“协程”对象的函数调用:
>>> trio.sleep(3)
<coroutine object sleep at 0x7f5ac77be6d0>
然后,那个对象会被传递给 await
,它实际上会运行这个函数。所以如果你忘记了 await
,那么会发生两件不好的事情:你的函数实际上没有被调用,你得到了一个“协程”对象,而你可能期望的是其他东西,比如一个数字:
>>> async_double(3) + 1
TypeError: unsupported operand type(s) for +: 'coroutine' and 'int'
如果你还没有自然地搞砸它,那就故意试试:尝试写一些代码,漏掉一个 await
,或者多一个 await
,看看会发生什么。这样,当你真正遇到这种情况时,你就有准备了。
记住:当心 RuntimeWarning: coroutine '...' was
never awaited
;这意味着你需要找到并修复你的缺失 await
。
好的,让我们先看看点酷的东西吧
现在我们开始使用 Trio 了,但到目前为止,我们只会写打印信息和睡眠不同时间的函数。这倒也足够有趣,但用 time.sleep()
也能做到。 async/await
真的是毫无用处!
嗯,其实并非如此。Trio 还有一招藏在袖子里,这使得异步函数比普通函数更强大:它可以同时运行多个异步函数。以下是一个例子:
1# tasks-intro.py
2
3import trio
4
5
6async def child1():
7 print(" child1: started! sleeping now...")
8 await trio.sleep(1)
9 print(" child1: exiting!")
10
11
12async def child2():
13 print(" child2: started! sleeping now...")
14 await trio.sleep(1)
15 print(" child2: exiting!")
16
17
18async def parent():
19 print("parent: started!")
20 async with trio.open_nursery() as nursery:
21 print("parent: spawning child1...")
22 nursery.start_soon(child1)
23
24 print("parent: spawning child2...")
25 nursery.start_soon(child2)
26
27 print("parent: waiting for children to finish...")
28 # -- we exit the nursery block here --
29 print("parent: all done!")
30
31
32trio.run(parent)
这里发生了很多事情,所以我们一步一步来。在第一部分,我们定义了两个异步函数 child1
和 child2
。这些函数在上一个部分应该看起来很熟悉:
6async def child1():
7 print(" child1: started! sleeping now...")
8 await trio.sleep(1)
9 print(" child1: exiting!")
10
11
12async def child2():
13 print(" child2: started! sleeping now...")
14 await trio.sleep(1)
15 print(" child2: exiting!")
然后,我们定义 parent
为一个异步函数,该函数将并发调用 child1
和 child2
:
18async def parent():
19 print("parent: started!")
20 async with trio.open_nursery() as nursery:
21 print("parent: spawning child1...")
22 nursery.start_soon(child1)
23
24 print("parent: spawning child2...")
25 nursery.start_soon(child2)
26
27 print("parent: waiting for children to finish...")
28 # -- we exit the nursery block here --
29 print("parent: all done!")
通过使用神秘的 async with
语句创建一个“苗圃”,然后“孵化” child1
和 child2
进入苗圃。
让我们从这个 async with
东西开始。这实际上很简单。在常规 Python 中,像 with someobj: ...
这样的语句指示解释器在块的开始处调用 someobj.__enter__()
,并在块的末尾调用 someobj.__exit__()
。我们称 someobj
为“上下文管理器”。 async
with
做的是完全相同的事情,只不过它调用的是异步方法,而不是常规方法:在块的开始处执行 await
someobj.__aenter__()
,在块的末尾执行 await
someobj.__aexit__()
。在这种情况下,我们称 someobj
为“异步上下文管理器”。所以简而言之: with
块是调用某些函数的简写,由于 Python 现在有了两种函数,即异步/await,它也需要两种类型的 with
块。就是这样!如果你理解了异步函数,那么你就理解了 async with
。
注意
这个例子没有使用它们,但既然我们在这里,不妨也提一下 async/await 添加的另一个新语法: async for
。它基本上与 async
with
和 with
的思路相同:一个 async for
循环就像一个 for
循环,只不过在 for
循环中是用来获取下一个项目的地方, async for
则是用来 await async_iterator.__anext__()
的。现在你该明白 async/await 了。基本上,只需记住它涉及到做三明治,并且把“async”这个词加在所有东西前面,你就能做得很好。
现在我们已经理解了 async with
,让我们再次看看 parent
:
18async def parent():
19 print("parent: started!")
20 async with trio.open_nursery() as nursery:
21 print("parent: spawning child1...")
22 nursery.start_soon(child1)
23
24 print("parent: spawning child2...")
25 nursery.start_soon(child2)
26
27 print("parent: waiting for children to finish...")
28 # -- we exit the nursery block here --
29 print("parent: all done!")
"""
这里真正起作用的只有 4 行代码。在第 20 行,我们使用 trio.open_nursery()
获取一个“托儿所”对象,然后在 async with
块内部,我们在第 22 行和第 25 行调用了 nursery.start_soon
两次。实际上有两种方式来调用异步函数:第一种就是我们之前看到的方式,使用 await
async_fn()
;新的方式是 nursery.start_soon(async_fn)
:它让 Trio 开始运行这个异步函数,但随后立即返回,不等待函数完成。所以在我们对 nursery.start_soon
、 child1
和 child2
的两次调用之后,它们现在都在后台运行。然后在第 28 行,注释行,我们到达了 async with
块的末尾,托儿所的 __aexit__
函数开始运行。这样做的作用是强制 parent
在这里停止,等待所有托儿所中的子进程退出。这就是为什么你必须使用 async with
来获取托儿所的原因:它为我们提供了一种确保子进程调用不会逃逸并丢失的方法。这之所以重要,一个原因是如果子进程中存在错误或其他问题,并引发异常,那么它允许我们将这个异常传播到父进程中;在许多其他框架中,这样的异常会被丢弃。Trio 永远不会丢弃异常。
"""
好的!让我们试运行一下,看看结果如何:
parent: started!
parent: spawning child1...
parent: spawning child2...
parent: waiting for children to finish...
child2: started! sleeping now...
child1: started! sleeping now...
[... 1 second passes ...]
child1: exiting!
child2: exiting!
parent: all done!
"""
(您的输出中“开始”和/或“退出”的顺序可能与我的不同。)
"""
注意, child1
和 child2
都是一起开始,然后又一起结束的。而且,尽管我们调用了两次 trio.sleep(1)
,但程序总共只用了 1 秒钟就完成了。所以看起来 child1
和 child2
真的是在同时运行!
现在,如果你熟悉使用线程进行编程,这可能会让你感到熟悉——这是故意的。但重要的是要意识到这里没有线程。所有这些都是在单个线程中发生的。为了提醒自己这一点,我们使用了略微不同的术语:我们不说“创建了两个线程”,而是说“创建了两个任务”。任务和线程之间有两个区别:(1)许多任务可以在单个线程上轮流运行,并且(2)在多线程中,Python 解释器/操作系统可以随时切换正在运行的线程;而在任务中,我们只能在某些称为“检查点”的指定位置切换。在下一节中,我们将深入探讨这意味着什么。
任务切换示例
异步/await 基础库,如 Trio,背后的主要思想是通过在适当的位置切换任务,在单个线程上同时运行大量任务——例如,如果我们正在实现一个 Web 服务器,那么一个任务可以同时发送 HTTP 响应,而另一个任务正在等待新的连接。如果你只想使用 Trio,那么你不需要了解所有关于这种切换的细节——但了解 Trio 在代码执行时“幕后”做了什么是非常有用的。为了帮助建立这种直觉,让我们更仔细地看看 Trio 是如何运行上一节中我们的示例的。
幸运的是,Trio 提供了一套丰富的工具来检查和调试你的程序。在这里,我们想要观察 trio.run()
的工作,我们可以通过编写一个名为 Tracer
的类来实现,该类实现了 Trio 的 Instrument
接口。它的任务是记录各种事件的发生:
class Tracer(trio.abc.Instrument):
def before_run(self):
print("!!! run started")
def _print_with_task(self, msg, task):
# repr(task) is perhaps more useful than task.name in general,
# but in context of a tutorial the extra noise is unhelpful.
print(f"{msg}: {task.name}")
def task_spawned(self, task):
self._print_with_task("### new task spawned", task)
def task_scheduled(self, task):
self._print_with_task("### task scheduled", task)
def before_task_step(self, task):
self._print_with_task(">>> about to run one step of task", task)
def after_task_step(self, task):
self._print_with_task("<<< task step finished", task)
def task_exited(self, task):
self._print_with_task("### task exited", task)
def before_io_wait(self, timeout):
if timeout:
print(f"### waiting for I/O for up to {timeout} seconds")
else:
print("### doing a quick check for I/O")
self._sleep_time = trio.current_time()
def after_io_wait(self, timeout):
duration = trio.current_time() - self._sleep_time
print(f"### finished I/O check (took {duration} seconds)")
def after_run(self):
print("!!! run finished")
然后,我们重新运行上一节中的示例程序,但这次我们传递了一个对象给 trio.run()
和 Tracer
:
trio.run(parent, instruments=[Tracer()])
这会产生大量输出,所以我们将一步一步地进行。
首先,当 Trio 准备运行我们的代码时,有一点点嘈杂声。现在对于我们来说,这些嘈杂声大多无关紧要,但在其中你可以看到 Trio 为 __main__.parent
函数创建了一个任务,并且“安排”了它(即,记下它应该很快运行):
$ python3 tutorial/tasks-with-trace.py
!!! run started
### new task spawned: <init>
### task scheduled: <init>
### doing a quick check for I/O
### finished I/O check (took 1.1122087016701698e-05 seconds)
>>> about to run one step of task: <init>
### new task spawned: <call soon task>
### task scheduled: <call soon task>
### new task spawned: __main__.parent
### task scheduled: __main__.parent
<<< task step finished: <init>
### doing a quick check for I/O
### finished I/O check (took 6.4980704337358475e-06 seconds)
一旦完成初步的整理工作,Trio 就开始运行 parent
函数,你可以看到 parent
创建了两个子任务。然后它到达了 async with
块的末尾,并暂停了:
>>> about to run one step of task: __main__.parent
parent: started!
parent: spawning child1...
### new task spawned: __main__.child1
### task scheduled: __main__.child1
parent: spawning child2...
### new task spawned: __main__.child2
### task scheduled: __main__.child2
parent: waiting for children to finish...
<<< task step finished: __main__.parent
控制权随后返回到 trio.run()
,它记录了更多内部对话:
>>> about to run one step of task: <call soon task>
<<< task step finished: <call soon task>
### doing a quick check for I/O
### finished I/O check (took 5.476875230669975e-06 seconds)
然后给两个子任务运行的机会:
>>> about to run one step of task: __main__.child2
child2 started! sleeping now...
<<< task step finished: __main__.child2
>>> about to run one step of task: __main__.child1
child1: started! sleeping now...
<<< task step finished: __main__.child1
每个任务都会运行到调用 trio.sleep()
,然后突然我们又回到了 trio.run()
,决定接下来运行什么。这是怎么发生的呢?秘诀在于 trio.run()
和 trio.sleep()
共同协作实现了这一点: trio.sleep()
拥有一些特殊的魔法,可以让自己暂停,因此它向 trio.run()
发送一个请求,要求在 1 秒后再次唤醒自己,然后挂起任务。一旦任务被挂起,Python 就会将控制权交回给 trio.run()
,由它决定接下来做什么。(如果这听起来和生成器通过执行 yield
来挂起执行的方式相似,那并不是巧合:在 Python 解释器内部,生成器和异步函数的实现有很多重叠。)
注意
您可能会想知道是否可以混合使用不同异步库的原始操作符。例如,我们能否同时使用 trio.run()
和 asyncio.sleep()
?答案是:不可以,上文已经解释了原因:我们这个异步三明治的两边使用的是一种私有的语言来相互沟通,而不同的库使用的是不同的语言。所以,如果您在 trio.run()
内部调用 asyncio.sleep()
,那么 Trio 会非常困惑,很可能会以一种戏剧性的方式崩溃。
只有异步函数才能访问用于挂起任务的特殊魔法,因此只有异步函数才能使程序切换到不同的任务。这意味着如果一个调用没有 await
,那么你就知道它不可能是一个任务挂起的地方。这使得任务比线程更容易推理,因为任务之间交错和干扰对方状态的方式要少得多。(例如,在 Trio 中, a += 1
这样的语句总是原子的——即使 a
是一个任意复杂的自定义对象!)Trio 还提供了更多的保证,但这是最重要的。
现在你也知道了为什么 parent
必须使用 async with
来打开幼儿园:如果我们使用常规的 with
块,那么它就不能在结束时暂停等待孩子们完成;我们需要我们的清理函数是异步的,这正是 async with
所提供的。
现在回到我们的执行点。回顾一下:目前 parent
正在等待 child1
和 child2
,而这两个子进程都在睡眠状态。所以 trio.run()
查看其笔记,发现除非可能发生某些外部 I/O 事件,否则在那些睡眠结束之前没有其他事情可做。当然,我们这里没有进行任何 I/O 操作,所以这种情况不会发生,但在其他情况下可能会发生。因此,接下来它调用操作系统原语将整个进程挂起:
### waiting for I/O for up to 0.9999009938910604 seconds
实际上并没有 I/O 输入,所以一秒后我们再次醒来,Trio 再次查看它的笔记。这时,它检查当前时间,将其与 trio.sleep()
发送的笔记进行比较,这些笔记说明了两个子任务应该再次被唤醒的时间,并意识到它们已经睡得够久了,因此它安排它们很快运行:
### finished I/O check (took 1.0006483688484877 seconds)
### task scheduled: __main__.child1
### task scheduled: __main__.child2
然后孩子们开始奔跑,这一次他们跑到了终点。还记得 parent
正在等待他们完成吗?注意,当第一个孩子退出时, parent
就会被安排上:
>>> about to run one step of task: __main__.child1
child1: exiting!
### task scheduled: __main__.parent
### task exited: __main__.child1
<<< task step finished: __main__.child1
>>> about to run one step of task: __main__.child2
child2 exiting!
### task exited: __main__.child2
<<< task step finished: __main__.child2
然后,在进行了一次 I/O 检查之后, parent
醒了过来。托儿所清理代码发现所有子进程都已退出,于是允许托儿所块完成。然后, parent
进行了一次最后的打印并退出:
### doing a quick check for I/O
### finished I/O check (took 9.045004844665527e-06 seconds)
>>> about to run one step of task: __main__.parent
parent: all done!
### task scheduled: <init>
### task exited: __main__.parent
<<< task step finished: __main__.parent
最后,经过一些额外的内部账目处理, trio.run()
也退出了:
### doing a quick check for I/O
### finished I/O check (took 5.996786057949066e-06 seconds)
>>> about to run one step of task: <init>
### task scheduled: <call soon task>
### task scheduled: <init>
<<< task step finished: <init>
### doing a quick check for I/O
### finished I/O check (took 6.258022040128708e-06 seconds)
>>> about to run one step of task: <call soon task>
### task exited: <call soon task>
<<< task step finished: <call soon task>
>>> about to run one step of task: <init>
### task exited: <init>
<<< task step finished: <init>
!!! run finished
你做到了!
那有很多文字,但同样,你不需要完全理解这里的一切才能使用 Trio——实际上,Trio 在让每个任务感觉像以简单、线性方式执行方面做了很多努力。(就像你的操作系统也在努力让你感觉单线程代码以简单线性方式执行,尽管在底层,操作系统在本质上以与 Trio 相同的方式在不同线程和进程之间进行切换。)但是,在脑海中有一个大致的模型来了解你编写的代码是如何实际执行的,这一点是有用的,而且——最重要的是——这对于并行性的影响。
或者,如果您这已经激发了您的兴趣,想要了解更多关于 async/await
内部工作原理的信息,那么这篇博客文章是一个很好的深入探讨,或者您可以查看这个精彩的教程,了解如何从头开始构建一个简单的异步 I/O 框架。
更温柔、更仁慈的 GIL
关于并行处理,让我们暂时放大视角,来谈谈 async/await 与 Python 中其他处理并发的方式的比较。
我们已经指出,Trio 任务在概念上与 Python 内置的线程非常相似,就像由 threading
模块提供的线程。在所有常见的 Python 实现中,线程都有一个著名的限制:全局解释器锁,简称 GIL。GIL 意味着即使你使用了多个线程,你的代码仍然(大部分情况下)只会在单个核心上运行。人们往往对此感到沮丧。
但就 Trio 而言,GIL 的问题并不在于它限制了并行性。当然,如果 Python 有更好的利用多核的能力那当然很好,但这是一个非常难以解决的问题,而在那之前,还有很多问题单核就足够了——或者即使单核不够,进程级或机器级并行也能很好地解决问题。
不,GIL 的问题在于它是个糟糕的交易:我们放弃了使用多核,而换来的却是……几乎与真实并行编程带来的所有挑战和令人头疼的 bug 一样,而且——更糟糕的是——扩展性相当差。Python 中的线程根本就不那么吸引人。
三重奏并不让你的代码在多个核心上运行;事实上,正如我们上面所看到的,三重奏的设计理念是,当它有多个任务时,它们会轮流执行,所以在任何时刻,只有一个任务正在积极运行。我们并不是在克服全局解释器锁(GIL),而是在拥抱它。但是,如果你愿意接受这一点,再加上一点额外的工作,将新的 async
和 await
关键字放在正确的位置,那么作为交换,你将得到:
优秀的可扩展性:Trio 能够在不费吹灰之力的情况下同时运行 10,000+个任务,前提是它们的总 CPU 需求不超过单个核心的处理能力。(这在例如拥有大量客户端连接但任何给定时间只有少数活跃的网络服务器中很常见。)
花哨的功能:大多数线程系统都是用 C 语言实现的,并且受限于操作系统提供的功能。在 Trio 中,我们的逻辑全部用 Python 编写,这使得我们能够实现像 Trio 的取消系统这样的强大且易用的功能。
代码更容易推理:await
关键字意味着在每个函数中显式标记了潜在的切换任务点,这使得 Trio 代码比使用线程的等效程序更容易推理。
当然,并不是每个应用都适合这样做……但有很多情况下,这里的权衡看起来相当吸引人。
有一个缺点需要您牢记在心。明确设置检查点可以给您更多控制任务交错执行的能力——但权力越大,责任越大。在使用线程时,运行环境负责确保每个线程都能获得公平的运行时间。而在 Trio 中,如果某个任务连续运行数秒而不执行检查点,那么……其他所有任务就只能等待了。
这里是一个例子,说明事情可能会出错。以我们上面的例子为基础,将调用 trio.sleep()
替换为调用 time.sleep()
。如果我们运行修改后的程序,我们会看到类似的情况:
parent: started!
parent: spawning child1...
parent: spawning child2...
parent: waiting for children to finish...
child2 started! sleeping now...
[... pauses for 1 second ...]
child2 exiting!
child1: started! sleeping now...
[... pauses for 1 second ...]
child1: exiting!
parent: all done!
三号之所以拥有如此丰富的仪器 API,主要原因是使其能够编写调试工具,捕捉此类问题。
网络与 Trio 的交流
现在,让我们将所学知识应用于 I/O 操作,这正是 async/await 大放异彩的地方。
传统的网络 API 演示玩具应用是一个“回声服务器”:一个等待从远程客户端接收任意数据的程序,然后将相同的数据直接发送回去。(也许在当今,一个执行大量并发 HTTP 请求的应用程序会更相关,但为了使用像 asks 这样的 HTTP 库,我们将坚持回声服务器的传统。)
在这个教程中,我们展示了管道的两端:客户端和服务器。客户端定期向服务器发送数据,并显示其答案。服务器等待连接;当客户端连接时,它将接收到的数据回传到管道中。
一个回声客户端
首先,这里有一个示例回声客户端,即将要向我们的回声服务器发送一些数据并接收响应的程序:
1# echo-client.py
2
3import sys
4import trio
5
6# arbitrary, but:
7# - must be in between 1024 and 65535
8# - can't be in use by some other program on your computer
9# - must match what we set in our echo server
10PORT = 12345
11
12
13async def sender(client_stream):
14 print("sender: started!")
15 while True:
16 data = b"async can sometimes be confusing, but I believe in you!"
17 print(f"sender: sending {data!r}")
18 await client_stream.send_all(data)
19 await trio.sleep(1)
20
21
22async def receiver(client_stream):
23 print("receiver: started!")
24 async for data in client_stream:
25 print(f"receiver: got data {data!r}")
26 print("receiver: connection closed")
27 sys.exit()
28
29
30async def parent():
31 print(f"parent: connecting to 127.0.0.1:{PORT}")
32 client_stream = await trio.open_tcp_stream("127.0.0.1", PORT)
33 async with client_stream:
34 async with trio.open_nursery() as nursery:
35 print("parent: spawning sender...")
36 nursery.start_soon(sender, client_stream)
37
38 print("parent: spawning receiver...")
39 nursery.start_soon(receiver, client_stream)
40
41
42trio.run(parent)
请注意,这段代码在没有以下我们将要实现的 TCP 服务器的情况下无法运行。
整体结构在这里应该是熟悉的,因为它和我们的上一个例子非常相似:我们有一个父任务,它会派生两个子任务来完成实际工作,然后在 async with
块结束时,它会切换到全权监护模式,等待它们完成。但现在,子任务不再只是调用 trio.sleep()
,而是使用了 Trio 的一些网络 API。
让我们首先看看家长:
30async def parent():
31 print(f"parent: connecting to 127.0.0.1:{PORT}")
32 client_stream = await trio.open_tcp_stream("127.0.0.1", PORT)
33 async with client_stream:
34 async with trio.open_nursery() as nursery:
35 print("parent: spawning sender...")
36 nursery.start_soon(sender, client_stream)
37
38 print("parent: spawning receiver...")
39 nursery.start_soon(receiver, client_stream)
首先,我们调用 trio.open_tcp_stream()
来与服务器建立 TCP 连接。 127.0.0.1
是一个魔法 IP 地址,意味着“我正在运行的计算机”,因此这会将我们连接到本地计算机上使用 PORT
作为其接触点的任何程序。这个函数返回一个实现 Trio 的 Stream
接口的对象,它为我们提供了发送和接收字节数据的方法,以及在我们完成时关闭连接的方法。我们使用 async with
块来确保我们确实关闭了连接——在这个玩具示例中这不是什么大问题,但养成这个习惯是好的,而且 Trio 被设计成使 with
和 async with
块易于使用。
最后,我们启动两个子任务,并将流引用传递给每个子任务。(这也是一个很好的例子,说明了 nursery.start_soon
如何让你向生成的函数传递位置参数。)
我们的第一个任务是向服务器发送数据:
13async def sender(client_stream):
14 print("sender: started!")
15 while True:
16 data = b"async can sometimes be confusing, but I believe in you!"
17 print(f"sender: sending {data!r}")
18 await client_stream.send_all(data)
19 await trio.sleep(1)
它使用一个循环,交替调用 await
client_stream.send_all(...)
来发送一些数据(这是你在任何类型的 Trio 流上发送数据的方法),然后休眠一秒钟,以避免在终端上使输出滚动得太快。
第二项任务的职责是处理服务器返回的数据:
22async def receiver(client_stream):
23 print("receiver: started!")
24 async for data in client_stream:
25 print(f"receiver: got data {data!r}")
26 print("receiver: connection closed")
27 sys.exit()
它使用一个 async for
循环从服务器获取数据。或者,它也可以使用 receive_some
,这是 send_all
的相反,但使用 async for
可以节省一些样板代码。
现在我们准备查看服务器了。
回声服务器
通常,我们先整体看看,然后再讨论各个部分:
1# echo-server.py
2
3import trio
4from itertools import count
5
6# Port is arbitrary, but:
7# - must be in between 1024 and 65535
8# - can't be in use by some other program on your computer
9# - must match what we set in our echo client
10PORT = 12345
11
12CONNECTION_COUNTER = count()
13
14
15async def echo_server(server_stream):
16 # Assign each connection a unique number to make our debug prints easier
17 # to understand when there are multiple simultaneous connections.
18 ident = next(CONNECTION_COUNTER)
19 print(f"echo_server {ident}: started")
20 try:
21 async for data in server_stream:
22 print(f"echo_server {ident}: received data {data!r}")
23 await server_stream.send_all(data)
24 print(f"echo_server {ident}: connection closed")
25 # FIXME: add discussion of (Base)ExceptionGroup to the tutorial, and use
26 # exceptiongroup.catch() here. (Not important in this case, but important
27 # if the server code uses nurseries internally.)
28 except Exception as exc:
29 # Unhandled exceptions will propagate into our parent and take
30 # down the whole program. If the exception is KeyboardInterrupt,
31 # that's what we want, but otherwise maybe not...
32 print(f"echo_server {ident}: crashed: {exc!r}")
33
34
35async def main():
36 await trio.serve_tcp(echo_server, PORT)
37
38
39# We could also just write 'trio.run(trio.serve_tcp, echo_server, PORT)', but real
40# programs almost always end up doing other stuff too and then we'd have to go
41# back and factor it out into a separate function anyway. So it's simplest to
42# just make it a standalone function from the beginning.
43trio.run(main)
让我们从 main
开始,它只是一行长:
35async def main():
36 await trio.serve_tcp(echo_server, PORT)
这所做的是调用 serve_tcp()
,这是一个 Trio 提供的便利函数,它会一直运行(至少直到你按下控制-C 或以其他方式取消它)。这个函数做了几件很有帮助的事情:
它内部创建了一个托儿所,以便我们的服务器能够同时处理多个连接。
它监听指定端口的传入 TCP 连接。
每当有连接到来时,它就会启动一个新的任务,运行我们传递的函数(在这个例子中是echo_server
),并将代表该连接的流传递给它。
当每个任务退出时,它会确保关闭相应的连接。(这就是为什么你在服务器上看不到任何async with server_stream
——serve_tcp()
会为我们处理这件事。)
所以 serve_tcp()
非常方便!这部分对于任何服务器来说都几乎是通用的,无论是回声服务器、HTTP 服务器、SSH 服务器还是其他什么服务器,所以把它打包成一个辅助函数很有意义。
现在让我们看看 echo_server
,它处理每个客户端连接——如果有多个客户端,可能会有多个 echo_server
同时运行。这就是我们实现服务器“回声”行为的地方。这应该很容易理解,因为它使用了我们在上一节中看到的相同流函数:
15async def echo_server(server_stream):
16 # Assign each connection a unique number to make our debug prints easier
17 # to understand when there are multiple simultaneous connections.
18 ident = next(CONNECTION_COUNTER)
19 print(f"echo_server {ident}: started")
20 try:
21 async for data in server_stream:
22 print(f"echo_server {ident}: received data {data!r}")
23 await server_stream.send_all(data)
24 print(f"echo_server {ident}: connection closed")
25 # FIXME: add discussion of (Base)ExceptionGroup to the tutorial, and use
26 # exceptiongroup.catch() here. (Not important in this case, but important
27 # if the server code uses nurseries internally.)
28 except Exception as exc:
29 # Unhandled exceptions will propagate into our parent and take
30 # down the whole program. If the exception is KeyboardInterrupt,
31 # that's what we want, but otherwise maybe not...
32 print(f"echo_server {ident}: crashed: {exc!r}")
该论点由 serve_tcp()
提供,是我们在客户端建立连接的另一端,因此客户端传递给 send_all
的数据将在这里输出。然后是下面讨论的 try
块,最后是服务器循环,它交替地从套接字读取一些数据并将其发送出去(除非套接字已关闭,在这种情况下我们将退出)。
那么这个 try
块是做什么用的呢?记住,在 Trio 中,就像 Python 语言本身一样,异常会一直传播,直到被捕获。我们认为可能存在意外的异常,我们希望将其隔离,只让这个任务崩溃,而不会导致整个程序崩溃。例如,如果客户端在错误的时间关闭连接,那么这段代码可能会在关闭的连接上调用 send_all
,并得到一个 BrokenResourceError
;这很不幸,在一个更严肃的程序中,我们可能希望更明确地处理它,但这并不表示其他连接存在问题。另一方面,如果异常是像 KeyboardInterrupt
这样的,我们希望它传播到父任务,并导致整个程序退出。为了表达这一点,我们使用一个 try
块和一个 except Exception:
处理程序。
通常情况下,Trio 会让你自行决定是否以及如何处理异常,就像 Python 一样。
试试看吧
打开几个终端,在一个终端中运行 echo-server.py
,在另一个终端中运行 echo-client.py
,然后观看信息滚动!当你感到无聊时,可以通过按 Ctrl+C 来退出。
一些尝试的建议:
打开多个终端,同时运行多个客户端,所有客户端都与同一服务器进行通信。
查看服务器在客户端按下控制+C 键时的反应。
看看当你在服务器上按下控制-C 键时,客户是如何反应的。
流程控制在我们的回声客户端和服务器中
这里有一个你可能想知道的问题:为什么我们的客户在发送和接收时使用两个独立的任务,而不是像服务器那样使用一个交替进行这两个任务的单一任务——例如,我们的客户可以使用以下单一任务:
# Can you spot the two problems with this code?
async def send_and_receive(client_stream):
while True:
data = ...
await client_stream.send_all(data)
received = await client_stream.receive_some()
if not received:
sys.exit()
await trio.sleep(1)
结果发现,这里存在两个问题——一个是小的,一个是大的。这两个问题都与流程控制有关。小问题是,当我们在这里调用 receive_some
时,我们并没有等待所有数据都可用; receive_some
一旦有数据可用就会返回。如果 data
很小,那么我们的操作系统/网络/服务器可能会将其全部保存在一个单独的数据块中,但这没有保证。如果服务器发送 hello
,我们可能会收到 hello
,或者 he
llo
,或者 h
e
l
l
o
,或者……总之,当我们预期接收多于一个字节的数据时,我们必须准备好多次调用 receive_some
。
并且,如果 data
变得足够大,以至于超过了某些内部阈值,操作系统或网络决定总是将其拆分成多个部分,那么这里就会出特别大的问题。现在,在每次循环中,我们发送 len(data)
字节,但读取的字节数少于这个量。结果是类似于内存泄漏的情况:我们会在网络上积累越来越多的数据,最终导致某个部分崩溃。
注意
如果你对事物是如何崩溃的好奇,那么你可以使用 receive_some
的可选参数来限制每次读取的字节数,看看会发生什么。
我们可以通过跟踪每个时刻预期的数据量,然后不断调用 receive_some
直到获取所有数据来解决这个问题:
这有点麻烦,但可以解决这个问题。
尽管如此,还有一个更深层次的问题。我们仍然在发送和接收之间交替。请注意,当我们发送数据时,我们使用 await
:这意味着发送可能会阻塞。为什么会这样呢?我们发送的任何数据首先进入操作系统缓冲区,然后进入网络,再进入接收计算机上的另一个操作系统缓冲区,最后接收程序才会调用 receive_some
从这些缓冲区中取出数据。如果我们用少量数据调用 send_all
,那么这些数据就会进入这些缓冲区,而 send_all
会立即返回。但是,如果我们快速发送足够多的数据,最终缓冲区会填满, send_all
将会阻塞,直到远程端调用 receive_some
并释放一些空间。
现在让我们从服务器的角度来思考这个问题。每次它调用 receive_some
时,它会获取一些需要发送的数据。并且直到它发送出去,那些暂时存放的数据就会占用内存。计算机的 RAM 是有限的,所以如果我们的服务器表现良好,那么在某个时候,它需要停止调用 receive_some
,直到它通过自己的调用 send_all
来清除一些旧数据。因此,对于服务器来说,真正可行的选择就是交替进行接收和发送。
但是,我们需要记住,不仅仅是客户端对 send_all
的调用可能会阻塞:服务器对 send_all
的调用也可能陷入一种等待客户端调用 receive_some
的情况。所以,如果服务器在调用 receive_some
之前需要等待 send_all
完成,而我们的客户端在调用 receive_some
之前也需要等待 send_all
完成,……我们就遇到了问题!客户端不会调用 receive_some
,直到服务器已经调用了 receive_some
,而服务器也不会调用 receive_some
,直到客户端已经调用了 receive_some
。如果我们的客户端被编写为交替发送和接收,并且它试图发送的数据块足够大(例如,在大多数配置中,10 兆字节可能就足够了),那么这两个进程将会发生死锁。
道德:Trio 为您提供了强大的工具来管理顺序和并发执行。在这个例子中,我们看到了服务器需要 send
和 receive_some
交替顺序执行,而客户端需要它们并发运行,这两者都易于实现。但是,当你实现这样的网络代码时,仔细思考流量控制和缓冲是非常重要的,因为选择正确的执行模式取决于你自己!
其他流行的异步库,如 Twisted 和 asyncio
,往往通过在各个地方添加无界缓冲区来掩盖这些问题。这可以避免死锁,但可能会引入新的问题,特别是可能会使内存使用和延迟难以控制。虽然这两种方法都有其优点,但 Trio 认为,最好尽可能直接暴露底层问题,并提供良好的工具来直面它。
注意
如果您想故意尝试造成死锁并亲自查看,并且您正在使用 Windows 系统,那么您可能需要将 send_all
调用拆分成两个调用,每个调用发送一半的数据。这是因为 Windows 在处理缓冲区方面有相当独特的方式。
当任务并发出现问题时:超时、取消和异常处理 ¶
"""
待办:使用 fail_after()
提供一个示例
"""
待解释 Cancelled
待办:解释当子进程抛出异常时,如何也使用取消操作
待办:也许可以简要讨论一下 KeyboardInterrupt
的处理方法?