Se preparó una traducción del artículo antes del inicio del curso Python Web Developer .
Si su aplicación tiene un proceso largo, puede procesarlo no en la secuencia estándar de solicitud / respuesta, sino en segundo plano.Por ejemplo, en su aplicación, el usuario debe enviar una imagen en miniatura (que probablemente será necesario editar) y confirmar la dirección de correo electrónico. Si su aplicación procesa la imagen y luego envía un correo electrónico de confirmación en el controlador de solicitudes, el usuario final tendrá que esperar por algún motivo para completar ambas tareas antes de volver a cargar o cerrar la página. En cambio, puede transferir estas operaciones a la cola de tareas y dejarlo en un proceso separado para que el procesamiento envíe de inmediato una respuesta al usuario. En este caso, el usuario final podrá hacer otras cosas en el lado del cliente mientras procesa en segundo plano. En este caso, su aplicación también podrá responder libremente a las solicitudes de otros usuarios y clientes.Hoy hablaremos sobre el proceso de configuración y configuración de Celery y Redis para manejar procesos largos en una aplicación Django para resolver tales problemas. También usaremos Docker y Docker Compose para unir todas las piezas y ver cómo probar los trabajos de Celery con pruebas de unidad e integración.Al final de esta guía, aprenderemos:- Integre el apio en Django para crear trabajos en segundo plano.
- Empaque Django, Apio y Redis con Docker.
- Ejecute procesos en segundo plano utilizando un flujo de trabajo separado.
- Guarde los registros de apio en un archivo.
- Configure Flower para monitorear y administrar trabajos y trabajadores de apio.
- Pruebe los trabajos de Apio con pruebas unitarias y de integración.
Tarea en segundo plano
Para mejorar la experiencia del usuario, los procesos largos deben ejecutarse en segundo plano fuera de la secuencia de solicitud / respuesta HTTP normal.Por ejemplo:- Envío de cartas para confirmación;
- Scaping y rastreo web;
- Análisis de los datos;
- Procesamiento de imágenes;
- La generación del informe.
Al crear una aplicación, intente separar las tareas que deben realizarse durante la vida de la solicitud / respuesta, por ejemplo, operaciones CRUD, de las tareas que deben realizarse en segundo plano.El proceso de trabajo
Nuestro objetivo es desarrollar una aplicación Django que use Celery para manejar procesos largos fuera del ciclo de solicitud / respuesta.- El usuario final genera un nuevo trabajo enviando una solicitud POST al servidor.
- En esta vista, el trabajo se agrega a la cola y la identificación del trabajo se devuelve al cliente.
- Usando AJAX, el cliente continúa consultando al servidor para verificar el estado del trabajo, mientras el trabajo se ejecuta en segundo plano.

Creación de proyectos
Clone el proyecto desde el repositorio django-celery y realice un pago en la etiqueta v1 en la rama maestra :$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master
Como en total necesitamos trabajar con tres procesos (Django, Redis, trabajador), usamos Docker para simplificar el trabajo, conectándolos para que podamos ejecutar todo con un comando en una ventana de terminal.Desde la raíz del proyecto, cree imágenes y ejecute contenedores Docker:$ docker-compose up -d --build
Cuando la compilación se complete, vaya a localhost : 1337:
Verifique que las pruebas pasen con éxito:$ docker-compose exec web python -m pytest
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item
tests/test_tasks.py . [100%]
========================================= 1 passed in 0.47s =========================================
Echemos un vistazo a la estructura del proyecto antes de continuar:├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── pytest.ini
├── requirements.txt
├── static
│ ├── bulma.min.css
│ ├── jquery-3.4.1.min.js
│ ├── main.css
│ └── main.js
├── tasks
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── templates
│ │ └── home.html
│ └── views.py
└── tests
├── __init__.py
└── test_tasks.py
Lanzamiento de trabajo
El controlador de eventos se project/static/main.js
suscribe al clic de un botón. Al hacer clic en el servidor envía una solicitud POST-AJAX con el tipo de trabajo adecuado: 1
, 2
o 3
.$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
En el lado del servidor, ya se ha configurado una vista para procesar la solicitud en project/tasks/views.py
:def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
Y ahora comienza la diversión: ¡atamos el apio!Configuración de apio
Comencemos agregando Celery y Redis al archivo project/requirements.txt
:celery==4.4.1
Django==3.0.4
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Celery utiliza un agente de mensajes ( RabbitMQ , Redis o AWS Simple Queue Service (SQS)) para simplificar la comunicación entre el trabajador de Celery y la aplicación web. Los mensajes se envían al agente y luego el trabajador los procesa. Después de eso, los resultados se envían al backend.Redis será tanto un corredor como un backend. Agregue Redis and Celery Worker al archivo de la docker-compose.yml
siguiente manera:version: '3.7'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:5-alpine
Presta atención a celery worker --app=core --loglevel=info
:celery worker
solía comenzar a trabajar con apio ;--app=core
se utiliza para iniciar core
la aplicación Celery (que definiremos en breve);--loglevel=info
determina el nivel de registro de información.
Agregue lo siguiente al módulo de configuración del proyecto para que Celery use Redis como agente y backend:CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Luego cree el archivo sample_tasks.py
en project/tasks
: Aquí, usando el decorador shared_task, definimos una nueva función de tarea de Celery llamada . Recuerde que la tarea en sí no se ejecutará desde el proceso de Django, sino que será realizada por el trabajador de Apio. Ahora agregue el archivo a :# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
create_task
celery.py
"project/core"
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
¿Que está pasando aqui?- Primero debe establecer el valor predeterminado para el entorno
DJANGO_SETTINGS_MODULE
para que Celery sepa cómo encontrar el proyecto Django. - Luego creamos una instancia de Celery con un nombre
core
y la colocamos en una variable app
. - Luego cargamos los valores de configuración de Celery desde el objeto de configuración
django.conf
. Usamos namespace = "CELERY" para evitar conflictos con otras configuraciones de Django. Por lo tanto, todas las configuraciones de Celery deben comenzar con un prefijo CELERY_
. - Por último,
app.autodiscover_tasks()
le dice a Celery que busque trabajos de las aplicaciones definidas en settings.INSTALLED_APPS
.
Cambie project/core/__init__.py
para que la aplicación Celery se importe automáticamente al iniciar Django:from .celery import app as celery_app
__all__ = ("celery_app",)
Lanzamiento de trabajo
Actualice la vista para iniciar el trabajo y envíe la identificación:@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
No olvides importar la tarea:from tasks.sample_tasks import create_task
Recopile imágenes e implemente nuevos contenedores:$ docker-compose up -d --build
Para comenzar una nueva tarea, haga:$ curl -F type=0 http://localhost:1337/tasks/
Verás algo como esto:{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
Estado del trabajo
Regrese al controlador de eventos del lado del cliente:$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Cuando la respuesta regrese de la solicitud AJAX, enviaremos getStatus()
desde la identificación del trabajo cada segundo:function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
Si la respuesta es sí, se agregará una nueva fila a la tabla DOM. Actualice la vista get_status
para devolver el estado:@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
Importar AsyncResult :from celery.result import AsyncResult
Actualizar contenedores:$ docker-compose up -d --build
Ejecute una nueva tarea:$ curl -F type=1 http://localhost:1337/tasks/
Luego extraiga task_id
de la respuesta y llame actualizado get_status
para ver el estado:$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": true
}
Puede ver la misma información en el navegador:
Troncos De Apio
Actualice el servicio celery
de docker-compose.yml
tal manera que Celery cierre la sesión en un archivo separado:celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
Agregue un nuevo directorio a "proyecto" y asígnele el nombre "registros" . Luego agregue a este nuevo directorio poner el archivo celery.log
.Actualizar:$ docker-compose up -d --build
Debería ver cómo se llena localmente el archivo de registro después de configurar el volumen :[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
/usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
succeeded in 10.027462100006233s: True
Tablero de flores
Flower es una herramienta ligera basada en la web para monitorear el apio en tiempo real. Puede realizar un seguimiento de las tareas en ejecución, aumentar o disminuir el grupo de trabajadores, mostrar gráficos y estadísticas, por ejemplo.Añádelo a requirements.txt
:celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
Luego agregue el nuevo servicio a docker-compose.yml
:dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
Y prueba:$ docker-compose up -d --build
Vaya a localhost : 5555 para ver el tablero. Debería ver un trabajador:
ejecute algunas tareas más para probar el panel:
intente agregar más trabajadores y vea cómo afecta el rendimiento:$ docker-compose up -d --build --scale celery=3
Pruebas
Comencemos con la prueba más simple:def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
Agregue el caso de prueba anterior project/tests/test_tasks.py
y agregue la siguiente importación:from tasks import sample_tasks
Ejecute esta prueba:$ docker-compose exec web python -m pytest -k "test_task and not test_home"
Esta prueba tomará aproximadamente un minuto:======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py . [100%]
============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================
Vale la pena señalar que en las afirmaciones anteriores usamos el método en su .run
lugar .delay
para iniciar tareas directamente, sin usar el trabajador Celery.¿Desea usar complementos simulados para acelerar las cosas?@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
Importar:from unittest.mock import patch, call
Prueba:$ docker-compose exec web python -m pytest -k "test_mock_task"
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected
tests/test_tasks.py . [100%]
================================== 1 passed, 2 deselected in 1.13s ==================================
¿Ver? ¡Ahora mucho más rápido!¿Qué pasa con las pruebas de integración completa?def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
Recuerde que esta prueba utiliza el mismo agente y backend que en el desarrollo. Puede crear una nueva instancia de Celery para probar:app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
Agregar importación:import json
Y asegúrese de que las pruebas sean exitosas.Conclusión
Hoy presentamos la configuración básica de Celery para realizar tareas a largo plazo en una aplicación en Django. Debe enviar cualquier proceso a la cola de procesamiento que pueda ralentizar el código en el lado del usuario.El apio también se puede utilizar para realizar tareas repetitivas y descomponer tareas complejas que requieren muchos recursos para distribuir la carga computacional en varias máquinas y reducir el tiempo de ejecución y la carga en la máquina que procesa las solicitudes del cliente.Puede encontrar todo el código en este repositorio .
→ Entra en el curso