哈Ha!我想告诉我如何解决在Celery中有效竞争执行异步任务的问题。
介绍
芹菜是一个大型项目,具有复杂的历史和向后兼容的沉重负担。在asyncio出现在python之前很久就做出了重要的架构决策。因此,您可以以某种方式开箱即用地运行celery异步任务,这更加有趣。在香草芹菜中运行异步任务
您可以从框中启动asyncio任务:import asyncio
from .celeryapp import celeryapp
async def async_task():
await asyncio.sleep(42)
@celeryapp.task
def regular_task():
coro = async_task()
asyncio.run(coro)
明显优势:到底有什么问题?- 在每个工作程序内部创建事件循环。
- 执行协程之间没有上下文切换
- 在某个时间点,工人执行不超过一个协程
- 任务之间不共享共享资源
- 样板
也就是说,在这种情况下,asyncio用于asyncio,但是没有优势让我们尝试更棘手吗?
我为表现而战:import asyncio
import threading
from .celeryapp import celeryapp
celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
target=celeryapp.loop.run_forever,
daemon=True,
)
celeryapp.loop_runner.start()
async def async_task():
await asyncio.sleep(42)
@celeryapp.task
def regular_task():
coro = async_task()
asyncio.run_coroutine_threadsafe(
coro=coro,
loop=celeryapp.loop,
)
答对了!优点:- 还在工作
- 或多或少有效
- 甚至在工人内部的协程之间,资源也会洗牌。
- 仍然没有其他外部依赖项
答对了?问题很快就出现了:- 芹菜对运行corutin一无所知
- 您无法控制任务的执行
- 您无法控制例外
- 样板
为了提供如此出色的工程解决方案,我的手不知何故没有抬起问题的提法
- 应该管用
- 协程必须具有竞争力
- 资源应在许多可执行文件之间共享
- 没有样板
- 简单的可预测API
那就是我的期望:import asyncio
from .celeryapp import celeryapp
@celeryapp.task
async def async_task():
await asyncio.sleep(42)
摘要
我在celery-pool-asyncio库中实现了我的想法。我们的团队在当前项目中使用了该库,并且我们已经将其发布到产品中。简要介绍
除了直接执行异步任务外,celery-pool-asyncio还解决了以下问题:为了使celery-pool-asyncio正常工作,我使用了猴子修补程序。对于运行时使用的每个修补程序,可以将其禁用。您可以在文档中详细了解所有这些内容。计划
以一种很好的方式,您需要将池集成到芹菜中。另一方面,芹菜开发人员正在开发芹菜5.0的原型,这将是异步的。这个游戏值得吗?例子
为了演示我的celery-pool-asyncio和celery-decorator-taskcls库(文章)的功能,实施了一个测试项目。其他