Trio - Asynchronous Programming for People


Python has Trio, a library for asynchronous programming.
Getting to know Trio will mainly interest those who already work with Asyncio, because it is a good alternative that solves some of the problems Asyncio handles poorly. In this review, we will look at what Trio is and what features it offers.

For those who are just getting started with asynchronous programming, here is a short introduction to what synchronous and asynchronous execution are.

Synchronism and asynchrony


In synchronous programming, all operations are performed sequentially, and a new task can start only after the previous one has completed. One of its pain points is that if a thread works on a single task for a very long time, the entire program may freeze. Some tasks need no computing power yet still hold on to processor time while they wait; that time could be used more rationally by handing control to another task. In synchronous programming, there is no way to pause the current task and run the next one in the gap.

What is asynchrony for? We need to distinguish between true asynchrony and input-output asynchrony; here we are talking about asynchronous input-output. Broadly, its purpose is to save time and use the available resources more efficiently. Asynchrony lets you work around the problem areas of threads. With asynchronous input-output, the current task does not wait for an external event but hands control to another task. So, in fact, only one task executes at any given moment. The task that gave up control goes into a queue and waits for control to return to it. By that time the expected external event may have occurred, and the task can continue its work. Switching between tasks this way minimizes wasted time.
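As a rough illustration of the paragraph above, the sketch below runs two simulated I/O waits concurrently with the standard asyncio library. The function name wait_for_io and the delays are made up for this example, and asyncio.sleep merely stands in for a real network or disk wait.

```python
import asyncio
import time

async def wait_for_io(name, delay):
    # asyncio.sleep stands in for waiting on an external event;
    # while this task waits, the other task gets control
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # Both waits overlap instead of running one after the other
    results = await asyncio.gather(wait_for_io("a", 0.2), wait_for_io("b", 0.2))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```

Run sequentially, the two waits would take about 0.4 seconds; here the total stays close to the longest single wait.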

Now we can return to what Asyncio is. The library is built around an event loop, which consists of a task queue and the loop itself. The loop controls the execution of tasks: it takes a task from the queue and decides what happens to it, for example handling an I/O operation. In other words, the event loop selects a task, registers it, and starts processing it at the right moment.

Coroutines are special functions that can hand control back to the event loop, that is, return the task to the queue. Coroutines must be run through the event loop.
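A small sketch of what this means in practice: calling a coroutine function does not run its body, it only creates a coroutine object that the event loop then drives. The function greet is a hypothetical name used only for this illustration.

```python
import asyncio
import inspect

async def greet(name):
    # await hands control back to the event loop once
    await asyncio.sleep(0)
    return f"hello, {name}"

# Calling the function only creates a coroutine object; nothing has run yet
coro = greet("trio")
is_coroutine = inspect.iscoroutine(coro)

# The event loop drives the coroutine to completion
result = asyncio.run(coro)
print(is_coroutine, result)
```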

There are also futures: objects that store the current state of a task's execution. This may be the information that the task has not been processed yet, the result that has already been obtained, or an exception.
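The three states described above can be observed directly on an asyncio future. This is only a sketch: in everyday code futures are usually created by the library rather than by hand.

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    states = []

    # A fresh future holds no result yet
    pending = loop.create_future()
    states.append(pending.done())

    # Once a result is set, the future is done and the value can be read
    pending.set_result(42)
    states.append(pending.done())
    states.append(pending.result())

    # A future can also carry an exception instead of a result
    failed = loop.create_future()
    failed.set_exception(ValueError("boom"))
    states.append(type(failed.exception()).__name__)
    return states

states = asyncio.run(main())
print(states)
```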

The Asyncio library is well known, but it has a number of drawbacks that Trio is able to address.

Trio


According to the library's author, Nathaniel Smith, his goal in developing Trio was to create a lightweight and easy-to-use tool that makes asynchronous input/output and error handling as simple as possible.

An important feature of Trio is the management of task groups through an asynchronous context manager, which Asyncio did not originally have. For this, the author created the so-called "nursery": a cancel scope that takes responsibility for the atomicity (continuity) of a group of tasks. The key idea is that if one of the coroutines in the nursery fails, every other task in the nursery has either already completed successfully or is cancelled. Either way, the outcome is consistent. Only after all the coroutines have finished and the nursery block has exited does the developer decide how to proceed.

In other words, the nursery prevents processing from continuing after an error, which could otherwise make everything crash or produce an incorrect result.
This is exactly what can happen with Asyncio, because there processing does not stop when an error occurs. In that case, first, the developer will not know what exactly happened at the time of the error, and second, the processing will carry on.

Examples


Consider the simplest example of two functions running concurrently:

Asyncio

import asyncio

async def foo1():
    print('foo1: started')
    await asyncio.sleep(2)
    print('foo1: finished')

async def foo2():
    print('foo2: started')
    await asyncio.sleep(1)
    print('foo2: finished')

# Create the loop explicitly: relying on asyncio.get_event_loop()
# to create one is deprecated
loop = asyncio.new_event_loop()
bundle = asyncio.wait([
    loop.create_task(foo1()),
    loop.create_task(foo2()),
])
try:
    loop.run_until_complete(bundle)
finally:
    loop.close()

Trio

import trio

async def foo1():
    print('foo1: started')
    await trio.sleep(2)
    print('foo1: finished')

async def foo2():
    print('foo2: started')
    await trio.sleep(1)
    print('foo2: finished')

async def root():
    # The nursery block exits only after both child tasks have finished
    async with trio.open_nursery() as nursery:
        nursery.start_soon(foo1)
        nursery.start_soon(foo2)

trio.run(root)

In both cases the result will be the same:

foo1: started
foo2: started
foo2: finished
foo1: finished

Structurally, the Asyncio and Trio code in this example is similar.

The obvious difference is that Trio does not require you to close the event loop explicitly.
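For comparison, newer versions of Asyncio narrow this particular gap: asyncio.run, available since Python 3.7, creates the event loop and closes it on its own. The sketch below is one possible variant with shortened delays; the helper foo and the list order are names chosen only for this illustration.

```python
import asyncio

order = []

async def foo(name, delay):
    order.append(f"{name}: started")
    await asyncio.sleep(delay)
    order.append(f"{name}: finished")

async def root():
    # gather schedules both coroutines and waits for both to finish
    await asyncio.gather(foo("foo1", 0.2), foo("foo2", 0.1))

# asyncio.run creates the event loop, runs root() and closes the loop
asyncio.run(root())
print(order)
```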

Consider a slightly more realistic example. Let's call a web service to get a timestamp.

For Asyncio we will additionally use aiohttp:

import time
import asyncio
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session, i):
    start = time.time()
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        # Schedule all requests at once, then wait for every task to finish
        tasks = [
            asyncio.ensure_future(foo(session, i))
            for i in range(MAX_CLIENTS)
        ]
        await asyncio.wait(tasks)
    print(f'  {time.time() - start}')

# Create the loop explicitly: relying on asyncio.get_event_loop()
# to create one is deprecated
ioloop = asyncio.new_event_loop()
try:
    ioloop.run_until_complete(root())
finally:
    ioloop.close()

For Trio we use asks:

import trio
import time
import asks

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

# Tell asks which async library it is running on
asks.init('trio')

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    # The nursery block exits only after all requests have finished
    async with trio.open_nursery() as nursery:
        for i in range(MAX_CLIENTS):
            nursery.start_soon(foo, i)

    print(f'  {time.time() - start}')

trio.run(root)

In both cases, we get something like this:

0 | 1543837647522 (  0.11855053901672363)
2 | 1543837647535 (  0.1389765739440918)
3 | 1543837647527 (  0.13904547691345215)
4 | 1543837647557 (  0.1591191291809082)
1 | 1543837647607 (  0.2100353240966797)
  0.2102828025817871

Good. Now imagine that an error occurs during the execution of one of the coroutines. For Asyncio:

async def foo(session, i):
    start = time.time()
    if i == 3:
        raise Exception
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

1 | 1543839060815 (  0.10857725143432617)
2 | 1543839060844 (  0.10372781753540039)
5 | 1543839060843 (  0.10734415054321289)
4 | 1543839060874 (  0.13985681533813477)
  0.15044045448303223
Traceback (most recent call last):
  File "...py", line 12, in foo
    raise Exception
Exception

For Trio:

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    if i == 3:
        raise Exception
    print(f'{i} | {content.get("time")} (  {time.time() - start})')


4 | 1543839223372 (  0.13524699211120605)
2 | 1543839223379 (  0.13848185539245605)
Traceback (most recent call last):
  File "...py", line 28, in <module>
    trio.run(root)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 1337, in run
    raise runner.main_task_outcome.error
  File "...py", line 23, in root
    nursery.start_soon(foo, i)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 397, in __aexit__
    raise combined_error_from_nursery
  File "...py", line 15, in foo
    raise Exception
Exception

It is clearly seen that in Trio, immediately after the error occurred, the cancel scope was triggered, and two of the four error-free tasks were cancelled before they could finish.

In Asyncio, all the other tasks ran to completion, and only then did the traceback appear.

In the given example this is not important, but imagine that the tasks depend on each other in one way or another and that the set of tasks must be atomic. In that case, a timely response to an error becomes much more important. Of course, you can use await asyncio.wait(tasks, return_when=FIRST_EXCEPTION), but you must remember to cancel the tasks that are still open yourself.
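A minimal, network-free sketch of that FIRST_EXCEPTION approach might look as follows. The coroutine work and the failing index 3 are stand-ins for the web requests above, not part of the original example.

```python
import asyncio

finished = []

async def work(i):
    if i == 3:
        await asyncio.sleep(0.05)
        raise RuntimeError(f"task {i} failed")
    await asyncio.sleep(0.5)
    finished.append(i)

async def root():
    tasks = [asyncio.ensure_future(work(i)) for i in range(5)]
    # Return as soon as any task raises instead of waiting for all of them
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # Asyncio leaves the remaining tasks running; cancelling them is up to us
    for task in pending:
        task.cancel()
    # Give the cancelled tasks a chance to actually stop
    await asyncio.gather(*pending, return_exceptions=True)
    errors = [task.exception() for task in done if task.exception() is not None]
    return len(pending), errors

cancelled_count, errors = asyncio.run(root())
print(cancelled_count, errors)
```

Leaving out the cancellation loop would let the four remaining tasks keep running in the background, which is exactly the bookkeeping a Trio nursery does for you.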

And here is another example:

Suppose that coroutines simultaneously access several similar web services, and only the first response received matters.

import asyncio
from asyncio import FIRST_COMPLETED
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session):
    async with session.get(URL) as response:
        content = await response.json()
        return content.get("time")

async def root():
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session))
            for i in range(1, MAX_CLIENTS + 1)
        ]
        # Return as soon as the first request completes
        done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
        print(done.pop().result())
        # The remaining requests are no longer needed
        for future in pending:
            future.cancel()

# Create the loop explicitly: relying on asyncio.get_event_loop()
# to create one is deprecated
ioloop = asyncio.new_event_loop()
try:
    ioloop.run_until_complete(root())
finally:
    # Close the loop whether or not an error occurred
    ioloop.close()

Everything is pretty simple. The only requirement is to remember to cancel the tasks that have not finished.

In Trio, pulling off a similar maneuver is a little more involved, but it is almost impossible to leave loose ends behind unnoticed:

import trio
import asks

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5
asks.init('trio')

async def foo(session, send_channel, nursery):
    # The clone is closed on exit, including when this task is cancelled
    async with send_channel:
        response = await session.request('GET', url=URL)
        content = response.json()
        send_channel.send_nowait(content.get("time"))
        # Cancel the sibling tasks right away, before the next checkpoint,
        # so that no second task tries to write into the full buffer
        nursery.cancel_scope.cancel()

async def root():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with send_channel, receive_channel:
        # Keep the session open until every request task has finished
        async with asks.Session() as session:
            async with trio.open_nursery() as nursery:
                for i in range(MAX_CLIENTS):
                    nursery.start_soon(foo, session, send_channel.clone(), nursery)

        # The winning task left its result in the channel buffer
        x = await receive_channel.receive()
        print(x)

trio.run(root)

nursery.cancel_scope.cancel() - the first coroutine to finish calls this method on the nursery's cancel scope, which cancels all the other tasks, so there is no need to take care of them separately.
Admittedly, to pass the coroutine's result back to the function that started it, you have to set up a communication channel.

Hopefully this comparative review has given you an understanding of the main features of Trio. Thanks, everyone!
