Pekerjaan Asinkron di Django dengan Seledri

Terjemahan artikel disiapkan sebelum dimulainya kursus Pengembang Web Python .



Jika aplikasi Anda memiliki proses yang panjang, Anda dapat memprosesnya bukan di aliran permintaan / respons standar, tetapi di latar belakang.

Misalnya, dalam aplikasi Anda, pengguna harus mengirim gambar mini (yang, kemungkinan besar, perlu diedit) dan mengonfirmasi alamat email. Jika aplikasi Anda memproses gambar dan kemudian mengirim email untuk konfirmasi di penangan permintaan, maka pengguna akhir harus menunggu beberapa alasan untuk menyelesaikan kedua tugas sebelum memuat ulang atau menutup halaman. Sebagai gantinya, Anda dapat mentransfer operasi ini ke antrian tugas dan membiarkannya ke proses terpisah untuk diproses agar segera mengirim respons kepada pengguna. Dalam hal ini, pengguna akhir akan dapat melakukan hal-hal lain di sisi klien saat memproses di latar belakang. Dalam hal ini, aplikasi Anda juga akan dapat dengan bebas menanggapi permintaan dari pengguna dan klien lain.

Hari ini kita akan berbicara tentang proses pengaturan dan konfigurasi Celery dan Redis untuk menangani proses yang berjalan lama dalam aplikasi Django untuk menyelesaikan masalah seperti itu. Kami juga akan menggunakan Docker dan Docker Compose untuk menggabungkan semua bagian dan melihat bagaimana menguji pekerjaan Seledri dengan unit dan tes integrasi.

Pada akhir panduan ini, kita akan belajar:

  • Integrasikan Celery di Django untuk menciptakan pekerjaan latar belakang.
  • Kemas Django, Seledri, dan Redis dengan Docker.
  • Jalankan proses di latar belakang menggunakan alur kerja yang terpisah.
  • Simpan log Seledri ke file.
  • Siapkan Bunga untuk memantau dan mengelola pekerjaan dan pekerja Seledri.
  • Tes pekerjaan Seledri dengan unit dan tes integrasi.

Tugas Latar Belakang


Untuk meningkatkan pengalaman pengguna, proses yang panjang harus berjalan di latar belakang di luar aliran permintaan / respons HTTP normal.

Contohnya:

  • Mengirim surat untuk konfirmasi;
  • Scaping dan crawling web;
  • Analisis data;
  • Pengolahan citra;
  • Pembuatan Laporan.

Saat membuat aplikasi, coba pisahkan tugas-tugas yang harus dilakukan selama umur permintaan / respons, misalnya, operasi CRUD, dari tugas-tugas yang harus dilakukan di latar belakang.

Proses kerja


Tujuan kami adalah untuk mengembangkan aplikasi Django yang menggunakan Seledri untuk menangani proses yang panjang di luar siklus permintaan / respons.

  1. Pengguna akhir menghasilkan pekerjaan baru dengan mengirimkan permintaan POST ke server.
  2. Dalam tampilan ini, pekerjaan ditambahkan ke antrian, dan id pekerjaan dikirim kembali ke klien.
  3. Menggunakan AJAX, klien terus meminta server untuk memeriksa status pekerjaan, sementara pekerjaan itu sendiri berjalan di latar belakang.



Pembuatan proyek


Kloning proyek dari repositori django-seledri dan lakukan checkout pada tag v1 di cabang master :

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Karena secara total kita perlu bekerja dengan tiga proses (Django, Redis, pekerja), kita menggunakan Docker untuk menyederhanakan pekerjaan, menghubungkannya sehingga kita dapat menjalankan semuanya dengan satu perintah di satu jendela terminal.

Dari root proyek, buat gambar dan luncurkan wadah Docker:

$ docker-compose up -d --build

Ketika build selesai, pergi ke localhost : 1337:



Pastikan tes berhasil:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Mari kita lihat struktur proyek sebelum melanjutkan:

โ”œโ”€โ”€ .gitignore
โ”œโ”€โ”€ LICENSE
โ”œโ”€โ”€ README.md
โ”œโ”€โ”€ docker-compose.yml
โ””โ”€โ”€ project
    โ”œโ”€โ”€ Dockerfile
    โ”œโ”€โ”€ core
    โ”‚   โ”œโ”€โ”€ __init__.py
    โ”‚   โ”œโ”€โ”€ asgi.py
    โ”‚   โ”œโ”€โ”€ settings.py
    โ”‚   โ”œโ”€โ”€ urls.py
    โ”‚   โ””โ”€โ”€ wsgi.py
    โ”œโ”€โ”€ entrypoint.sh
    โ”œโ”€โ”€ manage.py
    โ”œโ”€โ”€ pytest.ini
    โ”œโ”€โ”€ requirements.txt
    โ”œโ”€โ”€ static
    โ”‚   โ”œโ”€โ”€ bulma.min.css
    โ”‚   โ”œโ”€โ”€ jquery-3.4.1.min.js
    โ”‚   โ”œโ”€โ”€ main.css
    โ”‚   โ””โ”€โ”€ main.js
    โ”œโ”€โ”€ tasks
    โ”‚   โ”œโ”€โ”€ __init__.py
    โ”‚   โ”œโ”€โ”€ apps.py
    โ”‚   โ”œโ”€โ”€ migrations
    โ”‚   โ”‚   โ””โ”€โ”€ __init__.py
    โ”‚   โ”œโ”€โ”€ templates
    โ”‚   โ”‚   โ””โ”€โ”€ home.html
    โ”‚   โ””โ”€โ”€ views.py
    โ””โ”€โ”€ tests
        โ”œโ”€โ”€ __init__.py
        โ””โ”€โ”€ test_tasks.py

Peluncuran pekerjaan


Pengatur acara project/static/main.jsberlangganan klik tombol. Dengan mengklik pada server mengirimkan AJAX POST-permintaan dengan jenis pekerjaan yang sesuai: 1, 2atau 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Di sisi server, tampilan telah dikonfigurasi untuk memproses permintaan di project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

Dan sekarang kesenangan dimulai: kita ikat Seledri!

Pengaturan Seledri


Mari kita mulai dengan menambahkan Seledri dan Redis ke file project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Selery menggunakan broker pesan - RabbitMQ , Redis, atau AWS Simple Queue Service (SQS) โ€”untuk menyederhanakan komunikasi antara pekerja Selery dan aplikasi web. Pesan dikirim ke broker, dan kemudian diproses oleh pekerja. Setelah itu, hasilnya dikirim ke backend.

Redis akan menjadi broker dan backend. Tambahkan Redis dan Seledri Pekerja ke file docker-compose.ymlsebagai berikut:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Perhatikan celery worker --app=core --loglevel=info:

  1. celery workerdigunakan untuk memulai pekerja Seledri ;
  2. --app=coredigunakan untuk meluncurkan core aplikasi Seledri (yang akan segera kita definisikan);
  3. --loglevel=infomenentukan tingkat pencatatan informasi.

Tambahkan berikut ini ke modul pengaturan proyek sehingga Celery menggunakan Redis sebagai broker dan backend:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Kemudian buat file sample_tasks.pydi project/tasks: Di sini, menggunakan dekorator shared_task, kami mendefinisikan fungsi tugas Seledri baru yang disebut . Ingatlah bahwa tugas itu sendiri tidak akan dieksekusi dari proses Django, itu akan dilakukan oleh pekerja Seledri. Sekarang tambahkan file ke :

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Apa yang terjadi di sini?

  1. Pertama, Anda perlu mengatur nilai default untuk lingkungan DJANGO_SETTINGS_MODULEsehingga Celery tahu cara menemukan proyek Django.
  2. Kemudian kami membuat instance dari Seledri dengan nama coredan menempatkannya dalam variabel app.
  3. Lalu kami memuat nilai konfigurasi Selery dari objek pengaturan django.conf. Kami menggunakan namespace = "CELERY" untuk mencegah konflik dengan pengaturan Django lainnya. Karena itu, semua pengaturan konfigurasi untuk Seledri harus dimulai dengan awalan CELERY_.
  4. Terakhir, app.autodiscover_tasks()memberitahu Celery untuk mencari pekerjaan dari aplikasi yang ditentukan dalam settings.INSTALLED_APPS.

Ubah project/core/__init__.pysehingga aplikasi Seledri diimpor secara otomatis saat memulai Django:

from .celery import app as celery_app


__all__ = ("celery_app",)

Peluncuran pekerjaan


Refresh view untuk memulai pekerjaan dan mengirim id:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Jangan lupa untuk mengimpor tugas:

from tasks.sample_tasks import create_task

Kumpulkan gambar dan gunakan wadah baru:

$ docker-compose up -d --build

Untuk memulai tugas baru, lakukan:

$ curl -F type=0 http://localhost:1337/tasks/

Anda akan melihat sesuatu seperti ini:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Status pekerjaan


Kembali ke pengendali acara sisi klien:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Ketika respons kembali dari permintaan AJAX, kami akan mengirim getStatus()dari id pekerjaan setiap detik:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Jika jawabannya adalah ya, maka baris baru akan ditambahkan ke tabel DOM. Segarkan tampilan get_statusuntuk mengembalikan status:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Impor AsyncResult :

from celery.result import AsyncResult

Perbarui wadah:

$ docker-compose up -d --build

Jalankan tugas baru:

$ curl -F type=1 http://localhost:1337/tasks/

Kemudian ekstrak task_iddari respons dan panggilan yang diperbarui get_statusuntuk melihat status:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Anda dapat melihat informasi yang sama di browser:



Log Seledri


Memperbarui layanan celerydalam docker-compose.ymlsedemikian rupa sehingga Seledri log off dalam file terpisah:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Tambahkan direktori baru ke "proyek" dan beri nama "log" . Kemudian tambahkan ke direktori baru ini meletakkan file celery.log.

Memperbarui:

$ docker-compose up -d --build

Anda harus melihat bagaimana file log diisi secara lokal setelah mengatur volume :

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Dasbor Bunga


Flower adalah alat berbasis web yang ringan untuk memantau Celery secara real time. Anda dapat melacak tugas yang sedang berjalan, menambah atau mengurangi kumpulan pekerja, menampilkan grafik dan statistik, misalnya.

Tambahkan ke requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Kemudian tambahkan layanan baru ke docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

Dan uji:

$ docker-compose up -d --build

Pergi ke localhost : 5555 untuk melihat dasbor. Anda akan melihat satu pekerja:



Jalankan beberapa tugas lagi untuk menguji dasbor:



Coba tambahkan lebih banyak pekerja dan lihat bagaimana itu memengaruhi kinerja:

$ docker-compose up -d --build --scale celery=3

Tes


Mari kita mulai dengan tes paling sederhana:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Tambahkan test case di atas ke project/tests/test_tasks.pydan tambahkan impor berikut:

from tasks import sample_tasks


Jalankan tes ini:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Tes ini akan memakan waktu sekitar satu menit:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

Perlu dicatat bahwa dalam pernyataan di atas kami menggunakan metode .runsebagai gantinya .delayuntuk langsung memulai tugas, tanpa menggunakan pekerja Seledri.
Ingin menggunakan plugin tiruan untuk mempercepatnya?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Impor:

from unittest.mock import patch, call

Uji:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

Lihat? Sekarang jauh lebih cepat!

Bagaimana dengan pengujian integrasi penuh?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Ingatlah bahwa tes ini menggunakan broker dan backend yang sama seperti dalam pengembangan. Anda dapat membuat instance baru dari Seledri untuk pengujian:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Tambahkan impor:

import json

Dan pastikan tesnya berhasil.

Kesimpulan


Hari ini kami memperkenalkan konfigurasi dasar Selery untuk melakukan tugas jangka panjang dalam aplikasi di Django. Anda harus mengirim proses apa pun ke antrian pemrosesan yang dapat memperlambat kode di sisi pengguna.

Seledri juga dapat digunakan untuk melakukan tugas berulang dan menguraikan tugas-tugas yang intensif sumber daya untuk mendistribusikan beban komputasi pada beberapa mesin dan mengurangi waktu eksekusi dan beban pada mesin yang memproses permintaan klien.

Anda dapat menemukan semua kode di repositori ini .



โ†’ Ikuti kursus

All Articles