Une traduction de l'article a été préparée avant le début du cours Python Web Developer .
Si votre application a un processus long, vous pouvez le traiter non pas dans le flux de demande / rĂ©ponse standard, mais en arriĂšre-plan.Par exemple, dans votre application, l'utilisateur doit envoyer une image miniature (qui, trĂšs probablement, devra ĂȘtre modifiĂ©e) et confirmer l'adresse e-mail. Si votre application traite l'image et envoie ensuite un e-mail pour confirmation dans le gestionnaire de demandes, l'utilisateur final devra attendre une raison quelconque pour terminer les deux tĂąches avant de recharger ou de fermer la page. Au lieu de cela, vous pouvez transfĂ©rer ces opĂ©rations vers la file d'attente des tĂąches et les laisser Ă un processus distinct pour traitement afin d'envoyer immĂ©diatement une rĂ©ponse Ă l'utilisateur. Dans ce cas, l'utilisateur final pourra effectuer d'autres tĂąches cĂŽtĂ© client lors du traitement en arriĂšre-plan. Dans ce cas, votre application pourra Ă©galement rĂ©pondre librement aux demandes d'autres utilisateurs et clients.Aujourd'hui, nous allons parler du processus d'installation et de configuration de Celery et Redis pour gĂ©rer de longs processus dans une application Django pour rĂ©soudre ces problĂšmes. Nous utiliserons Ă©galement Docker et Docker Compose pour assembler toutes les piĂšces et voir comment tester des travaux Celery avec des tests unitaires et d'intĂ©gration.Ă la fin de ce guide, nous apprendrons:- IntĂ©grez Celery dans Django pour crĂ©er des tĂąches d'arriĂšre-plan.
- Pack Django, Celery et Redis avec Docker.
- Exécutez les processus en arriÚre-plan à l'aide d'un flux de travail distinct.
- Enregistrez les journaux de céleri dans un fichier.
- Configurez Flower pour surveiller et administrer les travaux et les travailleurs du céleri.
- Testez les tùches de céleri avec des tests unitaires et d'intégration.
TĂąches d'arriĂšre-plan
Pour amĂ©liorer l'expĂ©rience utilisateur, de longs processus doivent s'exĂ©cuter en arriĂšre-plan en dehors du flux normal de requĂȘtes / rĂ©ponses HTTP.Par exemple:- Envoi de lettres pour confirmation;
- Scaping et exploration du Web;
- L'analyse des données;
- Traitement d'image;
- Génération de rapports.
Lors de la crĂ©ation d'une application, essayez de sĂ©parer les tĂąches qui doivent ĂȘtre effectuĂ©es pendant la durĂ©e de vie de la demande / rĂ©ponse, par exemple, les opĂ©rations CRUD, des tĂąches qui doivent ĂȘtre effectuĂ©es en arriĂšre-plan.Le processus de travail
Notre objectif est de développer une application Django qui utilise Celery pour gérer de longs processus en dehors du cycle de demande / réponse.- L'utilisateur final génÚre un nouveau travail en envoyant une demande POST au serveur.
- Dans cette vue, le travail est ajouté à la file d'attente et l'ID du travail est renvoyé au client.
- Ă l'aide d'AJAX, le client continue d'interroger le serveur pour vĂ©rifier l'Ă©tat du travail, tandis que le travail lui-mĂȘme s'exĂ©cute en arriĂšre-plan.

Création de projet
Clone du projet à partir du dépÎt django-céleri et faire la caisse sur la v1 balise dans le maßtre branche :$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master
Ătant donnĂ© qu'au total, nous devons travailler avec trois processus (Django, Redis, travailleur), nous utilisons Docker pour simplifier le travail, en les connectant afin que nous puissions tout exĂ©cuter avec une seule commande dans une seule fenĂȘtre de terminal.Ă partir de la racine du projet, crĂ©ez des images et lancez des conteneurs Docker:$ docker-compose up -d --build
Une fois la génération terminée, accédez à localhost : 1337:
vérifiez que les tests réussissent:$ docker-compose exec web python -m pytest
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item
tests/test_tasks.py . [100%]
========================================= 1 passed in 0.47s =========================================
Jetons un coup d'Ćil Ă la structure du projet avant de poursuivre:âââ .gitignore
âââ LICENSE
âââ README.md
âââ docker-compose.yml
âââ project
âââ Dockerfile
âââ core
â âââ __init__.py
â âââ asgi.py
â âââ settings.py
â âââ urls.py
â âââ wsgi.py
âââ entrypoint.sh
âââ manage.py
âââ pytest.ini
âââ requirements.txt
âââ static
â âââ bulma.min.css
â âââ jquery-3.4.1.min.js
â âââ main.css
â âââ main.js
âââ tasks
â âââ __init__.py
â âââ apps.py
â âââ migrations
â â âââ __init__.py
â âââ templates
â â âââ home.html
â âââ views.py
âââ tests
âââ __init__.py
âââ test_tasks.py
Lancement de l'emploi
Le gestionnaire d'événements est project/static/main.js
abonné au clic d'un bouton. En cliquant sur le serveur envoie un AJAX POST-demande avec le type d'emploi approprié: 1
, 2
ou 3
.$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
CÎté serveur, une vue a déjà été configurée pour traiter la demande dans project/tasks/views.py
:def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
Et maintenant, le plaisir commence: nous lions le céleri!Configuration du céleri
Commençons par ajouter Celery et Redis au fichier project/requirements.txt
:celery==4.4.1
Django==3.0.4
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Celery utilise un courtier de messages - RabbitMQ , Redis ou AWS Simple Queue Service (SQS) - pour simplifier la communication entre le travailleur Celery et l'application Web. Les messages sont envoyés au courtier, puis traités par le travailleur. AprÚs cela, les résultats sont envoyés au backend.Redis sera à la fois un courtier et un backend. Ajoutez Redis et Celery Worker au fichier docker-compose.yml
comme suit:version: '3.7'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:5-alpine
Faites attention Ă celery worker --app=core --loglevel=info
:celery worker
utilisé pour démarrer travailleur céleri ;--app=core
utilisé pour lancer core
l'application Celery (que nous définirons prochainement);--loglevel=info
détermine le niveau d'enregistrement des informations.
Ajoutez les éléments suivants au module des paramÚtres du projet afin que Celery utilise Redis en tant que courtier et backend:CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Ensuite, créez le fichier sample_tasks.py
dans project/tasks
: Ici, en utilisant le dĂ©corateur shared_task, nous avons dĂ©fini une nouvelle fonction de tĂąche Celery appelĂ©e . N'oubliez pas que la tĂąche elle-mĂȘme ne sera pas exĂ©cutĂ©e Ă partir du processus Django, elle sera exĂ©cutĂ©e par le travailleur Celery. Ajoutez maintenant le fichier Ă :# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
create_task
celery.py
"project/core"
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Que se passe t-il ici?- Vous devez d'abord définir la valeur par défaut de l'environnement
DJANGO_SETTINGS_MODULE
afin que Celery sache comment trouver le projet Django. - Ensuite, nous avons créé une instance de Celery avec un nom
core
et l' avons placée dans une variable app
. - Ensuite, nous avons chargé les valeurs de configuration de Celery à partir de l'objet settings de
django.conf
. Nous avons utilisé namespace = "CELERY" pour éviter les conflits avec d'autres paramÚtres Django. Par conséquent, tous les paramÚtres de configuration de Celery doivent commencer par un préfixe CELERY_
. - Enfin,
app.autodiscover_tasks()
indique à Celery de rechercher des travaux à partir des applications définies dans settings.INSTALLED_APPS
.
Modifiez project/core/__init__.py
pour que l'application Celery soit automatiquement importée au démarrage de Django:from .celery import app as celery_app
__all__ = ("celery_app",)
Lancement de l'emploi
Actualisez la vue pour démarrer le travail et envoyer l'ID:@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
N'oubliez pas d'importer la tĂąche:from tasks.sample_tasks import create_task
Collectez des images et déployez de nouveaux conteneurs:$ docker-compose up -d --build
Pour démarrer une nouvelle tùche, procédez comme suit:$ curl -F type=0 http://localhost:1337/tasks/
Vous verrez quelque chose comme ceci:{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
Statut du travail
Revenez au gestionnaire d'événements cÎté client:$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Lorsque la réponse revient de la demande AJAX, nous enverrons à getStatus()
partir de l'ID du travail chaque seconde:function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
Si la réponse est oui, une nouvelle ligne sera ajoutée à la table DOM. Actualisez la vue get_status
pour renvoyer le statut:@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
Importer AsyncResult :from celery.result import AsyncResult
Mettre Ă jour les conteneurs:$ docker-compose up -d --build
Exécutez une nouvelle tùche:$ curl -F type=1 http://localhost:1337/tasks/
Ensuite, extrayez task_id
de la réponse et appelez mis get_status
à jour pour voir l'état:$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": true
}
Vous pouvez voir les mĂȘmes informations dans le navigateur:
Grumes de céleri
Mettez Ă jour le service celery
de docker-compose.yml
maniÚre à ce que Celery se déconnecte dans un fichier séparé:celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
Ajoutez un nouveau répertoire à «projet» et nommez-le «logs» . Ensuite, ajoutez à ce nouveau répertoire, placez le fichier celery.log
.Mise Ă jour:$ docker-compose up -d --build
Vous devez voir comment le fichier journal est rempli localement aprÚs avoir défini le volume :[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
/usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
succeeded in 10.027462100006233s: True
Tableau de bord fleur
Flower est un outil Web léger pour surveiller le céleri en temps réel. Vous pouvez suivre les tùches en cours d'exécution, augmenter ou diminuer le pool de travailleurs, afficher des graphiques et des statistiques, par exemple.Ajoutez-le à requirements.txt
:celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Ajoutez ensuite le nouveau service Ă docker-compose.yml
:dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
Et test:$ docker-compose up -d --build
Accédez à localhost : 5555 pour afficher le tableau de bord. Vous devriez voir un travailleur:
exécuter quelques tùches supplémentaires pour tester le tableau de bord:
essayer d'ajouter plus de travailleurs et voir comment cela affecte les performances:$ docker-compose up -d --build --scale celery=3
Les tests
Commençons par le test le plus simple:def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
Ajoutez le scénario de test ci-dessus à project/tests/test_tasks.py
et ajoutez l'importation suivante:from tasks import sample_tasks
Exécutez ce test:$ docker-compose exec web python -m pytest -k "test_task and not test_home"
Ce test prendra environ une minute:======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py . [100%]
============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================
Il convient de noter que dans l'assertion ci-dessus, nous avons .run
plutÎt utilisé la méthode .delay
pour lancer directement des tùches, sans utiliser Celery Worker.Vous voulez utiliser de faux plugins pour accélérer les choses?@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
Importation:from unittest.mock import patch, call
Tester:$ docker-compose exec web python -m pytest -k "test_mock_task"
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected
tests/test_tasks.py . [100%]
================================== 1 passed, 2 deselected in 1.13s ==================================
Voir? Maintenant beaucoup plus vite!Qu'en est-il des tests d'intégration complÚte?def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
N'oubliez pas que ce test utilise le mĂȘme courtier et le mĂȘme backend que dans le dĂ©veloppement. Vous pouvez crĂ©er une nouvelle instance de Celery pour tester:app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
Ajouter une importation:import json
Et assurez-vous que les tests sont réussis.Conclusion
Aujourd'hui, nous avons introduit la configuration de base de Celery pour effectuer des tĂąches Ă long terme dans une application sur Django. Vous devez envoyer tous les processus Ă la file d'attente de traitement qui pourraient ralentir le code cĂŽtĂ© utilisateur.Le cĂ©leri peut Ă©galement ĂȘtre utilisĂ© pour effectuer des tĂąches rĂ©pĂ©titives et dĂ©composer des tĂąches complexes gourmandes en ressources afin de rĂ©partir la charge de calcul sur plusieurs machines et de rĂ©duire le temps d'exĂ©cution et la charge sur la machine qui traite les demandes des clients.Vous pouvez trouver tout le code dans ce rĂ©fĂ©rentiel .
â Suivez le cours