Bonjour, Habr! Je veux dire comment j'ai résolu le problème de l'exécution compétitive efficace des tâches asynchrones dans Celery.
introduction
Le céleri est un grand projet avec une histoire complexe et un lourd fardeau de compatibilité descendante. Les décisions architecturales clés ont été prises bien avant qu'asyncio n'apparaisse en python. Par conséquent, il est encore plus intéressant de savoir que vous pouvez exécuter la tâche asyncio dans le céleri hors de la boîte.Exécuter des tâches asynchrones dans du céleri vanillé
Vous pouvez démarrer la tâche asyncio à partir de la boîte:import asyncio
from .celeryapp import celeryapp
async def async_task():
await asyncio.sleep(42)
@celeryapp.task
def regular_task():
coro = async_task()
asyncio.run(coro)
Avantages évidents:- Ça marche!
- Simplement
- Pas de dépendances externes supplémentaires
Qu'est-ce qui ne va pas?- La boucle d'événement est créée à l'intérieur de chaque travailleur.
- Il n'y a pas de changement de contexte entre l'exécution des coroutines
- À un moment donné, le travailleur n'exécute pas plus d'une coroutine
- Les ressources partagées ne sont pas partagées entre les tâches
- Chaudière
Autrement dit, asyncio dans ce cas est utilisé pour asyncio, mais il n'y a aucun avantageEssayons plus délicat?
Je me suis battu pour la performance:import asyncio
import threading
from .celeryapp import celeryapp
celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
target=celeryapp.loop.run_forever,
daemon=True,
)
celeryapp.loop_runner.start()
async def async_task():
await asyncio.sleep(42)
@celeryapp.task
def regular_task():
coro = async_task()
asyncio.run_coroutine_threadsafe(
coro=coro,
loop=celeryapp.loop,
)
Bingo! Avantages:- Je travaille encore
- Encore plus ou moins efficace
- Même les ressources se mélangent entre les coroutines du travailleur.
- Il n'y a toujours pas de dépendances externes supplémentaires
Bingo? Les problèmes ne tardèrent pas à venir:- Le céleri ne sait rien de l'exécution de la corutine
- Vous perdez le contrôle de l'exécution des tâches
- Vous perdez le contrôle des exceptions
- Chaudière
Pour aller avec une solution d'ingénierie aussi merveilleuse en prod, ma main ne s'est pas levéeFormulation du problème
- Devrait marcher
- Les coroutines doivent être compétitives
- Les ressources doivent être partagées entre plusieurs exécutables
- Pas de passe-partout
- API simple et prévisible
Autrement dit, mes attentes:import asyncio
from .celeryapp import celeryapp
@celeryapp.task
async def async_task():
await asyncio.sleep(42)
Sommaire
J'ai implémenté mes idées dans la bibliothèque celery-pool-asyncio . Cette bibliothèque est utilisée par notre équipe dans le projet en cours, et nous avons déjà déployé à la prod.Caractéristiques brièvement
Outre l'exécution directe des tâches asynchrones, celery-pool-asyncio résout également les problèmes:- Délestage de tâches asynchrone
- Prise en charge de la corutine dans les signaux de céleri
Afin de faire fonctionner céleri-pool-asyncio, j'ai utilisé le patch de singe . Pour chaque patch utilisé en runtime, il est possible de le désactiver.Vous pouvez en savoir plus sur tout cela dans la documentation.Des plans
Dans le bon sens, vous devez intégrer la piscine au céleri. D'un autre côté, les développeurs de céleri prototypent le céleri 5.0, qui sera asynchrone. Le jeu en vaut-il la chandelle?Exemples
Pour démontrer les capacités de mes bibliothèques celery-pool-asyncio et celery-decorator-taskcls ( article ), un projet de test a été implémenté .Autre
J'ai essayé de dire la même chose sur le mitap