三重奏-人的异步编程

图片

Python有一个Trio库-异步编程库。
认识Trio对于从事Asyncio工作的人们来说主要是有趣的,因为它是一个很好的选择,它使您能够解决Asyncio无法处理的一些问题。在这篇评论中,我们将考虑Trio是什么以及它具有什么功能。

对于刚开始异步编程的人,我建议阅读有关异步和同步的简短介绍。

同步与异步


同步编程中,所有操作都是按顺序执行的,并且只有在完成前一个任务后才能启动新任务。但是,它的“痛点”之一是,如果其中一个线程在很长时间内一直在执行任务,则整个程序可能会冻结。有些任务不需要计算能力,但会占用处理器时间,可以通过控制另一个线程来更合理地使用这些任务。在同步编程中,无法暂停当前任务以完成其后的任务。异步的作用

什么??必须区分真实异步和输入输出异步。在我们的例子中,我们正在谈论异步输入输出。全球范围内-为了节省时间并更有效地利用生产设施。异步允许您绕过线程的问题区域。在异步输入输出中,当前线程将不等待某个外部事件的执行,而是将控制权交给另一个线程。因此,实际上一次只执行一个线程。拥有控制权的线程进入队列,等待控制权返回。也许到那时,预期的外部事件将发生,并且有可能继续工作。这将允许您在任务之间切换,以最大程度地减少时间浪费。

现在我们可以回到什么ASYNCIO。此库事件循环(事件循环)的操作,其中包括任务队列和循环本身。该周期控制任务的执行,即,它从队列中提取任务并确定将要发生的事情。例如,它可能正在处理I / O任务。即,事件循环选择任务,注册并在适当的时间开始其处理。

协程是特殊的函数,可以将对此任务的控制权返回到事件循环,即将其返回到队列。这些协程必须通过一系列事件精确启动。

也有期货-存储任务执行的当前结果的对象。这可能是有关任务尚未处理或已经获得结果的信息;否则可能会有例外。

通常,Asyncio库是众所周知的,但是它具有Trio可以关闭的许多缺点。

三重奏


根据该库的作者Nathaniel Smith的说法,在开发Trio时,他试图为开发人员创建一个轻便且易于使用的工具,该工具将提供最简单的异步输入/输出和错误处理。

Trio的一个重要功能是异步上下文管理,而Asyncio则没有。为此,作者在Trio中创建了所谓的“苗圃”(护理)-取消区域负责一组线程的原子性(连续性)。关键思想是,如果在“托儿所”中某个协程失败,那么“托儿所”中的所有流程将成功完成或取消。无论如何,结果都是正确的。并且只有当所有协程都完成后,退出函数后,开发人员自己才决定如何进行。

也就是说,“孩子们的”允许您阻止继续进行错误处理,这可能导致以下事实:要么一切都会“下降”,要么结果将是错误的结果。
这正是Asyncio可能发生的情况,因为在Asyncio中,即使发生错误,该过程也不会停止。在这种情况下,首先,开发人员将不知道发生错误时确切发生了什么,其次,处理将继续。

例子


考虑两个竞争功能的最简单示例:

Asyncio

import asyncio

async def foo1():
    print('  foo1: ')
    await asyncio.sleep(2)
    print('  foo1: ')

async def foo2():
    print('  foo2: ')
    await asyncio.sleep(1)
    print('  foo2: ')

loop = asyncio.get_event_loop()
bundle = asyncio.wait([
    loop.create_task(foo1()),
    loop.create_task(foo2()),
])
try:
    loop.run_until_complete(bundle)
finally:
    loop.close()

三重奏

import trio

async def foo1():
    print('  foo1: ')
    await trio.sleep(2)
    print('  foo1: ')

async def foo2():
    print('  foo2: ')
    await trio.sleep(1)
    print('  foo2: ')

async def root():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(foo1)
        nursery.start_soon(foo2)

trio.run(root)

在这两种情况下,结果都是相同的:

foo1: 
foo2: 
foo2: 
foo1: 

从结构上讲,此示例中的Asyncio和Trio代码相似。

明显的区别是Trio不需要显式完成事件循环。

考虑一个更生动的例子。让我们调用Web服务以获得时间戳。

对于Asyncio,我们将另外使用aiohttp

import time
import asyncio
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session, i):
    start = time.time()
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session, i))
            for i in range(MAX_CLIENTS)
        ]
        await asyncio.wait(tasks)
    print(f'  {time.time() - start}')

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
finally:
    ioloop.close()

对于Trio,我们使用asks

import trio
import time
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

asks.init('trio')

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with trio.open_nursery() as nursery:
        for i in range(MAX_CLIENTS):
            nursery.start_soon(foo, i)

    print(f'  {time.time() - start}')

trio.run(root)

在这两种情况下,我们都会得到类似

0 | 1543837647522 (  0.11855053901672363)
2 | 1543837647535 (  0.1389765739440918)
3 | 1543837647527 (  0.13904547691345215)
4 | 1543837647557 (  0.1591191291809082)
1 | 1543837647607 (  0.2100353240966797)
  0.2102828025817871

好。想象一下,在执行协程之一期间
,Asyncio 发生了错误

async def foo(session, i):
    start = time.time()
    if i == 3:
        raise Exception
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

1 | 1543839060815 (  0.10857725143432617)
2 | 1543839060844 (  0.10372781753540039)
5 | 1543839060843 (  0.10734415054321289)
4 | 1543839060874 (  0.13985681533813477)
  0.15044045448303223
Traceback (most recent call last):
  File "...py", line 12, in foo
    raise Exception
Exception

三人组

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    if i == 3:
        raise Exception
    print(f'{i} | {content.get("time")} (  {time.time() - start})')


4 | 1543839223372 (  0.13524699211120605)
2 | 1543839223379 (  0.13848185539245605)
Traceback (most recent call last):
  File "...py", line 28, in <module>
    trio.run(root)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 1337, in run
    raise runner.main_task_outcome.error
  File "...py", line 23, in root
    nursery.start_soon(foo, i)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 397, in __aexit__
    raise combined_error_from_nursery
  File "...py", line 15, in foo
    raise Exception
Exception

可以清楚地看到,在Trio中,错误发生后,“取消区域”立即起作用,并且四个不包含错误的任务中的两个被异常终止。

在Asyncio中,所有任务都已完成,然后才出现引用。

在给定的示例中,这并不重要,但让我们想象一下,任务以一种或另一种方式相互依赖,并且任务集必须具有原子属性。在这种情况下,对错误的及时响应变得更加重要。当然,您可以使用await asyncio.wait(任务,return_when = FIRST_EXCEPTION),但是您必须记住正确完成打开的任务。

这是另一个示例:

假设协程同时访问多个相似的Web服务,并且收到的第一个响应很重要。

import asyncio
from asyncio import FIRST_COMPLETED
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session):
    async with session.get(URL) as response:
        content = await response.json()
        return content.get("time")

async def root():
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session))
            for i in range(1, MAX_CLIENTS + 1)
        ]
        done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
        print(done.pop().result())
        for future in pending:
            future.cancel()

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
except:
    ioloop.close()

一切都非常简单。唯一的要求是记住要完成尚未完成的任务。

在Trio中,执行类似的操作要困难一些,但几乎不可能立即使“尾巴”不可见:

import trio
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5
asks.init('trio')

async def foo(session, send_channel, nursery):
    response = await session.request('GET', url=URL)
    content = response.json()
    async with send_channel:
        send_channel.send_nowait(content.get("time"))
    nursery.cancel_scope.cancel()

async def root():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with send_channel, receive_channel:
        async with trio.open_nursery() as nursery:
            async with asks.Session() as session:
                for i in range(MAX_CLIENTS):
                    nursery.start_soon(foo, session, send_channel.clone(), nursery)

        async with receive_channel:
            x = await receive_channel.receive()
            print(x)

trio.run(root)

Nursery.cancel_scope.cancel() -完成的第一个协程将在撤消区域调用一个函数,该函数将取消所有其他任务,因此无需单独担心它。
的确,为了将协程执行的结果传递给引起它的函数,您将必须启动一个通信通道。

希望这篇比较评论能对Trio的主要功能有所了解。谢谢大家!

All Articles