Emplois asynchrones à Django avec céleri

Une traduction de l'article a été préparée avant le début du cours Python Web Developer .



Si votre application a un processus long, vous pouvez le traiter non pas dans le flux de demande / réponse standard, mais en arriÚre-plan.

Par exemple, dans votre application, l'utilisateur doit envoyer une image miniature (qui, trĂšs probablement, devra ĂȘtre modifiĂ©e) et confirmer l'adresse e-mail. Si votre application traite l'image et envoie ensuite un e-mail pour confirmation dans le gestionnaire de demandes, l'utilisateur final devra attendre une raison quelconque pour terminer les deux tĂąches avant de recharger ou de fermer la page. Au lieu de cela, vous pouvez transfĂ©rer ces opĂ©rations vers la file d'attente des tĂąches et les laisser Ă  un processus distinct pour traitement afin d'envoyer immĂ©diatement une rĂ©ponse Ă  l'utilisateur. Dans ce cas, l'utilisateur final pourra effectuer d'autres tĂąches cĂŽtĂ© client lors du traitement en arriĂšre-plan. Dans ce cas, votre application pourra Ă©galement rĂ©pondre librement aux demandes d'autres utilisateurs et clients.

Aujourd'hui, nous allons parler du processus d'installation et de configuration de Celery et Redis pour gérer de longs processus dans une application Django pour résoudre ces problÚmes. Nous utiliserons également Docker et Docker Compose pour assembler toutes les piÚces et voir comment tester des travaux Celery avec des tests unitaires et d'intégration.

À la fin de ce guide, nous apprendrons:

  • IntĂ©grez Celery dans Django pour crĂ©er des tĂąches d'arriĂšre-plan.
  • Pack Django, Celery et Redis avec Docker.
  • ExĂ©cutez les processus en arriĂšre-plan Ă  l'aide d'un flux de travail distinct.
  • Enregistrez les journaux de cĂ©leri dans un fichier.
  • Configurez Flower pour surveiller et administrer les travaux et les travailleurs du cĂ©leri.
  • Testez les tĂąches de cĂ©leri avec des tests unitaires et d'intĂ©gration.

TĂąches d'arriĂšre-plan


Pour amĂ©liorer l'expĂ©rience utilisateur, de longs processus doivent s'exĂ©cuter en arriĂšre-plan en dehors du flux normal de requĂȘtes / rĂ©ponses HTTP.

Par exemple:

  • Envoi de lettres pour confirmation;
  • Scaping et exploration du Web;
  • L'analyse des donnĂ©es;
  • Traitement d'image;
  • GĂ©nĂ©ration de rapports.

Lors de la crĂ©ation d'une application, essayez de sĂ©parer les tĂąches qui doivent ĂȘtre effectuĂ©es pendant la durĂ©e de vie de la demande / rĂ©ponse, par exemple, les opĂ©rations CRUD, des tĂąches qui doivent ĂȘtre effectuĂ©es en arriĂšre-plan.

Le processus de travail


Notre objectif est de développer une application Django qui utilise Celery pour gérer de longs processus en dehors du cycle de demande / réponse.

  1. L'utilisateur final génÚre un nouveau travail en envoyant une demande POST au serveur.
  2. Dans cette vue, le travail est ajouté à la file d'attente et l'ID du travail est renvoyé au client.
  3. À l'aide d'AJAX, le client continue d'interroger le serveur pour vĂ©rifier l'Ă©tat du travail, tandis que le travail lui-mĂȘme s'exĂ©cute en arriĂšre-plan.



Création de projet


Clone du projet à partir du dépÎt django-céleri et faire la caisse sur la v1 balise dans le maßtre branche :

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Étant donnĂ© qu'au total, nous devons travailler avec trois processus (Django, Redis, travailleur), nous utilisons Docker pour simplifier le travail, en les connectant afin que nous puissions tout exĂ©cuter avec une seule commande dans une seule fenĂȘtre de terminal.

À partir de la racine du projet, crĂ©ez des images et lancez des conteneurs Docker:

$ docker-compose up -d --build

Une fois la génération terminée, accédez à localhost : 1337:



vérifiez que les tests réussissent:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Jetons un coup d'Ɠil à la structure du projet avant de poursuivre:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Lancement de l'emploi


Le gestionnaire d'événements est project/static/main.jsabonné au clic d'un bouton. En cliquant sur le serveur envoie un AJAX POST-demande avec le type d'emploi approprié: 1, 2ou 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

CÎté serveur, une vue a déjà été configurée pour traiter la demande dans project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

Et maintenant, le plaisir commence: nous lions le céleri!

Configuration du céleri


Commençons par ajouter Celery et Redis au fichier project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Celery utilise un courtier de messages - RabbitMQ , Redis ou AWS Simple Queue Service (SQS) - pour simplifier la communication entre le travailleur Celery et l'application Web. Les messages sont envoyés au courtier, puis traités par le travailleur. AprÚs cela, les résultats sont envoyés au backend.

Redis sera Ă  la fois un courtier et un backend. Ajoutez Redis et Celery Worker au fichier docker-compose.ymlcomme suit:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Faites attention Ă  celery worker --app=core --loglevel=info:

  1. celery workerutilisé pour démarrer travailleur céleri ;
  2. --app=coreutilisé pour lancer core l'application Celery (que nous définirons prochainement);
  3. --loglevel=infodétermine le niveau d'enregistrement des informations.

Ajoutez les éléments suivants au module des paramÚtres du projet afin que Celery utilise Redis en tant que courtier et backend:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Ensuite, crĂ©ez le fichier sample_tasks.pydans project/tasks: Ici, en utilisant le dĂ©corateur shared_task, nous avons dĂ©fini une nouvelle fonction de tĂąche Celery appelĂ©e . N'oubliez pas que la tĂąche elle-mĂȘme ne sera pas exĂ©cutĂ©e Ă  partir du processus Django, elle sera exĂ©cutĂ©e par le travailleur Celery. Ajoutez maintenant le fichier Ă  :

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Que se passe t-il ici?

  1. Vous devez d'abord définir la valeur par défaut de l'environnement DJANGO_SETTINGS_MODULEafin que Celery sache comment trouver le projet Django.
  2. Ensuite, nous avons créé une instance de Celery avec un nom coreet l' avons placée dans une variable app.
  3. Ensuite, nous avons chargé les valeurs de configuration de Celery à partir de l'objet settings de django.conf. Nous avons utilisé namespace = "CELERY" pour éviter les conflits avec d'autres paramÚtres Django. Par conséquent, tous les paramÚtres de configuration de Celery doivent commencer par un préfixe CELERY_.
  4. Enfin, app.autodiscover_tasks()indique à Celery de rechercher des travaux à partir des applications définies dans settings.INSTALLED_APPS.

Modifiez project/core/__init__.pypour que l'application Celery soit automatiquement importée au démarrage de Django:

from .celery import app as celery_app


__all__ = ("celery_app",)

Lancement de l'emploi


Actualisez la vue pour démarrer le travail et envoyer l'ID:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

N'oubliez pas d'importer la tĂąche:

from tasks.sample_tasks import create_task

Collectez des images et déployez de nouveaux conteneurs:

$ docker-compose up -d --build

Pour démarrer une nouvelle tùche, procédez comme suit:

$ curl -F type=0 http://localhost:1337/tasks/

Vous verrez quelque chose comme ceci:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Statut du travail


Revenez au gestionnaire d'événements cÎté client:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Lorsque la réponse revient de la demande AJAX, nous enverrons à getStatus()partir de l'ID du travail chaque seconde:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Si la réponse est oui, une nouvelle ligne sera ajoutée à la table DOM. Actualisez la vue get_statuspour renvoyer le statut:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Importer AsyncResult :

from celery.result import AsyncResult

Mettre Ă  jour les conteneurs:

$ docker-compose up -d --build

Exécutez une nouvelle tùche:

$ curl -F type=1 http://localhost:1337/tasks/

Ensuite, extrayez task_idde la réponse et appelez mis get_statusà jour pour voir l'état:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Vous pouvez voir les mĂȘmes informations dans le navigateur:



Grumes de céleri


Mettez à jour le service celeryde docker-compose.ymlmaniÚre à ce que Celery se déconnecte dans un fichier séparé:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Ajoutez un nouveau répertoire à «projet» et nommez-le «logs» . Ensuite, ajoutez à ce nouveau répertoire, placez le fichier celery.log.

Mise Ă  jour:

$ docker-compose up -d --build

Vous devez voir comment le fichier journal est rempli localement aprÚs avoir défini le volume :

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Tableau de bord fleur


Flower est un outil Web léger pour surveiller le céleri en temps réel. Vous pouvez suivre les tùches en cours d'exécution, augmenter ou diminuer le pool de travailleurs, afficher des graphiques et des statistiques, par exemple.

Ajoutez-le Ă  requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Ajoutez ensuite le nouveau service Ă  docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

Et test:

$ docker-compose up -d --build

Accédez à localhost : 5555 pour afficher le tableau de bord. Vous devriez voir un travailleur:



exécuter quelques tùches supplémentaires pour tester le tableau de bord:



essayer d'ajouter plus de travailleurs et voir comment cela affecte les performances:

$ docker-compose up -d --build --scale celery=3

Les tests


Commençons par le test le plus simple:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Ajoutez le scénario de test ci-dessus à project/tests/test_tasks.pyet ajoutez l'importation suivante:

from tasks import sample_tasks


Exécutez ce test:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Ce test prendra environ une minute:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

Il convient de noter que dans l'assertion ci-dessus, nous avons .runplutÎt utilisé la méthode .delaypour lancer directement des tùches, sans utiliser Celery Worker.
Vous voulez utiliser de faux plugins pour accélérer les choses?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Importation:

from unittest.mock import patch, call

Tester:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

Voir? Maintenant beaucoup plus vite!

Qu'en est-il des tests d'intégration complÚte?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

N'oubliez pas que ce test utilise le mĂȘme courtier et le mĂȘme backend que dans le dĂ©veloppement. Vous pouvez crĂ©er une nouvelle instance de Celery pour tester:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Ajouter une importation:

import json

Et assurez-vous que les tests sont réussis.

Conclusion


Aujourd'hui, nous avons introduit la configuration de base de Celery pour effectuer des tùches à long terme dans une application sur Django. Vous devez envoyer tous les processus à la file d'attente de traitement qui pourraient ralentir le code cÎté utilisateur.

Le cĂ©leri peut Ă©galement ĂȘtre utilisĂ© pour effectuer des tĂąches rĂ©pĂ©titives et dĂ©composer des tĂąches complexes gourmandes en ressources afin de rĂ©partir la charge de calcul sur plusieurs machines et de rĂ©duire le temps d'exĂ©cution et la charge sur la machine qui traite les demandes des clients.

Vous pouvez trouver tout le code dans ce référentiel .



→ Suivez le cours

All Articles