Trio - Programmation asynchrone pour les personnes

image

Python a une bibliothèque Trio - une bibliothèque de programmation asynchrone.
Découvrir Trio sera surtout intéressant pour ceux qui travaillent sur Asyncio, car c'est une bonne alternative qui vous permet de résoudre certains des problèmes qu'Asyncio ne peut pas gérer. Dans cette revue, nous considérerons ce qu'est le Trio et quelles fonctionnalités il nous offre.

Pour ceux qui commencent tout juste Ă  travailler en programmation asynchrone, je propose de lire une petite introduction sur ce que sont l'asynchronie et le synchronisme.

Synchronisme et asynchronie


Dans la programmation synchrone, toutes les opérations sont effectuées séquentiellement et vous ne pouvez démarrer une nouvelle tâche qu'après avoir terminé la précédente. Cependant, l'un de ses points «douloureux» est que si l'un des threads travaille sur une tâche depuis très longtemps, le programme entier peut se figer. Il existe des tâches qui ne nécessitent pas de puissance de calcul, mais prennent du temps processeur, qui peuvent être utilisées de manière plus rationnelle en donnant le contrôle à un autre thread. Dans la programmation synchrone, il n'y a aucun moyen de suspendre la tâche en cours afin de compléter ce qui suit dans son intervalle.

À quoi sert l' asynchronie?? Il est nécessaire de faire la distinction entre la véritable asynchronie et l'asynchronie entrée-sortie. Dans notre cas, nous parlons d'entrée-sortie asynchrone. À l'échelle mondiale - afin de gagner du temps et d'utiliser plus efficacement les installations de production. L'asynchronie vous permet de contourner les zones problématiques des threads. En entrée-sortie asynchrone, le thread actuel n'attendra pas l'exécution d'un événement externe, mais donnera le contrôle à un autre thread. Ainsi, en fait, un seul thread est exécuté à la fois. Le thread qui a donné le contrôle entre dans la file d'attente et attend que le contrôle y revienne. Peut-être que d'ici là, l'événement externe attendu se produira et il sera possible de continuer à travailler. Cela vous permettra de basculer entre les tâches afin de minimiser la perte de temps.

Et maintenant, nous pouvons revenir à ce qui estAsyncio . Le fonctionnement de cette boucle d'événements de bibliothèque ( boucle d' événements), qui inclut la file d'attente des tâches et la boucle elle-même. Le cycle contrôle l'exécution des tâches, c'est-à-dire qu'il extrait les tâches de la file d'attente et détermine ce qui va leur arriver. Par exemple, il peut gérer des tâches d'E / S. C'est-à-dire que la boucle d'événements sélectionne la tâche, s'enregistre et commence au bon moment son traitement.

Les coroutines sont des fonctions spéciales qui renvoient le contrôle de cette tâche à la boucle d'événements, c'est-à-dire les renvoient à la file d'attente. Il est nécessaire que ces coroutines soient lancées précisément à travers une série d'événements.

Il y a aussi des futurs- objets dans lesquels le résultat actuel de l'exécution d'une tâche est stocké. Il peut s'agir d'informations indiquant que la tâche n'a pas encore été traitée ou que le résultat a déjà été obtenu; ou il peut y avoir une exception.

En général, la bibliothèque Asyncio est bien connue, cependant, elle présente un certain nombre d'inconvénients que Trio est capable de fermer.

Trio


Selon l'auteur de la bibliothèque, Nathaniel Smith , développant Trio, il a cherché à créer un outil léger et facile à utiliser pour le développeur, qui fournirait les entrées / sorties asynchrones les plus simples et la gestion des erreurs.

Une caractéristique importante de Trio est la gestion de contexte asynchrone, qui n'a pas Asyncio. Pour ce faire, l'auteur a créé en Trio la soi-disant "pépinière"(pépinière) - une zone d'annulation qui prend la responsabilité de l'atomicité (continuité) d'un groupe de fils. L'idée clé est que si dans la «pépinière» l'une des coroutines échoue, alors tous les flux dans la «pépinière» seront terminés avec succès ou annulés. Dans tous les cas, le résultat sera correct. Et seulement lorsque toutes les coroutines sont terminées, après avoir quitté la fonction, le développeur décide lui-même comment procéder.

Autrement dit, «enfants» vous permet d'empêcher la poursuite du traitement des erreurs, ce qui peut conduire au fait que tout «tombera» ou que le résultat sera un résultat incorrect.
C'est exactement ce qui peut arriver avec Asyncio, car dans Asyncio le processus ne s'arrête pas, malgré le fait qu'une erreur s'est produite. Et dans ce cas, d'une part, le développeur ne saura pas exactement ce qui s'est passé au moment de l'erreur, et d'autre part, le traitement se poursuivra.

Exemples


Prenons l'exemple le plus simple de deux fonctionnalités concurrentes:

Asyncio

import asyncio

async def foo1():
    print('  foo1: ')
    await asyncio.sleep(2)
    print('  foo1: ')

async def foo2():
    print('  foo2: ')
    await asyncio.sleep(1)
    print('  foo2: ')

loop = asyncio.get_event_loop()
bundle = asyncio.wait([
    loop.create_task(foo1()),
    loop.create_task(foo2()),
])
try:
    loop.run_until_complete(bundle)
finally:
    loop.close()

Trio

import trio

async def foo1():
    print('  foo1: ')
    await trio.sleep(2)
    print('  foo1: ')

async def foo2():
    print('  foo2: ')
    await trio.sleep(1)
    print('  foo2: ')

async def root():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(foo1)
        nursery.start_soon(foo2)

trio.run(root)

dans les deux cas, le résultat sera le même:

foo1: 
foo2: 
foo2: 
foo1: 

Structurellement, le code Asyncio et Trio dans cet exemple est similaire.

La différence évidente est que Trio ne nécessite pas l'achèvement explicite de la boucle d'événements.

Prenons un exemple un peu plus vivant. Appelons le service Web pour obtenir un horodatage.

Pour Asyncio, nous utiliserons en plus aiohttp :

import time
import asyncio
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session, i):
    start = time.time()
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session, i))
            for i in range(MAX_CLIENTS)
        ]
        await asyncio.wait(tasks)
    print(f'  {time.time() - start}')

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
finally:
    ioloop.close()

Pour nous Trio utilisation demande :

import trio
import time
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

asks.init('trio')

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with trio.open_nursery() as nursery:
        for i in range(MAX_CLIENTS):
            nursery.start_soon(foo, i)

    print(f'  {time.time() - start}')

trio.run(root)

Dans les deux cas, nous obtenons quelque chose comme

0 | 1543837647522 (  0.11855053901672363)
2 | 1543837647535 (  0.1389765739440918)
3 | 1543837647527 (  0.13904547691345215)
4 | 1543837647557 (  0.1591191291809082)
1 | 1543837647607 (  0.2100353240966797)
  0.2102828025817871

Bien. Imaginez que lors de l'exécution de l'une des coroutines, une erreur s'est produite
pour Asyncio.

async def foo(session, i):
    start = time.time()
    if i == 3:
        raise Exception
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

1 | 1543839060815 (  0.10857725143432617)
2 | 1543839060844 (  0.10372781753540039)
5 | 1543839060843 (  0.10734415054321289)
4 | 1543839060874 (  0.13985681533813477)
  0.15044045448303223
Traceback (most recent call last):
  File "...py", line 12, in foo
    raise Exception
Exception

pour trio

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    if i == 3:
        raise Exception
    print(f'{i} | {content.get("time")} (  {time.time() - start})')


4 | 1543839223372 (  0.13524699211120605)
2 | 1543839223379 (  0.13848185539245605)
Traceback (most recent call last):
  File "...py", line 28, in <module>
    trio.run(root)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 1337, in run
    raise runner.main_task_outcome.error
  File "...py", line 23, in root
    nursery.start_soon(foo, i)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 397, in __aexit__
    raise combined_error_from_nursery
  File "...py", line 15, in foo
    raise Exception
Exception

On voit clairement que dans Trio, immédiatement après l'apparition de l'erreur, la «zone d'annulation» a fonctionné et deux des quatre tâches qui ne contenaient pas d'erreurs ont été anormalement terminées.

Dans Asyncio, toutes les tâches étaient terminées et ce n'est qu'alors que le trackback est apparu.

Dans l'exemple donné, ce n'est pas important, mais imaginons que les tâches d'une manière ou d'une autre dépendent les unes des autres, et l'ensemble des tâches doit avoir la propriété de l'atomicité. Dans ce cas, une réponse rapide à une erreur devient beaucoup plus importante. Bien sûr, vous pouvez utiliser attendent asyncio.wait (tâches, return_when = FIRST_EXCEPTION) , mais vous devez vous rappeler de terminer correctement les tâches ouvertes.

Voici un autre exemple:

supposons que les coroutines accèdent simultanément à plusieurs services Web similaires et que la première réponse reçue soit importante.

import asyncio
from asyncio import FIRST_COMPLETED
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session):
    async with session.get(URL) as response:
        content = await response.json()
        return content.get("time")

async def root():
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session))
            for i in range(1, MAX_CLIENTS + 1)
        ]
        done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
        print(done.pop().result())
        for future in pending:
            future.cancel()

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
except:
    ioloop.close()

Tout est assez simple. La seule exigence est de ne pas oublier d'effectuer des tâches qui n'ont pas été terminées.

En Trio, lancer une manœuvre similaire est un peu plus difficile, mais il est presque impossible de laisser les «queues» invisibles tout de suite:

import trio
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5
asks.init('trio')

async def foo(session, send_channel, nursery):
    response = await session.request('GET', url=URL)
    content = response.json()
    async with send_channel:
        send_channel.send_nowait(content.get("time"))
    nursery.cancel_scope.cancel()

async def root():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with send_channel, receive_channel:
        async with trio.open_nursery() as nursery:
            async with asks.Session() as session:
                for i in range(MAX_CLIENTS):
                    nursery.start_soon(foo, session, send_channel.clone(), nursery)

        async with receive_channel:
            x = await receive_channel.receive()
            print(x)

trio.run(root)

nursery.cancel_scope.cancel () - la première coroutine qui se termine appellera une fonction dans la zone d'annulation qui annulera toutes les autres tâches, il n'est donc pas nécessaire de s'en préoccuper séparément.
Certes, pour transférer le résultat de l'exécution de la coroutine vers la fonction qui l'a provoqué, vous devrez initier un canal de communication.

J'espère que cette revue comparative a permis de comprendre les principales caractéristiques de Trio. Merci à tous!

All Articles