Empleos Asincrónicos en Django con Apio

Se preparó una traducción del artículo antes del inicio del curso Python Web Developer .



Si su aplicación tiene un proceso largo, puede procesarlo no en la secuencia estándar de solicitud / respuesta, sino en segundo plano.

Por ejemplo, en su aplicación, el usuario debe enviar una imagen en miniatura (que probablemente será necesario editar) y confirmar la dirección de correo electrónico. Si su aplicación procesa la imagen y luego envía un correo electrónico de confirmación en el controlador de solicitudes, el usuario final tendrá que esperar por algún motivo para completar ambas tareas antes de volver a cargar o cerrar la página. En cambio, puede transferir estas operaciones a la cola de tareas y dejarlo en un proceso separado para que el procesamiento envíe de inmediato una respuesta al usuario. En este caso, el usuario final podrá hacer otras cosas en el lado del cliente mientras procesa en segundo plano. En este caso, su aplicación también podrá responder libremente a las solicitudes de otros usuarios y clientes.

Hoy hablaremos sobre el proceso de configuración y configuración de Celery y Redis para manejar procesos largos en una aplicación Django para resolver tales problemas. También usaremos Docker y Docker Compose para unir todas las piezas y ver cómo probar los trabajos de Celery con pruebas de unidad e integración.

Al final de esta guía, aprenderemos:

  • Integre el apio en Django para crear trabajos en segundo plano.
  • Empaque Django, Apio y Redis con Docker.
  • Ejecute procesos en segundo plano utilizando un flujo de trabajo separado.
  • Guarde los registros de apio en un archivo.
  • Configure Flower para monitorear y administrar trabajos y trabajadores de apio.
  • Pruebe los trabajos de Apio con pruebas unitarias y de integración.

Tarea en segundo plano


Para mejorar la experiencia del usuario, los procesos largos deben ejecutarse en segundo plano fuera de la secuencia de solicitud / respuesta HTTP normal.

Por ejemplo:

  • Envío de cartas para confirmación;
  • Scaping y rastreo web;
  • Análisis de los datos;
  • Procesamiento de imágenes;
  • La generación del informe.

Al crear una aplicación, intente separar las tareas que deben realizarse durante la vida de la solicitud / respuesta, por ejemplo, operaciones CRUD, de las tareas que deben realizarse en segundo plano.

El proceso de trabajo


Nuestro objetivo es desarrollar una aplicación Django que use Celery para manejar procesos largos fuera del ciclo de solicitud / respuesta.

  1. El usuario final genera un nuevo trabajo enviando una solicitud POST al servidor.
  2. En esta vista, el trabajo se agrega a la cola y la identificación del trabajo se devuelve al cliente.
  3. Usando AJAX, el cliente continúa consultando al servidor para verificar el estado del trabajo, mientras el trabajo se ejecuta en segundo plano.



Creación de proyectos


Clone el proyecto desde el repositorio django-celery y realice un pago en la etiqueta v1 en la rama maestra :

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Como en total necesitamos trabajar con tres procesos (Django, Redis, trabajador), usamos Docker para simplificar el trabajo, conectándolos para que podamos ejecutar todo con un comando en una ventana de terminal.

Desde la raíz del proyecto, cree imágenes y ejecute contenedores Docker:

$ docker-compose up -d --build

Cuando la compilación se complete, vaya a localhost : 1337:



Verifique que las pruebas pasen con éxito:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Echemos un vistazo a la estructura del proyecto antes de continuar:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Lanzamiento de trabajo


El controlador de eventos se project/static/main.jssuscribe al clic de un botón. Al hacer clic en el servidor envía una solicitud POST-AJAX con el tipo de trabajo adecuado: 1, 2o 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

En el lado del servidor, ya se ha configurado una vista para procesar la solicitud en project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

Y ahora comienza la diversión: ¡atamos el apio!

Configuración de apio


Comencemos agregando Celery y Redis al archivo project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Celery utiliza un agente de mensajes ( RabbitMQ , Redis o AWS Simple Queue Service (SQS)) para simplificar la comunicación entre el trabajador de Celery y la aplicación web. Los mensajes se envían al agente y luego el trabajador los procesa. Después de eso, los resultados se envían al backend.

Redis será tanto un corredor como un backend. Agregue Redis and Celery Worker al archivo de la docker-compose.ymlsiguiente manera:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Presta atención a celery worker --app=core --loglevel=info:

  1. celery workersolía comenzar a trabajar con apio ;
  2. --app=corese utiliza para iniciar core la aplicación Celery (que definiremos en breve);
  3. --loglevel=infodetermina el nivel de registro de información.

Agregue lo siguiente al módulo de configuración del proyecto para que Celery use Redis como agente y backend:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Luego cree el archivo sample_tasks.pyen project/tasks: Aquí, usando el decorador shared_task, definimos una nueva función de tarea de Celery llamada . Recuerde que la tarea en sí no se ejecutará desde el proceso de Django, sino que será realizada por el trabajador de Apio. Ahora agregue el archivo a :

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

¿Que está pasando aqui?

  1. Primero debe establecer el valor predeterminado para el entorno DJANGO_SETTINGS_MODULEpara que Celery sepa cómo encontrar el proyecto Django.
  2. Luego creamos una instancia de Celery con un nombre corey la colocamos en una variable app.
  3. Luego cargamos los valores de configuración de Celery desde el objeto de configuración django.conf. Usamos namespace = "CELERY" para evitar conflictos con otras configuraciones de Django. Por lo tanto, todas las configuraciones de Celery deben comenzar con un prefijo CELERY_.
  4. Por último, app.autodiscover_tasks()le dice a Celery que busque trabajos de las aplicaciones definidas en settings.INSTALLED_APPS.

Cambie project/core/__init__.pypara que la aplicación Celery se importe automáticamente al iniciar Django:

from .celery import app as celery_app


__all__ = ("celery_app",)

Lanzamiento de trabajo


Actualice la vista para iniciar el trabajo y envíe la identificación:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

No olvides importar la tarea:

from tasks.sample_tasks import create_task

Recopile imágenes e implemente nuevos contenedores:

$ docker-compose up -d --build

Para comenzar una nueva tarea, haga:

$ curl -F type=0 http://localhost:1337/tasks/

Verás algo como esto:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Estado del trabajo


Regrese al controlador de eventos del lado del cliente:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Cuando la respuesta regrese de la solicitud AJAX, enviaremos getStatus()desde la identificación del trabajo cada segundo:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Si la respuesta es sí, se agregará una nueva fila a la tabla DOM. Actualice la vista get_statuspara devolver el estado:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Importar AsyncResult :

from celery.result import AsyncResult

Actualizar contenedores:

$ docker-compose up -d --build

Ejecute una nueva tarea:

$ curl -F type=1 http://localhost:1337/tasks/

Luego extraiga task_idde la respuesta y llame actualizado get_statuspara ver el estado:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Puede ver la misma información en el navegador:



Troncos De Apio


Actualice el servicio celeryde docker-compose.ymltal manera que Celery cierre la sesión en un archivo separado:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Agregue un nuevo directorio a "proyecto" y asígnele el nombre "registros" . Luego agregue a este nuevo directorio poner el archivo celery.log.

Actualizar:

$ docker-compose up -d --build

Debería ver cómo se llena localmente el archivo de registro después de configurar el volumen :

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Tablero de flores


Flower es una herramienta ligera basada en la web para monitorear el apio en tiempo real. Puede realizar un seguimiento de las tareas en ejecución, aumentar o disminuir el grupo de trabajadores, mostrar gráficos y estadísticas, por ejemplo.

Añádelo a requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Luego agregue el nuevo servicio a docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

Y prueba:

$ docker-compose up -d --build

Vaya a localhost : 5555 para ver el tablero. Debería ver un trabajador:



ejecute algunas tareas más para probar el panel:



intente agregar más trabajadores y vea cómo afecta el rendimiento:

$ docker-compose up -d --build --scale celery=3

Pruebas


Comencemos con la prueba más simple:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Agregue el caso de prueba anterior project/tests/test_tasks.pyy agregue la siguiente importación:

from tasks import sample_tasks


Ejecute esta prueba:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Esta prueba tomará aproximadamente un minuto:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

Vale la pena señalar que en las afirmaciones anteriores usamos el método en su .runlugar .delaypara iniciar tareas directamente, sin usar el trabajador Celery.
¿Desea usar complementos simulados para acelerar las cosas?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Importar:

from unittest.mock import patch, call

Prueba:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

¿Ver? ¡Ahora mucho más rápido!

¿Qué pasa con las pruebas de integración completa?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Recuerde que esta prueba utiliza el mismo agente y backend que en el desarrollo. Puede crear una nueva instancia de Celery para probar:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Agregar importación:

import json

Y asegúrese de que las pruebas sean exitosas.

Conclusión


Hoy presentamos la configuración básica de Celery para realizar tareas a largo plazo en una aplicación en Django. Debe enviar cualquier proceso a la cola de procesamiento que pueda ralentizar el código en el lado del usuario.

El apio también se puede utilizar para realizar tareas repetitivas y descomponer tareas complejas que requieren muchos recursos para distribuir la carga computacional en varias máquinas y reducir el tiempo de ejecución y la carga en la máquina que procesa las solicitudes del cliente.

Puede encontrar todo el código en este repositorio .



Entra en el curso

All Articles