Hello, Habr! I want to tell you how I solved the problem of efficient concurrent execution of asyncio tasks in Celery.
Introduction
Celery is a large project with a complex history and a heavy burden of backward compatibility. Its key architectural decisions were made long before asyncio appeared in Python. That makes it all the more interesting that you can nevertheless run an asyncio task in Celery out of the box.

Running asyncio tasks in vanilla Celery
You can run an asyncio task out of the box:

```python
import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    # A brand-new event loop is created and destroyed on every call
    coro = async_task()
    asyncio.run(coro)
```
Obvious advantages:

- It works!
- It's simple
- No additional external dependencies
What is actually wrong?

- A new event loop is created and torn down inside the worker on every task call (see the sketch after this list)
- There is no switching between running coroutines
- At any point in time a worker executes at most one coroutine
- Shared resources cannot be reused between tasks
- Boilerplate
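A quick stdlib-only sketch of the first and fourth points: every `asyncio.run` call builds its own event loop and closes it afterwards, so nothing bound to a loop (connections, sessions, pools) can survive from one task invocation to the next:

```python
import asyncio

seen_loops = []


async def remember_loop():
    # Record which event loop this "task" ran on
    seen_loops.append(asyncio.get_running_loop())


# Simulate two sequential task invocations, the way the worker runs them
asyncio.run(remember_loop())
asyncio.run(remember_loop())

assert seen_loops[0] is not seen_loops[1]  # each call got a fresh loop
assert all(loop.is_closed() for loop in seen_loops)  # and it is gone now
```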
In other words, asyncio here is used for the sake of using asyncio, without gaining any of its advantages.

Let's try something trickier?
Chasing performance, I came up with this:

```python
import asyncio
import threading

from .celeryapp import celeryapp

# One shared event loop per worker process, spinning in a daemon thread
celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    # Fire-and-forget: schedule the coroutine on the background loop
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )
```
Bingo! Pros:

- It still works
- It's even reasonably efficient
- Resources are even shared between coroutines within a worker
- Still no additional external dependencies
Bingo? Problems were not long in coming:

- Celery knows nothing about the running coroutines
- You lose control over task execution
- You lose control over exceptions (see the sketch after this list)
- Boilerplate
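To make the exception point concrete, here is a minimal sketch in plain asyncio (no Celery): `asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`, and since `regular_task` above drops it, any exception raised by the coroutine disappears silently, while the Celery task has long since reported success:

```python
import asyncio
import threading

# Same setup as above: a shared loop spinning in a daemon thread
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()


async def failing_task():
    raise RuntimeError("nobody will ever see this")


# Fire-and-forget, as in regular_task: the Future is thrown away,
# so the exception is swallowed and the caller thinks all is well
asyncio.run_coroutine_threadsafe(failing_task(), loop)

# Only by keeping the Future and asking for its result do we see the error
future = asyncio.run_coroutine_threadsafe(failing_task(), loop)
try:
    future.result(timeout=5)
except RuntimeError as exc:
    print(f"the exception was hiding inside the Future: {exc}")
```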
I could not bring myself to ship such a wonderful engineering solution to production.

Problem statement
- It should work
- Coroutines must run concurrently
- Resources should be shared between the running tasks
- No boilerplate
- A simple, predictable API
That is, my expectations:

```python
import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)
```
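Dispatching, meanwhile, should stay the standard Celery API. A hypothetical producer-side call under these expectations (no loops or threads on the caller's side; the usual `delay`/`get` semantics assumed unchanged):

```python
# The caller stays vanilla Celery; the pool is expected to schedule
# the coroutine on its event loop inside the worker
result = async_task.delay()
print(result.get(timeout=60))
```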
Summary
I implemented my ideas in the celery-pool-asyncio library. Our team uses this library in a current project, and we have already rolled it out to production.

Features briefly
Besides directly executing asyncio tasks, celery-pool-asyncio also solves other problems:

- Asynchronous task scheduling
- Coroutine support in Celery signals (sketched below)
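As a hedged illustration of the signals point (the handler name and signal choice are mine, not taken from the library's docs): with coroutine support in signals, an `async def` handler can be connected where vanilla Celery expects a plain function:

```python
import asyncio

from celery.signals import task_success

from .celeryapp import celeryapp


@celeryapp.task
async def fetch(url):
    await asyncio.sleep(1)  # stand-in for real async I/O
    return url


# An async signal handler; vanilla Celery would call it without
# awaiting, leaving behind an un-awaited coroutine
@task_success.connect
async def on_task_success(sender=None, result=None, **kwargs):
    await asyncio.sleep(0)  # any async notification/cleanup goes here
```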
To get celery-pool-asyncio to work, I used monkey patching. Every patch applied at runtime can be disabled individually. You can read more about all this in the documentation.

Plans
Ideally, the pool should be integrated into Celery itself. On the other hand, the Celery developers are prototyping Celery 5.0, which is going to be asynchronous. Is the game worth the candle?

Examples
To demonstrate the capabilities of my celery-pool-asyncio and celery-decorator-taskcls libraries (article), I put together a test project.

Other
I tried to cover the same material in a talk at a meetup.