Aipo + asyncio

Olá Habr! Quero contar como resolvi o problema da execução competitiva eficaz de tarefas assíncronas no Aipo.

KDPV

Introdução


O aipo é um projeto grande, com uma história complexa e um fardo pesado de compatibilidade com versões anteriores. As principais decisões arquiteturais foram tomadas muito antes de o assíncio aparecer em python. Portanto, é ainda mais interessante que, de alguma forma, você possa executar a tarefa assíncrona no aipo imediatamente.

Modo de furo ativado
Formalmente, após o nº 839 . Subjetivamente, para mim, renomear um pacote não altera a arquitetura do aplicativo. Modo de furo desativado.

Executar tarefas assíncronas no aipo de baunilha


Você pode iniciar a tarefa assíncrona a partir da caixa:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

Vantagens óbvias:

  • Funciona!
  • Simplesmente
  • Sem dependências externas adicionais

O que está realmente errado?

  • O loop de eventos é criado dentro de cada trabalhador.
  • Não há alternância de contexto entre a execução de corotinas
  • Em determinado momento, o trabalhador não executa mais que uma corotina
  • Recursos compartilhados não são compartilhados entre tarefas
  • Chapa de ebulição

Ou seja, assíncrono, neste caso, é usado para assíncrono, mas não há vantagens

Vamos tentar mais complicado?


Eu lutei pelo desempenho:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

Bingo! Prós:

  • Ainda trabalhando
  • Ainda mais ou menos eficaz
  • Até os recursos se misturam entre as rotinas dentro do trabalhador.
  • Ainda não há dependências externas adicionais

Bingo? Os problemas não demoraram a chegar:

  • O aipo não sabe nada sobre o funcionamento de corutin
  • Você perde o controle sobre a execução de tarefas
  • Você perde o controle das exceções
  • Chapa de ebulição

Para usar uma solução de engenharia tão maravilhosa em prod, minha mão não subiu

Formulação do problema


  • Deveria trabalhar
  • As corotinas devem ser competitivas
  • Os recursos devem ser compartilhados entre muitos executáveis
  • Sem clichê
  • API previsível simples

Ou seja, minhas expectativas:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

Sumário


Eu implementei minhas idéias na biblioteca aipo-piscina-asyncio . Essa biblioteca é usada por nossa equipe no projeto atual e já lançamos o produto.

Apresenta brevemente


Além de executar diretamente tarefas assíncronas, o celery-pool-asyncio também resolve problemas:

  • Descarga de tarefa assíncrona
  • Suporte Corutin em sinais de aipo

Para fazer o aipo-piscina-assíncio funcionar, usei o patch de macacos . Para cada patch usado em tempo de execução, é possível desativá-lo.

Você pode ler mais sobre tudo isso na documentação.

Planos


De uma maneira boa, você precisa integrar a piscina no aipo. Por outro lado, os desenvolvedores de aipo estão criando um protótipo do aipo 5.0, que será assíncrono. O jogo vale a pena?

Exemplos


Para demonstrar os recursos das minhas bibliotecas aipo-piscina-assíncio e aipo-decorador-tarefas ( artigo ), um projeto de teste foi implementado .

De outros


Eu tentei dizer a mesma coisa no mitap


All Articles