Seledri + asyncio

Halo, Habr! Saya ingin memberi tahu bagaimana saya memecahkan masalah eksekusi kompetitif yang efektif dari tugas-tugas asyncio di Celery.

KDPV

pengantar


Selery adalah proyek besar dengan sejarah yang kompleks dan beban kompatibilitas yang berat. Keputusan arsitektur utama dibuat jauh sebelum asyncio muncul dengan python. Oleh karena itu, bahkan lebih menarik bahwa Anda dapat menjalankan tugas asyncio dalam seledri di luar kotak.

Mode bore aktif
Secara formal, setelah # 839 . Secara subyektif, bagi saya, mengganti nama paket tidak mengubah arsitektur aplikasi. Mode bore mati.

Jalankan tugas asyncio di seledri vanilla


Anda dapat memulai tugas asyncio dari kotak:

import asyncio

from .celeryapp import celeryapp


async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run(coro)

Keuntungan yang jelas:

  • Berhasil!
  • Secara sederhana
  • Tidak ada ketergantungan eksternal tambahan

Apa yang sebenarnya salah?

  • Loop Peristiwa dibuat di dalam setiap pekerja.
  • Tidak ada saklar konteks antara menjalankan coroutine
  • Pada suatu saat, pekerja mengeksekusi tidak lebih dari satu coroutine
  • Sumber daya bersama tidak dibagi di antara tugas-tugas
  • Pelat boiler

Artinya, asyncio dalam hal ini digunakan untuk asyncio, tetapi tidak ada keuntungan

Mari kita coba lebih rumit?


Saya berjuang untuk kinerja:

import asyncio
import threading

from .celeryapp import celeryapp

celeryapp.loop = asyncio.get_event_loop()
celeryapp.loop_runner = threading.Thread(
    target=celeryapp.loop.run_forever,
    daemon=True,
)
celeryapp.loop_runner.start()

async def async_task():
    await asyncio.sleep(42)


@celeryapp.task
def regular_task():
    coro = async_task()
    asyncio.run_coroutine_threadsafe(
        coro=coro,
        loop=celeryapp.loop,
    )

Bingo! Pro:

  • Masih bekerja
  • Bahkan lebih atau kurang efektif
  • Bahkan sumber daya shuffle antara coroutine dalam pekerja.
  • Masih belum ada ketergantungan eksternal tambahan

Bingo? Masalahnya tidak lama datang:

  • Seledri tidak tahu apa-apa tentang menjalankan corutin
  • Anda kehilangan kendali atas pelaksanaan tugas
  • Anda kehilangan kendali atas pengecualian
  • Pelat boiler

Untuk pergi dengan solusi rekayasa yang begitu luar biasa di tangan, tangan saya entah bagaimana tidak naik

Perumusan masalah


  • Harus bekerja
  • Coroutine harus kompetitif
  • Sumber daya harus dibagi di antara banyak yang dapat dieksekusi
  • Tidak ada boilerplate
  • API yang dapat diprediksi sederhana

Itulah harapan saya:

import asyncio

from .celeryapp import celeryapp


@celeryapp.task
async def async_task():
    await asyncio.sleep(42)

Ringkasan


Saya mengimplementasikan ide-ide saya di perpustakaan selery-pool-asyncio . Perpustakaan ini digunakan oleh tim kami dalam proyek saat ini, dan kami telah meluncurkannya ke prod.

Fitur singkat


Selain secara langsung menjalankan tugas-tugas asyncio, selery-pool-asyncio juga memecahkan masalah:

  • Pelepasan tugas asinkron
  • Dukungan Corutin dalam sinyal seledri

Agar celery-pool-asyncio berfungsi, saya menggunakan patch monyet . Untuk setiap tambalan yang digunakan dalam runtime, dimungkinkan untuk menonaktifkannya.

Anda dapat membaca lebih lanjut tentang semua ini di dokumentasi.

Rencana


Dengan cara yang baik, Anda perlu mengintegrasikan kolam ke dalam seledri. Di sisi lain, pengembang seledri membuat prototip seledri 5.0, yang akan asinkron. Apakah game ini sepadan dengan lilin?

Contohnya


Untuk mendemonstrasikan kemampuan perpustakaan ( artikel ) seledri-kolam-asinko dan seledri-dekorator-tugasku saya , sebuah proyek pengujian dilaksanakan .

Lain


Saya mencoba mengatakan hal yang sama pada mitap


All Articles