Trio - Asynchrone Programmierung für Menschen

Bild

Python hat eine Trio- Bibliothek - eine asynchrone Programmierbibliothek.
Das Kennenlernen von Trio ist vor allem für diejenigen interessant, die an Asyncio arbeiten, da es eine gute Alternative ist, mit der Sie einige der Probleme lösen können, die Asyncio nicht lösen kann. In diesem Test werden wir uns überlegen, was Trio ist und welche Funktionen es uns bietet.

Für diejenigen, die gerade erst mit der Arbeit in der asynchronen Programmierung beginnen, schlage ich vor, eine kleine Einführung zu Asynchronität und Synchronität zu lesen.

Synchronität und Asynchronität


Bei der synchronen Programmierung werden alle Vorgänge nacheinander ausgeführt, und Sie können eine neue Aufgabe erst starten, nachdem Sie die vorherige abgeschlossen haben. Einer der „Schmerzpunkte“ ist jedoch, dass das gesamte Programm einfrieren kann, wenn einer der Threads sehr lange an einer Aufgabe gearbeitet hat. Es gibt Aufgaben, die keine Rechenleistung erfordern, aber Prozessorzeit in Anspruch nehmen. Diese können rationaler verwendet werden, indem einem anderen Thread die Kontrolle übertragen wird. Bei der synchronen Programmierung gibt es keine Möglichkeit, die aktuelle Aufgabe anzuhalten, um Folgendes in ihrer Lücke zu vervollständigen.

Wofür ist Asynchronität?? Es muss zwischen echter Asynchronität und Eingabe-Ausgabe-Asynchronität unterschieden werden. In unserem Fall handelt es sich um eine asynchrone Eingabe / Ausgabe. Weltweit - um Zeit zu sparen und Produktionsanlagen effizienter zu nutzen. Mit Asynchronität können Sie die Problembereiche von Threads umgehen. Bei asynchroner Eingabe / Ausgabe wartet der aktuelle Thread nicht auf die Ausführung eines externen Ereignisses, sondern gibt einem anderen Thread die Kontrolle. Somit wird tatsächlich immer nur ein Thread gleichzeitig ausgeführt. Der Thread, der die Kontrolle gegeben hat, wird in die Warteschlange gestellt und wartet darauf, dass die Kontrolle zu ihr zurückkehrt. Möglicherweise tritt zu diesem Zeitpunkt das erwartete externe Ereignis ein und es ist möglich, weiter zu arbeiten. Auf diese Weise können Sie zwischen Aufgaben wechseln, um Zeitverschwendung zu minimieren.

Und jetzt können wir zu dem zurückkehren, was istAsyncio . Der Betrieb dieser Bibliothek Ereignisschleife (Ereignisschleife), die die Aufgabenwarteschlange und die Schleife selbst enthält. Der Zyklus steuert die Ausführung von Aufgaben, dh er zieht Aufgaben aus der Warteschlange und bestimmt, was damit geschehen wird. Beispielsweise werden möglicherweise E / A-Aufgaben verarbeitet. Das heißt, die Ereignisschleife wählt die Aufgabe aus, registriert sich und beginnt zum richtigen Zeitpunkt mit ihrer Verarbeitung.

Coroutinen sind spezielle Funktionen, die die Steuerung dieser Aufgabe an die Ereignisschleife zurückgeben, dh an die Warteschlange zurückgeben. Es ist notwendig, dass diese Coroutinen genau durch eine Reihe von Ereignissen gestartet werden.

Es gibt auch Futures- Objekte, in denen das aktuelle Ergebnis der Ausführung einer Aufgabe gespeichert ist. Dies kann eine Information sein, dass die Aufgabe noch nicht verarbeitet wurde oder das Ergebnis bereits erhalten wurde. oder es kann eine Ausnahme geben.

Im Allgemeinen ist die Asyncio-Bibliothek bekannt, weist jedoch eine Reihe von Nachteilen auf, die Trio schließen kann.

Trio


Laut dem Autor der Bibliothek, Nathaniel Smith , wollte er bei der Entwicklung von Trio ein leichtes und einfach zu verwendendes Tool für den Entwickler entwickeln, das die einfachste asynchrone Eingabe / Ausgabe und Fehlerbehandlung bietet.

Ein wichtiges Merkmal von Trio ist das asynchrone Kontextmanagement, über das Asyncio nicht verfügt. Zu diesem Zweck schuf der Autor im Trio den sogenannten "Kindergarten"(Kindergarten) - ein Bereich der Löschung, der die Verantwortung für die Atomizität (Kontinuität) einer Gruppe von Fäden übernimmt. Die Schlüsselidee ist, dass wenn im „Kindergarten“ eine der Coroutinen ausfällt, alle Abläufe im „Kindergarten“ entweder erfolgreich abgeschlossen oder abgebrochen werden. In jedem Fall ist das Ergebnis korrekt. Und erst wenn alle Coroutinen abgeschlossen sind, entscheidet der Entwickler nach dem Beenden der Funktion selbst, wie er vorgehen soll.

Das heißt, mit "Kinder" können Sie die Fortsetzung der Fehlerverarbeitung verhindern, was dazu führen kann, dass entweder alles "fällt" oder das Ergebnis ein falsches Ergebnis ist.
Genau dies kann mit Asyncio passieren, da in Asyncio der Prozess trotz der Tatsache, dass ein Fehler aufgetreten ist, nicht gestoppt wird. In diesem Fall weiß der Entwickler erstens nicht, was genau zum Zeitpunkt des Fehlers passiert ist, und zweitens wird die Verarbeitung fortgesetzt.

Beispiele


Betrachten Sie das einfachste Beispiel für zwei konkurrierende Funktionen:

Asyncio

import asyncio

async def foo1():
    print('  foo1: ')
    await asyncio.sleep(2)
    print('  foo1: ')

async def foo2():
    print('  foo2: ')
    await asyncio.sleep(1)
    print('  foo2: ')

loop = asyncio.get_event_loop()
bundle = asyncio.wait([
    loop.create_task(foo1()),
    loop.create_task(foo2()),
])
try:
    loop.run_until_complete(bundle)
finally:
    loop.close()

Trio

import trio

async def foo1():
    print('  foo1: ')
    await trio.sleep(2)
    print('  foo1: ')

async def foo2():
    print('  foo2: ')
    await trio.sleep(1)
    print('  foo2: ')

async def root():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(foo1)
        nursery.start_soon(foo2)

trio.run(root)

In beiden Fällen ist das Ergebnis dasselbe:

foo1: 
foo2: 
foo2: 
foo1: 

Strukturell ist der Asyncio und Trio Code in diesem Beispiel ähnlich.

Der offensichtliche Unterschied besteht darin, dass Trio nicht die explizite Vervollständigung der Ereignisschleife erfordert.

Betrachten Sie ein etwas lebhafteres Beispiel. Rufen wir den Webdienst an, um einen Zeitstempel zu erhalten.

Für Asyncio verwenden wir zusätzlich aiohttp :

import time
import asyncio
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session, i):
    start = time.time()
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session, i))
            for i in range(MAX_CLIENTS)
        ]
        await asyncio.wait(tasks)
    print(f'  {time.time() - start}')

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
finally:
    ioloop.close()

Für Trio verwenden wir fragt :

import trio
import time
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

asks.init('trio')

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    print(f'{i} | {content.get("time")} (  {time.time() - start})')

async def root():
    start = time.time()
    async with trio.open_nursery() as nursery:
        for i in range(MAX_CLIENTS):
            nursery.start_soon(foo, i)

    print(f'  {time.time() - start}')

trio.run(root)

In beiden Fällen bekommen wir so etwas wie

0 | 1543837647522 (  0.11855053901672363)
2 | 1543837647535 (  0.1389765739440918)
3 | 1543837647527 (  0.13904547691345215)
4 | 1543837647557 (  0.1591191291809082)
1 | 1543837647607 (  0.2100353240966797)
  0.2102828025817871

Gut. Stellen Sie sich vor, während der Ausführung einer der Coroutinen ist ein Fehler
für Asyncio aufgetreten .

async def foo(session, i):
    start = time.time()
    if i == 3:
        raise Exception
    async with session.get(URL) as response:
        content = await response.json()
        print(f'{i} | {content.get("time")} (  {time.time() - start})')

1 | 1543839060815 (  0.10857725143432617)
2 | 1543839060844 (  0.10372781753540039)
5 | 1543839060843 (  0.10734415054321289)
4 | 1543839060874 (  0.13985681533813477)
  0.15044045448303223
Traceback (most recent call last):
  File "...py", line 12, in foo
    raise Exception
Exception

für Trio

async def foo(i):
    start = time.time()
    response = await asks.get(URL)
    content = response.json()
    if i == 3:
        raise Exception
    print(f'{i} | {content.get("time")} (  {time.time() - start})')


4 | 1543839223372 (  0.13524699211120605)
2 | 1543839223379 (  0.13848185539245605)
Traceback (most recent call last):
  File "...py", line 28, in <module>
    trio.run(root)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 1337, in run
    raise runner.main_task_outcome.error
  File "...py", line 23, in root
    nursery.start_soon(foo, i)
  File "/lib64/python3.6/site-packages/trio/_core/_run.py", line 397, in __aexit__
    raise combined_error_from_nursery
  File "...py", line 15, in foo
    raise Exception
Exception

Es ist deutlich zu sehen, dass in Trio unmittelbar nach dem Auftreten des Fehlers der "Abbruchbereich" funktionierte und zwei der vier Aufgaben, die keine Fehler enthielten, abnormal beendet wurden.

In Asyncio wurden alle Aufgaben abgeschlossen, und erst dann wurde der Trackback angezeigt.

Im gegebenen Beispiel ist dies nicht wichtig, aber stellen wir uns vor, dass die Aufgaben auf die eine oder andere Weise voneinander abhängen und die Aufgaben die Eigenschaft der Atomizität haben müssen. In diesem Fall wird die rechtzeitige Reaktion auf einen Fehler viel wichtiger. Natürlich können Sie await asyncio.wait (Aufgaben, return_when = FIRST_EXCEPTION) verwenden , aber Sie müssen daran denken, geöffnete Aufgaben korrekt abzuschließen.

Hier ist ein weiteres Beispiel:

Angenommen, Coroutinen greifen gleichzeitig auf mehrere ähnliche Webdienste zu, und die erste empfangene Antwort ist wichtig.

import asyncio
from asyncio import FIRST_COMPLETED
import aiohttp

URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5

async def foo(session):
    async with session.get(URL) as response:
        content = await response.json()
        return content.get("time")

async def root():
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(foo(session))
            for i in range(1, MAX_CLIENTS + 1)
        ]
        done, pending = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)
        print(done.pop().result())
        for future in pending:
            future.cancel()

ioloop = asyncio.get_event_loop()
try:
    ioloop.run_until_complete(root())
except:
    ioloop.close()

Alles ist ziemlich einfach. Die einzige Voraussetzung ist, sich daran zu erinnern, Aufgaben zu erledigen, die noch nicht erledigt wurden.

In Trio ist es etwas schwieriger, ein ähnliches Manöver zu starten, aber es ist fast unmöglich, die „Schwänze“ sofort unsichtbar zu lassen:

import trio
import asks
URL = 'https://yandex.ru/time/sync.json?geo=213'
MAX_CLIENTS = 5
asks.init('trio')

async def foo(session, send_channel, nursery):
    response = await session.request('GET', url=URL)
    content = response.json()
    async with send_channel:
        send_channel.send_nowait(content.get("time"))
    nursery.cancel_scope.cancel()

async def root():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with send_channel, receive_channel:
        async with trio.open_nursery() as nursery:
            async with asks.Session() as session:
                for i in range(MAX_CLIENTS):
                    nursery.start_soon(foo, session, send_channel.clone(), nursery)

        async with receive_channel:
            x = await receive_channel.receive()
            print(x)

trio.run(root)

nursery.cancel_scope.cancel () - Die erste abgeschlossene Coroutine ruft eine Funktion im Rückgängig-Bereich auf, die alle anderen Aufgaben abbricht, sodass Sie sich nicht separat darum kümmern müssen.
Richtig, um das Ergebnis der Coroutine-Ausführung auf die Funktion zu übertragen, die es verursacht hat, müssen Sie einen Kommunikationskanal initiieren.

Hoffentlich hat diese vergleichende Überprüfung ein Verständnis der Hauptmerkmale von Trio geliefert. Danke an alle!

All Articles