Trabalhos assĂ­ncronos no Django com aipo

Uma tradução do artigo foi preparada antes do início do curso Python Web Developer .



Se o seu aplicativo tiver algum processo demorado, você poderá processá-lo não no fluxo de solicitação / resposta padrão, mas em segundo plano.

Por exemplo, no seu aplicativo, o usuário deve enviar uma imagem em miniatura (que, provavelmente, precisará ser editada) e confirmar o endereço de email. Se seu aplicativo processar a imagem e enviar um email para confirmação no manipulador de solicitações, o usuário final terá que esperar por algum motivo para concluir as duas tarefas antes de recarregar ou fechar a página. Em vez disso, você pode transferir essas operações para a fila de tarefas e deixar em um processo separado para processamento para enviar imediatamente uma resposta ao usuário. Nesse caso, o usuário final poderá fazer outras coisas no lado do cliente durante o processamento em segundo plano. Nesse caso, seu aplicativo também poderá responder livremente a solicitações de outros usuários e clientes.

Hoje falaremos sobre o processo de instalação e configuração do Celery e Redis para lidar com processos demorados em um aplicativo Django para resolver esses problemas. Também usaremos o Docker e o Docker Compose para juntar todas as peças e ver como testar os trabalhos do Aipo com testes de unidade e integração.

No final deste guia, aprenderemos:

  • Integre o Aipo no Django para criar trabalhos em segundo plano.
  • Pacote Django, Aipo e Redis com Docker.
  • Execute processos em segundo plano usando um fluxo de trabalho separado.
  • Salve os logs do aipo em um arquivo.
  • Configure o Flower para monitorar e administrar trabalhos e trabalhadores do aipo.
  • Teste os trabalhos do aipo com testes de unidade e integração.

Tarefas em segundo plano


Para melhorar a experiência do usuário, processos demorados devem ser executados em segundo plano fora do fluxo normal de solicitação / resposta HTTP.

Por exemplo:

  • Envio de cartas para confirmação;
  • Web scaping e rastreamento;
  • Análise de dados;
  • Processamento de imagem;
  • Geração de relatĂłrio.

Ao criar um aplicativo, tente separar as tarefas que devem ser executadas durante a vida útil da solicitação / resposta, por exemplo, operações CRUD, das tarefas que devem ser executadas em segundo plano.

O processo de trabalho


Nosso objetivo é desenvolver um aplicativo Django que use o Celery para lidar com processos demorados fora do ciclo de solicitação / resposta.

  1. O usuário final gera um novo trabalho enviando uma solicitação POST ao servidor.
  2. Nesta visĂŁo, o trabalho Ă© adicionado Ă  fila e o ID do trabalho Ă© enviado de volta ao cliente.
  3. Usando o AJAX, o cliente continua a consultar o servidor para verificar o status do trabalho, enquanto o próprio trabalho está sendo executado em segundo plano.



Criação de projeto


Clone o projeto no repositório django-aipo e faça um checkout na tag v1 no ramo principal :

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Como no total precisamos trabalhar com trĂŞs processos (Django, Redis, worker), usamos o Docker para simplificar o trabalho, conectando-os para que possamos executar tudo com um comando em uma janela do terminal.

Na raiz do projeto, crie imagens e inicie os contĂŞineres do Docker:

$ docker-compose up -d --build

Quando a construção for concluída, vá para localhost : 1337:



Verifique se os testes foram aprovados com ĂŞxito:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Vamos dar uma olhada na estrutura do projeto antes de prosseguir:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Lançamento de emprego


O manipulador de eventos Ă© project/static/main.jsinscrito no clique de um botĂŁo. Ao clicar sobre o servidor envia um POST-request AJAX com o tipo de trabalho apropriado: 1, 2ou 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

No lado do servidor, uma visualização já foi configurada para processar a solicitação em project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

E agora começa a diversão: amarramos o aipo!

Configuração de Aipo


Vamos começar adicionando Aipo e Redis ao arquivo project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

O Celery usa um intermediário de mensagens - RabbitMQ , Redis ou AWS Simple Queue Service (SQS) - para simplificar a comunicação entre o trabalhador do Celery e o aplicativo Web. As mensagens são enviadas ao intermediário e processadas pelo trabalhador. Depois disso, os resultados são enviados para o back-end.

Redis será um corretor e um back-end. Adicione Redis e Celery Worker ao arquivo da docker-compose.ymlseguinte maneira:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Preste atenção a celery worker --app=core --loglevel=info:

  1. celery workercostumava iniciar trabalhador de aipo ;
  2. --app=coreusado para iniciar core o aplicativo Celery (que definiremos em breve);
  3. --loglevel=infodetermina o nível de registro de informações.

Adicione o seguinte ao módulo de configurações do projeto para que o Celery use o Redis como um broker e back-end:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Em seguida, crie o arquivo sample_tasks.pyem project/tasks: Aqui, usando o decorador shared_task, definimos uma nova função de tarefa do Celery chamada . Lembre-se de que a tarefa em si não será executada a partir do processo Django, será executada pelo trabalhador do Celery. Agora adicione o arquivo a :

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

O que está acontecendo aqui?

  1. Primeiro, vocĂŞ precisa definir o valor padrĂŁo para o ambiente DJANGO_SETTINGS_MODULEpara que o Celery saiba como encontrar o projeto Django.
  2. Em seguida, criamos uma instância do aipo com um nome coree a colocamos em uma variável app.
  3. Em seguida, carregamos os valores de configuração do aipo a partir do objeto de configurações de django.conf. Usamos namespace = "CELERY" para evitar conflitos com outras configurações do Django. Portanto, todas as definições de configuração do Aipo devem começar com um prefixo CELERY_.
  4. Por fim, app.autodiscover_tasks()pede ao Celery para procurar trabalhos nos aplicativos definidos em settings.INSTALLED_APPS.

Altere project/core/__init__.pypara que o aplicativo Celery seja importado automaticamente ao iniciar o Django:

from .celery import app as celery_app


__all__ = ("celery_app",)

Lançamento de emprego


Atualize a visualização para iniciar o trabalho e envie o ID:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Não esqueça de importar a tarefa:

from tasks.sample_tasks import create_task

Colete imagens e implante novos contĂŞineres:

$ docker-compose up -d --build

Para iniciar uma nova tarefa, faça:

$ curl -F type=0 http://localhost:1337/tasks/

Você verá algo assim:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Status de trabalho


Retorne ao manipulador de eventos do lado do cliente:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Quando a resposta retornar da solicitação AJAX, enviaremos a getStatus()partir da identificação do trabalho a cada segundo:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Se a resposta for sim, uma nova linha será adicionada à tabela DOM. Atualize a visualização get_statuspara retornar o status:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Importar AsyncResult :

from celery.result import AsyncResult

Atualizar contĂŞineres:

$ docker-compose up -d --build

Execute uma nova tarefa:

$ curl -F type=1 http://localhost:1337/tasks/

Em seguida, extraia task_idda resposta e chame atualizado get_statuspara ver o status:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Você pode ver as mesmas informações no navegador:



Logs Aipo


Atualizar o serviço celeryde docker-compose.ymltal forma que aipo faz logoff em um arquivo separado:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Adicione um novo diretĂłrio ao "projeto" e denomine "logs" . Em seguida, adicione a este novo diretĂłrio coloque o arquivo celery.log.

Atualizar:

$ docker-compose up -d --build

VocĂŞ deve ver como o arquivo de log Ă© preenchido localmente apĂłs definir o volume :

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Painel de flores


Flower é uma ferramenta leve e baseada na Web para monitorar o aipo em tempo real. Você pode rastrear tarefas em execução, aumentar ou diminuir o pool de trabalhadores, exibir gráficos e estatísticas, por exemplo.

Adicione a requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Em seguida, adicione o novo serviço a docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

E teste:

$ docker-compose up -d --build

Vá para localhost : 5555 para visualizar o painel. Você deve ver um trabalhador:



Execute mais algumas tarefas para testar o painel:



Tente adicionar mais trabalhadores e veja como isso afeta o desempenho:

$ docker-compose up -d --build --scale celery=3

Testes


Vamos começar com o teste mais simples:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Adicione o caso de teste acima a project/tests/test_tasks.pye adicione a seguinte importação:

from tasks import sample_tasks


Execute este teste:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Este teste levará cerca de um minuto:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

É interessante notar que no assert do acima foi utilizado o método .runem vez .delayde lançar diretamente tarefas, sem a utilização de trabalhador aipo.
Deseja usar plugins simulados para acelerar as coisas?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Importar:

from unittest.mock import patch, call

Teste:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

Vejo? Agora muito mais rápido!

E o teste de integração total?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Lembre-se de que este teste usa o mesmo broker e back-end que no desenvolvimento. Você pode criar uma nova instância do Aipo para teste:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Adicionar importação:

import json

E verifique se os testes foram bem-sucedidos.

ConclusĂŁo


Hoje nós introduzimos a configuração básica do Celery para executar tarefas de longo prazo em um aplicativo no Django. Você deve enviar quaisquer processos para a fila de processamento que possam retardar o código no lado do usuário.

O aipo também pode ser usado para executar tarefas repetitivas e decompor tarefas complexas que consomem muitos recursos, a fim de distribuir a carga computacional em várias máquinas e reduzir o tempo de execução e a carga na máquina que processa solicitações do cliente.

VocĂŞ pode encontrar todo o cĂłdigo neste repositĂłrio .



→ Entre no curso

All Articles