Asynchrone Jobs in Django mit Sellerie

Vor Beginn des Python Web Developer- Kurses wurde eine Übersetzung des Artikels erstellt .



Wenn Ihre Anwendung einen lÀngeren Prozess hat, können Sie ihn nicht im Standard-Anforderungs- / Antwort-Stream, sondern im Hintergrund verarbeiten.

In Ihrer Anwendung muss der Benutzer beispielsweise ein Miniaturbild senden (das höchstwahrscheinlich bearbeitet werden muss) und die E-Mail-Adresse bestĂ€tigen. Wenn Ihre Anwendung das Bild verarbeitet und dann eine E-Mail zur BestĂ€tigung im Anforderungshandler sendet, muss der Endbenutzer aus irgendeinem Grund warten, um beide Aufgaben auszufĂŒhren, bevor er die Seite neu lĂ€dt oder schließt. Stattdessen können Sie diese VorgĂ€nge in die Aufgabenwarteschlange ĂŒbertragen und sie einem separaten Prozess zur Verarbeitung ĂŒberlassen, um sofort eine Antwort an den Benutzer zu senden. In diesem Fall kann der Endbenutzer wĂ€hrend der Verarbeitung im Hintergrund andere Aufgaben auf der Clientseite ausfĂŒhren. In diesem Fall kann Ihre Anwendung auch frei auf Anfragen anderer Benutzer und Clients reagieren.

Heute werden wir ĂŒber den Prozess des Einrichtens und Konfigurierens von Sellerie und Redis sprechen , um lang laufende Prozesse in einer Django-Anwendung zu handhaben und solche Probleme zu lösen. Wir werden auch Docker und Docker Compose verwenden, um alle Teile zusammenzufĂŒgen und zu sehen, wie Sellerie-Jobs mit Unit- und Integrationstests getestet werden.

Am Ende dieses Handbuchs werden wir lernen:

  • Integrieren Sie Sellerie in Django, um Hintergrundjobs zu erstellen.
  • Pack Django, Sellerie und Redis mit Docker.
  • FĂŒhren Sie Prozesse im Hintergrund mit einem separaten Workflow aus.
  • Speichern Sie Sellerieprotokolle in einer Datei.
  • Richten Sie Flower ein , um Selleriejobs und -arbeiter zu ĂŒberwachen und zu verwalten.
  • Testen Sie Sellerie-Jobs mit Unit- und Integrationstests.

Hintergrundaufgaben


Um die Benutzererfahrung zu verbessern, sollten lange Prozesse im Hintergrund außerhalb des normalen HTTP-Anforderungs- / Antwortstroms ausgefĂŒhrt werden.

Zum Beispiel:

  • Senden von Briefen zur BestĂ€tigung;
  • Web-Scaping und Crawlen;
  • Datenanalyse;
  • Bildverarbeitung;
  • Berichterstellung.

Versuchen Sie beim Erstellen einer Anwendung, Aufgaben, die wĂ€hrend der Laufzeit der Anforderung / Antwort ausgefĂŒhrt werden sollen, z. B. CRUD-VorgĂ€nge, von Aufgaben zu trennen, die im Hintergrund ausgefĂŒhrt werden sollen.

Der Arbeitsprozess


Unser Ziel ist es, eine Django-Anwendung zu entwickeln, die Sellerie verwendet, um langwierige Prozesse außerhalb des Anforderungs- / Antwortzyklus abzuwickeln.

  1. Der Endbenutzer generiert einen neuen Job, indem er eine POST-Anforderung an den Server sendet.
  2. In dieser Ansicht wird der Job zur Warteschlange hinzugefĂŒgt und die Job-ID an den Client zurĂŒckgesendet.
  3. Mit AJAX fragt der Client den Server weiterhin ab, um den Status des Jobs zu ĂŒberprĂŒfen, wĂ€hrend der Job selbst im Hintergrund ausgefĂŒhrt wird.



Projekterstellung


Klonen Sie das Projekt aus der django-Sellerie - Repository und macht eine Kasse auf dem v1 - Tag in dem Master - Zweig :

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Da wir insgesamt mit drei Prozessen arbeiten mĂŒssen (Django, Redis, Worker), verwenden wir Docker, um die Arbeit zu vereinfachen und sie zu verbinden, sodass wir alles mit einem Befehl in einem Terminalfenster ausfĂŒhren können.

Erstellen Sie im Projektstamm Bilder und starten Sie Docker-Container:

$ docker-compose up -d --build

Wenn der Build abgeschlossen ist, gehen Sie zu localhost : 1337:



Stellen Sie sicher, dass die Tests erfolgreich bestanden wurden:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Schauen wir uns die Struktur des Projekts an, bevor wir fortfahren:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Jobstart


Der Ereignishandler ist project/static/main.jsper Knopfdruck abonniert. Durch einen Klick auf dem Server sendet eine AJAX - POST-Anfrage mit dem entsprechenden Auftragstyp: 1, 2oder 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Auf der Serverseite wurde bereits eine Ansicht konfiguriert, um die Anforderung zu verarbeiten in project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

Und jetzt beginnt der Spaß: Wir binden Sellerie!

Sellerie-Setup


Beginnen wir mit dem HinzufĂŒgen von Sellerie und Redis zur Datei project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Celery verwendet einen Nachrichtenbroker - RabbitMQ , Redis oder AWS Simple Queue Service (SQS) -, um die Kommunikation zwischen Celery Worker und Webanwendung zu vereinfachen. Nachrichten werden an den Broker gesendet und dann vom Mitarbeiter verarbeitet. Danach werden die Ergebnisse an das Backend gesendet.

Redis wird sowohl ein Broker als auch ein Backend sein. FĂŒgen Sie der Datei Redis und Celery Worker docker-compose.ymlwie folgt hinzu:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Achten Sie auf celery worker --app=core --loglevel=info:

  1. celery workerverwendet, um Sellerie Arbeiter zu starten ;
  2. --app=corewird verwendet, um core die Sellerie- Anwendung zu starten (die wir in KĂŒrze definieren werden);
  3. --loglevel=infobestimmt den Grad der Informationsprotokollierung.

FĂŒgen Sie dem Projekteinstellungsmodul Folgendes hinzu, damit Celery Redis als Broker und Backend verwendet:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Erstellen Sie dann die Datei sample_tasks.pyin project/tasks: Hier haben wir mit dem Dekorator shared_task eine neue Sellerie- Taskfunktion namens definiert . Denken Sie daran, dass die Aufgabe selbst nicht vom Django-Prozess ausgefĂŒhrt wird, sondern vom Sellerie-Arbeiter. FĂŒgen Sie nun die Datei hinzu zu :

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Was ist denn hier los?

  1. Zuerst mĂŒssen Sie den Standardwert fĂŒr die Umgebung festlegen, DJANGO_SETTINGS_MODULEdamit Sellerie weiß, wie das Django-Projekt gefunden wird.
  2. Dann haben wir eine Instanz von Sellerie mit einem Namen erstellt coreund in eine Variable eingefĂŒgt app.
  3. Dann haben wir Sellerie-Konfigurationswerte aus dem Einstellungsobjekt von geladen django.conf. Wir haben namespace = "CELERY" verwendet , um Konflikte mit anderen Django-Einstellungen zu vermeiden. Daher mĂŒssen alle Konfigurationseinstellungen fĂŒr Sellerie mit einem PrĂ€fix beginnen CELERY_.
  4. Zuletzt app.autodiscover_tasks()weist Celery an, nach Jobs in den in definierten Anwendungen zu suchen settings.INSTALLED_APPS.

Ändern Sie dies project/core/__init__.pyso, dass die Sellerie-Anwendung beim Starten von Django automatisch importiert wird:

from .celery import app as celery_app


__all__ = ("celery_app",)

Jobstart


Aktualisieren Sie die Ansicht, um den Job zu starten und die ID zu senden:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Vergessen Sie nicht, die Aufgabe zu importieren:

from tasks.sample_tasks import create_task

Sammeln Sie Bilder und stellen Sie neue Container bereit:

$ docker-compose up -d --build

Gehen Sie wie folgt vor, um eine neue Aufgabe zu starten:

$ curl -F type=0 http://localhost:1337/tasks/

Sie werden so etwas sehen:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Beruflicher Status


ZurĂŒck zum clientseitigen Ereignishandler:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Wenn die Antwort von der AJAX-Anfrage zurĂŒckkommt, senden wir getStatus()jede Sekunde von der Job-ID:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Wenn die Antwort Ja lautet, wird der DOM-Tabelle eine neue Zeile hinzugefĂŒgt. Aktualisieren Sie die Ansicht get_status, um den Status zurĂŒckzugeben:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Import AsyncResult :

from celery.result import AsyncResult

Container aktualisieren:

$ docker-compose up -d --build

FĂŒhren Sie eine neue Aufgabe aus:

$ curl -F type=1 http://localhost:1337/tasks/

Extrahieren Sie dann task_idaus der Antwort und rufen Sie aktualisiert get_statusan, um den Status anzuzeigen:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Sie können die gleichen Informationen im Browser sehen:



Protokolle Sellerie


Aktualisieren Sie den Dienst celeryin docker-compose.ymleiner Weise , dass Sellerie in einer separaten Datei abmeldet:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

FĂŒgen Sie "Projekt" ein neues Verzeichnis hinzu und nennen Sie es "Protokolle" . FĂŒgen Sie dann die Datei in dieses neue Verzeichnis ein celery.log.

Aktualisieren:

$ docker-compose up -d --build

Sie sollten sehen, wie die Protokolldatei nach dem Einstellen des Volumes lokal gefĂŒllt wird :

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Blumen-Dashboard


Flower ist ein leichtes webbasiertes Tool zur Überwachung von Sellerie in Echtzeit. Sie können beispielsweise laufende Aufgaben verfolgen, den Mitarbeiterpool vergrĂ¶ĂŸern oder verkleinern, Diagramme und Statistiken anzeigen.

FĂŒgen Sie es hinzu requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

FĂŒgen Sie dann den neuen Dienst hinzu zu docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

Und testen:

$ docker-compose up -d --build

Gehen Sie zu localhost : 5555, um das Dashboard anzuzeigen. Sie sollten einen Mitarbeiter sehen:



FĂŒhren Sie einige weitere Aufgaben aus, um das Dashboard zu testen:



FĂŒgen Sie weitere Mitarbeiter hinzu und sehen Sie, wie sich dies auf die Leistung auswirkt:

$ docker-compose up -d --build --scale celery=3

Tests


Beginnen wir mit dem einfachsten Test:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

FĂŒgen Sie den obigen Testfall hinzu project/tests/test_tasks.pyund fĂŒgen Sie den folgenden Import hinzu:

from tasks import sample_tasks


FĂŒhren Sie diesen Test aus:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Dieser Test dauert ungefÀhr eine Minute:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

Es ist erwÀhnenswert, dass wir in den obigen Aussagen .runstattdessen die Methode verwendet haben .delay, um Aufgaben direkt zu starten, ohne Sellerie-Arbeiter zu verwenden.
Möchten Sie Mock-Plugins verwenden , um die Dinge zu beschleunigen?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Importieren:

from unittest.mock import patch, call

PrĂŒfung:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

Sehen? Jetzt viel schneller!

Was ist mit vollstÀndigen Integrationstests?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Denken Sie daran, dass dieser Test denselben Broker und dasselbe Backend wie in der Entwicklung verwendet. Sie können eine neue Instanz von Sellerie zum Testen erstellen:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Import hinzufĂŒgen:

import json

Und stellen Sie sicher, dass die Tests erfolgreich sind.

Fazit


Heute haben wir die Grundkonfiguration von Sellerie eingefĂŒhrt, um langfristige Aufgaben in einer Anwendung auf Django auszufĂŒhren. Sie sollten alle Prozesse an die Verarbeitungswarteschlange senden, die den Code auf der Benutzerseite verlangsamen könnten.

Sellerie kann auch verwendet werden, um sich wiederholende Aufgaben auszufĂŒhren und komplexe ressourcenintensive Aufgaben zu zerlegen, um die Rechenlast auf mehrere Maschinen zu verteilen und die AusfĂŒhrungszeit und die Last auf der Maschine zu reduzieren, die Clientanforderungen verarbeitet.

Sie finden den gesamten Code in diesem Repository .



→ Steig in den Kurs ein

All Articles