Vor Beginn des Python Web Developer- Kurses wurde eine Ăbersetzung des Artikels erstellt .
Wenn Ihre Anwendung einen lĂ€ngeren Prozess hat, können Sie ihn nicht im Standard-Anforderungs- / Antwort-Stream, sondern im Hintergrund verarbeiten.In Ihrer Anwendung muss der Benutzer beispielsweise ein Miniaturbild senden (das höchstwahrscheinlich bearbeitet werden muss) und die E-Mail-Adresse bestĂ€tigen. Wenn Ihre Anwendung das Bild verarbeitet und dann eine E-Mail zur BestĂ€tigung im Anforderungshandler sendet, muss der Endbenutzer aus irgendeinem Grund warten, um beide Aufgaben auszufĂŒhren, bevor er die Seite neu lĂ€dt oder schlieĂt. Stattdessen können Sie diese VorgĂ€nge in die Aufgabenwarteschlange ĂŒbertragen und sie einem separaten Prozess zur Verarbeitung ĂŒberlassen, um sofort eine Antwort an den Benutzer zu senden. In diesem Fall kann der Endbenutzer wĂ€hrend der Verarbeitung im Hintergrund andere Aufgaben auf der Clientseite ausfĂŒhren. In diesem Fall kann Ihre Anwendung auch frei auf Anfragen anderer Benutzer und Clients reagieren.Heute werden wir ĂŒber den Prozess des Einrichtens und Konfigurierens von Sellerie und Redis sprechen , um lang laufende Prozesse in einer Django-Anwendung zu handhaben und solche Probleme zu lösen. Wir werden auch Docker und Docker Compose verwenden, um alle Teile zusammenzufĂŒgen und zu sehen, wie Sellerie-Jobs mit Unit- und Integrationstests getestet werden.Am Ende dieses Handbuchs werden wir lernen:- Integrieren Sie Sellerie in Django, um Hintergrundjobs zu erstellen.
- Pack Django, Sellerie und Redis mit Docker.
- FĂŒhren Sie Prozesse im Hintergrund mit einem separaten Workflow aus.
- Speichern Sie Sellerieprotokolle in einer Datei.
- Richten Sie Flower ein , um Selleriejobs und -arbeiter zu ĂŒberwachen und zu verwalten.
- Testen Sie Sellerie-Jobs mit Unit- und Integrationstests.
Hintergrundaufgaben
Um die Benutzererfahrung zu verbessern, sollten lange Prozesse im Hintergrund auĂerhalb des normalen HTTP-Anforderungs- / Antwortstroms ausgefĂŒhrt werden.Zum Beispiel:- Senden von Briefen zur BestĂ€tigung;
- Web-Scaping und Crawlen;
- Datenanalyse;
- Bildverarbeitung;
- Berichterstellung.
Versuchen Sie beim Erstellen einer Anwendung, Aufgaben, die wĂ€hrend der Laufzeit der Anforderung / Antwort ausgefĂŒhrt werden sollen, z. B. CRUD-VorgĂ€nge, von Aufgaben zu trennen, die im Hintergrund ausgefĂŒhrt werden sollen.Der Arbeitsprozess
Unser Ziel ist es, eine Django-Anwendung zu entwickeln, die Sellerie verwendet, um langwierige Prozesse auĂerhalb des Anforderungs- / Antwortzyklus abzuwickeln.- Der Endbenutzer generiert einen neuen Job, indem er eine POST-Anforderung an den Server sendet.
- In dieser Ansicht wird der Job zur Warteschlange hinzugefĂŒgt und die Job-ID an den Client zurĂŒckgesendet.
- Mit AJAX fragt der Client den Server weiterhin ab, um den Status des Jobs zu ĂŒberprĂŒfen, wĂ€hrend der Job selbst im Hintergrund ausgefĂŒhrt wird.

Projekterstellung
Klonen Sie das Projekt aus der django-Sellerie - Repository und macht eine Kasse auf dem v1 - Tag in dem Master - Zweig :$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master
Da wir insgesamt mit drei Prozessen arbeiten mĂŒssen (Django, Redis, Worker), verwenden wir Docker, um die Arbeit zu vereinfachen und sie zu verbinden, sodass wir alles mit einem Befehl in einem Terminalfenster ausfĂŒhren können.Erstellen Sie im Projektstamm Bilder und starten Sie Docker-Container:$ docker-compose up -d --build
Wenn der Build abgeschlossen ist, gehen Sie zu localhost : 1337:
Stellen Sie sicher, dass die Tests erfolgreich bestanden wurden:$ docker-compose exec web python -m pytest
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item
tests/test_tasks.py . [100%]
========================================= 1 passed in 0.47s =========================================
Schauen wir uns die Struktur des Projekts an, bevor wir fortfahren:âââ .gitignore
âââ LICENSE
âââ README.md
âââ docker-compose.yml
âââ project
âââ Dockerfile
âââ core
â âââ __init__.py
â âââ asgi.py
â âââ settings.py
â âââ urls.py
â âââ wsgi.py
âââ entrypoint.sh
âââ manage.py
âââ pytest.ini
âââ requirements.txt
âââ static
â âââ bulma.min.css
â âââ jquery-3.4.1.min.js
â âââ main.css
â âââ main.js
âââ tasks
â âââ __init__.py
â âââ apps.py
â âââ migrations
â â âââ __init__.py
â âââ templates
â â âââ home.html
â âââ views.py
âââ tests
âââ __init__.py
âââ test_tasks.py
Jobstart
Der Ereignishandler ist project/static/main.js
per Knopfdruck abonniert. Durch einen Klick auf dem Server sendet eine AJAX - POST-Anfrage mit dem entsprechenden Auftragstyp: 1
, 2
oder 3
.$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Auf der Serverseite wurde bereits eine Ansicht konfiguriert, um die Anforderung zu verarbeiten in project/tasks/views.py
:def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
Und jetzt beginnt der SpaĂ: Wir binden Sellerie!Sellerie-Setup
Beginnen wir mit dem HinzufĂŒgen von Sellerie und Redis zur Datei project/requirements.txt
:celery==4.4.1
Django==3.0.4
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Celery verwendet einen Nachrichtenbroker - RabbitMQ , Redis oder AWS Simple Queue Service (SQS) -, um die Kommunikation zwischen Celery Worker und Webanwendung zu vereinfachen. Nachrichten werden an den Broker gesendet und dann vom Mitarbeiter verarbeitet. Danach werden die Ergebnisse an das Backend gesendet.Redis wird sowohl ein Broker als auch ein Backend sein. FĂŒgen Sie der Datei Redis und Celery Worker docker-compose.yml
wie folgt hinzu:version: '3.7'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:5-alpine
Achten Sie auf celery worker --app=core --loglevel=info
:celery worker
verwendet, um Sellerie Arbeiter zu starten ;--app=core
wird verwendet, um core
die Sellerie- Anwendung zu starten (die wir in KĂŒrze definieren werden);--loglevel=info
bestimmt den Grad der Informationsprotokollierung.
FĂŒgen Sie dem Projekteinstellungsmodul Folgendes hinzu, damit Celery Redis als Broker und Backend verwendet:CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Erstellen Sie dann die Datei sample_tasks.py
in project/tasks
: Hier haben wir mit dem Dekorator shared_task eine neue Sellerie- Taskfunktion namens definiert . Denken Sie daran, dass die Aufgabe selbst nicht vom Django-Prozess ausgefĂŒhrt wird, sondern vom Sellerie-Arbeiter. FĂŒgen Sie nun die Datei hinzu zu :# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
create_task
celery.py
"project/core"
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Was ist denn hier los?- Zuerst mĂŒssen Sie den Standardwert fĂŒr die Umgebung festlegen,
DJANGO_SETTINGS_MODULE
damit Sellerie weiĂ, wie das Django-Projekt gefunden wird. - Dann haben wir eine Instanz von Sellerie mit einem Namen erstellt
core
und in eine Variable eingefĂŒgt app
. - Dann haben wir Sellerie-Konfigurationswerte aus dem Einstellungsobjekt von geladen
django.conf
. Wir haben namespace = "CELERY" verwendet , um Konflikte mit anderen Django-Einstellungen zu vermeiden. Daher mĂŒssen alle Konfigurationseinstellungen fĂŒr Sellerie mit einem PrĂ€fix beginnen CELERY_
. - Zuletzt
app.autodiscover_tasks()
weist Celery an, nach Jobs in den in definierten Anwendungen zu suchen settings.INSTALLED_APPS
.
Ăndern Sie dies project/core/__init__.py
so, dass die Sellerie-Anwendung beim Starten von Django automatisch importiert wird:from .celery import app as celery_app
__all__ = ("celery_app",)
Jobstart
Aktualisieren Sie die Ansicht, um den Job zu starten und die ID zu senden:@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
Vergessen Sie nicht, die Aufgabe zu importieren:from tasks.sample_tasks import create_task
Sammeln Sie Bilder und stellen Sie neue Container bereit:$ docker-compose up -d --build
Gehen Sie wie folgt vor, um eine neue Aufgabe zu starten:$ curl -F type=0 http://localhost:1337/tasks/
Sie werden so etwas sehen:{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
Beruflicher Status
ZurĂŒck zum clientseitigen Ereignishandler:$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Wenn die Antwort von der AJAX-Anfrage zurĂŒckkommt, senden wir getStatus()
jede Sekunde von der Job-ID:function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
Wenn die Antwort Ja lautet, wird der DOM-Tabelle eine neue Zeile hinzugefĂŒgt. Aktualisieren Sie die Ansicht get_status
, um den Status zurĂŒckzugeben:@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
Import AsyncResult :from celery.result import AsyncResult
Container aktualisieren:$ docker-compose up -d --build
FĂŒhren Sie eine neue Aufgabe aus:$ curl -F type=1 http://localhost:1337/tasks/
Extrahieren Sie dann task_id
aus der Antwort und rufen Sie aktualisiert get_status
an, um den Status anzuzeigen:$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": true
}
Sie können die gleichen Informationen im Browser sehen:
Protokolle Sellerie
Aktualisieren Sie den Dienst celery
in docker-compose.yml
einer Weise , dass Sellerie in einer separaten Datei abmeldet:celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
FĂŒgen Sie "Projekt" ein neues Verzeichnis hinzu und nennen Sie es "Protokolle" . FĂŒgen Sie dann die Datei in dieses neue Verzeichnis ein celery.log
.Aktualisieren:$ docker-compose up -d --build
Sie sollten sehen, wie die Protokolldatei nach dem Einstellen des Volumes lokal gefĂŒllt wird :[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
/usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
succeeded in 10.027462100006233s: True
Blumen-Dashboard
Flower ist ein leichtes webbasiertes Tool zur Ăberwachung von Sellerie in Echtzeit. Sie können beispielsweise laufende Aufgaben verfolgen, den Mitarbeiterpool vergröĂern oder verkleinern, Diagramme und Statistiken anzeigen.FĂŒgen Sie es hinzu requirements.txt
:celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
FĂŒgen Sie dann den neuen Dienst hinzu zu docker-compose.yml
:dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
Und testen:$ docker-compose up -d --build
Gehen Sie zu localhost : 5555, um das Dashboard anzuzeigen. Sie sollten einen Mitarbeiter sehen:
FĂŒhren Sie einige weitere Aufgaben aus, um das Dashboard zu testen:
FĂŒgen Sie weitere Mitarbeiter hinzu und sehen Sie, wie sich dies auf die Leistung auswirkt:$ docker-compose up -d --build --scale celery=3
Tests
Beginnen wir mit dem einfachsten Test:def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
FĂŒgen Sie den obigen Testfall hinzu project/tests/test_tasks.py
und fĂŒgen Sie den folgenden Import hinzu:from tasks import sample_tasks
FĂŒhren Sie diesen Test aus:$ docker-compose exec web python -m pytest -k "test_task and not test_home"
Dieser Test dauert ungefÀhr eine Minute:======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py . [100%]
============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================
Es ist erwÀhnenswert, dass wir in den obigen Aussagen .run
stattdessen die Methode verwendet haben .delay
, um Aufgaben direkt zu starten, ohne Sellerie-Arbeiter zu verwenden.Möchten Sie Mock-Plugins verwenden , um die Dinge zu beschleunigen?@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
Importieren:from unittest.mock import patch, call
PrĂŒfung:$ docker-compose exec web python -m pytest -k "test_mock_task"
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected
tests/test_tasks.py . [100%]
================================== 1 passed, 2 deselected in 1.13s ==================================
Sehen? Jetzt viel schneller!Was ist mit vollstÀndigen Integrationstests?def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
Denken Sie daran, dass dieser Test denselben Broker und dasselbe Backend wie in der Entwicklung verwendet. Sie können eine neue Instanz von Sellerie zum Testen erstellen:app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
Import hinzufĂŒgen:import json
Und stellen Sie sicher, dass die Tests erfolgreich sind.Fazit
Heute haben wir die Grundkonfiguration von Sellerie eingefĂŒhrt, um langfristige Aufgaben in einer Anwendung auf Django auszufĂŒhren. Sie sollten alle Prozesse an die Verarbeitungswarteschlange senden, die den Code auf der Benutzerseite verlangsamen könnten.Sellerie kann auch verwendet werden, um sich wiederholende Aufgaben auszufĂŒhren und komplexe ressourcenintensive Aufgaben zu zerlegen, um die Rechenlast auf mehrere Maschinen zu verteilen und die AusfĂŒhrungszeit und die Last auf der Maschine zu reduzieren, die Clientanforderungen verarbeitet.Sie finden den gesamten Code in diesem Repository .
â Steig in den Kurs ein