Céleri + asyncio

Bonjour, Habr! Je veux dire comment j'ai résolu le problème de l'exécution compétitive efficace des tâches asynchrones dans Celery.

KDPV

introduction


Le céleri est un grand projet avec une histoire complexe et un lourd fardeau de compatibilité descendante. Les décisions architecturales clés ont été prises bien avant qu'asyncio n'apparaisse en python. Par conséquent, il est encore plus intéressant de savoir que vous pouvez exécuter la tâche asyncio dans le céleri hors de la boîte.

Mode alésage activé
#839. . mode off.

Exécuter des tâches asynchrones dans du céleri vanillé


Vous pouvez démarrer la tâche asyncio à partir de la boîte:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

Avantages évidents:

  • Ça marche!
  • Simplement
  • Pas de dépendances externes supplémentaires

Qu'est-ce qui ne va pas?

  • La boucle d'événement est créée à l'intérieur de chaque travailleur.
  • Il n'y a pas de changement de contexte entre l'exécution des coroutines
  • À un moment donné, le travailleur n'exécute pas plus d'une coroutine
  • Les ressources partagées ne sont pas partagées entre les tâches
  • Chaudière

Autrement dit, asyncio dans ce cas est utilisé pour asyncio, mais il n'y a aucun avantage

Essayons plus délicat?


Je me suis battu pour la performance:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

Bingo! Avantages:

  • Je travaille encore
  • Encore plus ou moins efficace
  • Même les ressources se mélangent entre les coroutines du travailleur.
  • Il n'y a toujours pas de dépendances externes supplémentaires

Bingo? Les problèmes ne tardèrent pas à venir:

  • Le céleri ne sait rien de l'exécution de la corutine
  • Vous perdez le contrôle de l'exécution des tâches
  • Vous perdez le contrôle des exceptions
  • Chaudière

Pour aller avec une solution d'ingénierie aussi merveilleuse en prod, ma main ne s'est pas levée

Formulation du problème


  • Devrait marcher
  • Les coroutines doivent être compétitives
  • Les ressources doivent être partagées entre plusieurs exécutables
  • Pas de passe-partout
  • API simple et prévisible

Autrement dit, mes attentes:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

Sommaire


J'ai implémenté mes idées dans la bibliothèque celery-pool-asyncio . Cette bibliothèque est utilisée par notre équipe dans le projet en cours, et nous avons déjà déployé à la prod.

Caractéristiques brièvement


Outre l'exécution directe des tâches asynchrones, celery-pool-asyncio résout également les problèmes:

  • Délestage de tâches asynchrone
  • Prise en charge de la corutine dans les signaux de céleri

Afin de faire fonctionner céleri-pool-asyncio, j'ai utilisé le patch de singe . Pour chaque patch utilisé en runtime, il est possible de le désactiver.

Vous pouvez en savoir plus sur tout cela dans la documentation.

Des plans


Dans le bon sens, vous devez intégrer la piscine au céleri. D'un autre côté, les développeurs de céleri prototypent le céleri 5.0, qui sera asynchrone. Le jeu en vaut-il la chandelle?

Exemples


Pour démontrer les capacités de mes bibliothèques celery-pool-asyncio et celery-decorator-taskcls ( article ), un projet de test a été implémenté .

Autre


J'ai essayé de dire la même chose sur le mitap


All Articles