芹菜+异步

哈Ha!我想告诉我如何解决在Celery中有效竞争执行异步任务的问题。

KDPV

介绍


芹菜是一个大型项目,具有复杂的历史和向后兼容的沉重负担。在asyncio出现在python之前很久就做出了重要的架构决策。因此,您可以以某种方式开箱即用地运行celery异步任务,这更加有趣。

钻孔模式开启
#839. . mode off.

在香草芹菜中运行异步任务


您可以从框中启动asyncio任务:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

明显优势:

  • 有用!
  • 只是
  • 没有其他外部依赖

到底有什么问题?

  • 在每个工作程序内部创建事件循环。
  • 执行协程之间没有上下文切换
  • 在某个时间点,工人执行不超过一个协程
  • 任务之间不共享共享资源
  • 样板

也就是说,在这种情况下,asyncio用于asyncio,但是没有优势

让我们尝试更棘手吗?


我为表现而战:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

答对了!优点:

  • 还在工作
  • 或多或少有效
  • 甚至在工人内部的协程之间,资源也会洗牌。
  • 仍然没有其他外部依赖项

答对了?问题很快就出现了:

  • 芹菜对运行corutin一无所知
  • 您无法控制任务的执行
  • 您无法控制例外
  • 样板

为了提供如此出色的工程解决方案,我的手不知何故没有抬起

问题的提法


  • 应该管用
  • 协程必须具有竞争力
  • 资源应在许多可执行文件之间共享
  • 没有样板
  • 简单的可预测API

那就是我的期望:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

摘要


我在celery-pool-asyncio库中实现了我的想法我们的团队在当前项目中使用了该库,并且我们已经将其发布到产品中。

简要介绍


除了直接执行异步任务外,celery-pool-asyncio还解决了以下问题:

  • 异步任务清除
  • 芹菜信号中的Corutin支持

为了使celery-pool-asyncio正常工作,我使用了猴子修补程序对于运行时使用的每个修补程序,可以将其禁用。

您可以在文档中详细了解所有这些内容。

计划


以一种很好的方式,您需要将池集成到芹菜中。另一方面,芹菜开发人员正在开发芹菜5.0的原型,这将是异步的。这个游戏值得吗?

例子


为了演示我的celery-pool-asynciocelery-decorator-taskcls库文章)的功能,实施了一个测试项目

其他


我试图在mitap上讲同样的事情


All Articles