Celery + asyncio

Hello, Habr! I want to tell you how I solved the problem of running asyncio tasks concurrently and efficiently in Celery.


Introduction


Celery is a large project with a complex history and a heavy burden of backward compatibility. Its key architectural decisions were made long before asyncio appeared in Python. So it is all the more interesting that you can, in a way, run an asyncio task in Celery out of the box.

Pedant mode on. There is issue #839 about this. Pedant mode off.

Running asyncio tasks in vanilla Celery


You can start an asyncio task out of the box:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

Obvious advantages:

  • It works!
  • It is simple
  • No additional external dependencies

What is actually wrong?

  • A new event loop is created and torn down on every task invocation
  • There is no context switching between coroutines
  • At any moment a worker executes at most one coroutine
  • Resources (connections, sessions) cannot be shared between tasks
  • Boilerplate

In other words, asyncio here is used for the sake of asyncio, without any of its benefits.
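The drawbacks above are easy to demonstrate outside Celery. In this sketch (plain asyncio, no Celery involved) three 0.2-second sleeps run back to back when each one goes through its own `asyncio.run`, but overlap when scheduled on a single loop:

```python
import asyncio
import time

async def io_task():
    await asyncio.sleep(0.2)

# As in the Celery task above: each call creates (and destroys)
# its own event loop, so three tasks take roughly 0.6 s in total.
start = time.monotonic()
for _ in range(3):
    asyncio.run(io_task())
sequential = time.monotonic() - start

# On a single shared loop the same three sleeps overlap (~0.2 s).
async def main():
    await asyncio.gather(*(io_task() for _ in range(3)))

start = time.monotonic()
asyncio.run(main())
concurrent = time.monotonic() - start

print(f"sequential: {sequential:.2f}s  concurrent: {concurrent:.2f}s")
```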

Let's try something trickier


So I went chasing performance:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.new_event_loop()  # get_event_loop() is deprecated outside a running loop
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    # Schedule the coroutine on the background loop. The returned
    # Future is discarded, so the task "completes" immediately.
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

Bingo! Pros:

  • It still works
  • It is even more or less efficient
  • Resources are even shared between coroutines within a worker
  • There are still no additional external dependencies

Bingo? The problems were not long in coming:

  • Celery knows nothing about the running coroutines
  • You lose control over the execution of tasks
  • You lose control of exceptions
  • Boilerplate
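The lost-exceptions point is worth spelling out. `run_coroutine_threadsafe` returns a `concurrent.futures.Future`; unless something blocks on it, an exception raised inside the coroutine is silently stored in that future. A minimal sketch (plain asyncio and threading, no Celery):

```python
import asyncio
import threading

# A background event loop, as in the snippet above.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def failing_task():
    raise RuntimeError("boom")

# Fire-and-forget: the exception ends up inside the Future,
# and neither Celery nor our code ever sees it.
future = asyncio.run_coroutine_threadsafe(failing_task(), loop)

# The only way to surface it is to block on the Future...
try:
    future.result(timeout=5)
except RuntimeError as exc:
    print(f"caught: {exc}")
# ...which makes the task synchronous again, defeating the purpose.
```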

I could not bring myself to ship such a "wonderful" engineering solution to production.

Problem statement


  • It should work
  • Coroutines must run concurrently
  • Resources should be shared across running tasks
  • No boilerplate
  • A simple, predictable API

That is, my expectations:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

Summary


I implemented these ideas in the celery-pool-asyncio library. Our team uses it in the current project, and we have already rolled it out to production.
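Assuming the pool is installed from PyPI, a worker can be pointed at a custom pool implementation via Celery's `-P`/`--pool` option. The entry point name below (`celery_pool_asyncio:TaskPool`) and the environment variable follow the library's README and may differ between versions; `myproject.celeryapp` is a placeholder for your own app module:

```shell
pip install celery-pool-asyncio

# Register the custom worker pool and start a worker with it.
export CELERY_CUSTOM_WORKER_POOL='celery_pool_asyncio:TaskPool'
celery worker -A myproject.celeryapp -P celery_pool_asyncio:TaskPool
```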

Features briefly


Besides directly executing asyncio tasks, celery-pool-asyncio also solves these problems:

  • Asynchronous task scheduling
  • Coroutine support in Celery signals

To get celery-pool-asyncio working, I had to resort to monkey patching. Every patch applied at runtime can be disabled individually.

You can read more about all this in the documentation.

Plans


Ideally, the pool should be integrated into Celery itself. On the other hand, the Celery developers are prototyping Celery 5.0, which is going to be asynchronous. Is the game worth the candle?

Examples


To demonstrate the capabilities of my celery-pool-asyncio and celery-decorator-taskcls (article) libraries, a test project was implemented.

Other


I tried to present the same material at a meetup.

