Sellerie + Asyncio

Hallo Habr! Ich möchte erzählen, wie ich das Problem der effektiven Ausführung von Asyncio-Aufgaben in Sellerie im Wettbewerb gelöst habe.

KDPV

Einführung


Sellerie ist ein großes Projekt mit einer komplexen Geschichte und einer hohen Belastung durch Abwärtskompatibilität. Wichtige architektonische Entscheidungen wurden getroffen, lange bevor Asyncio in Python erschien. Umso interessanter ist es, dass Sie die Asyncio-Aufgabe in Sellerie sofort ausführen können.

Bohrmodus ein
Formal nach # 839 . Subjektiv ändert das Umbenennen eines Pakets für mich nichts an der Architektur der Anwendung. Bohrmodus aus.

Führen Sie Asyncio-Aufgaben in Vanille-Sellerie aus


Sie können die Asyncio-Aufgabe über das folgende Feld starten:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

Offensichtliche Vorteile:

  • Es klappt!
  • Einfach
  • Keine zusätzlichen externen Abhängigkeiten

Was ist eigentlich falsch?

  • In jedem Worker wird eine Ereignisschleife erstellt.
  • Es gibt keinen Kontextwechsel zwischen der Ausführung von Coroutinen
  • Zu einem bestimmten Zeitpunkt führt der Worker nicht mehr als eine Coroutine aus
  • Freigegebene Ressourcen werden nicht zwischen Aufgaben geteilt
  • Boilerplate

Das heißt, Asyncio wird in diesem Fall für Asyncio verwendet, es gibt jedoch keine Vorteile

Versuchen wir es kniffliger?


Ich habe um Leistung gekämpft:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

Bingo! Vorteile:

  • Immer noch am arbeiten
  • Noch mehr oder weniger effektiv
  • Sogar Ressourcen mischen zwischen den Coroutinen innerhalb des Arbeiters.
  • Es gibt noch keine zusätzlichen externen Abhängigkeiten

Bingo? Probleme ließen nicht lange auf sich warten:

  • Sellerie weiß nichts über Corutin
  • Sie verlieren die Kontrolle über die Ausführung von Aufgaben
  • Sie verlieren die Kontrolle über Ausnahmen
  • Boilerplate

Um mit solch einer wunderbaren technischen Lösung in Prod zu gehen, hob sich meine Hand irgendwie nicht

Formulierung des Problems


  • Sollte arbeiten
  • Coroutinen müssen wettbewerbsfähig sein
  • Ressourcen sollten von vielen ausführbaren Dateien gemeinsam genutzt werden
  • Kein Boilerplate
  • Einfache vorhersehbare API

Das heißt, meine Erwartungen:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

Zusammenfassung


Ich habe meine Ideen in der Sellerie-Pool-Asyncio-Bibliothek umgesetzt . Diese Bibliothek wird von unserem Team im aktuellen Projekt verwendet, und wir haben bereits das Produkt eingeführt.

Features kurz


Sellerie-Pool-Asyncio führt nicht nur Asyncio-Aufgaben direkt aus, sondern löst auch folgende Probleme:

  • Asynchrones Task-Shedding
  • Corutin-Unterstützung bei Selleriesignalen

Um Sellerie-Pool-Asyncio zum Laufen zu bringen, habe ich Affen-Patches verwendet . Für jeden zur Laufzeit verwendeten Patch kann er deaktiviert werden.

Weitere Informationen hierzu finden Sie in der Dokumentation.

Pläne


Auf eine gute Weise müssen Sie den Pool in Sellerie integrieren. Auf der anderen Seite entwickeln Sellerieentwickler Prototypen für Sellerie 5.0, die asynchron sein werden. Ist das Spiel die Kerze wert?

Beispiele


Um die Fähigkeiten meiner Bibliotheken für Sellerie-Pool-Asyncio und Sellerie-Dekorator-Taskcls ( Artikel ) zu demonstrieren , wurde ein Testprojekt implementiert .

Andere


Ich habe versucht, dasselbe auf dem Mitap zu sagen


All Articles