Django与Celery中的异步作业

Python Web Developer课程开始之前准备了文章的翻译



如果您的应用程序有任何冗长的过程,则可以在标准请求/响应流中而不是在后台处理它。

例如,在您的应用程序中,用户必须发送缩略图(很可能需要对其进行编辑)并确认电子邮件地址。如果您的应用程序处理了图像,然后在请求处理程序中发送了一封电子邮件以进行确认,则最终用户将不得不等待某种原因来完成这两项任务,然后才能重新加载或关闭页面。相反,您可以将这些操作转移到任务队列,然后将其留给单独的进程进行处理,以立即向用户发送响应。在这种情况下,最终用户将能够在客户端进行后台处理的同时做其他事情。在这种情况下,您的应用程序还可以自由响应其他用户和客户端的请求。

今天,我们将讨论在Django应用程序中设置和配置Celery和Redis以处理冗长的过程以解决此类问题的过程。我们还将使用Docker和Docker Compose将所有部分绑定在一起,并查看如何使用单元测试和集成测试来测试Celery作业。

到本指南结束时,我们将学习:

  • 将Celery集成到Django中以创建后台作业。
  • 使用Docker打包Django,Celery和Redis。
  • 使用单独的工作流程在后台运行流程。
  • 将Celery日志保存到文件。
  • 设置Flower来监视和管理芹菜工作和工人。
  • 使用单元测试和集成测试来测试Celery作业。

后台任务


为了改善用户体验,冗长的进程应在常规HTTP请求/响应流之外的后台运行。

例如:

  • 发送确认信;
  • 网站脱网和爬网;
  • 数据分析;
  • 图像处理;
  • 报告生成。

创建应用程序时,请尝试将请求/响应期间应执行的任务(例如CRUD操作)与应在后台执行的任务分开。

工作过程


我们的目标是开发一个使用Celery来处理请求/响应周期之外的冗长流程的Django应用程序。

  1. 最终用户通过向服务器发送POST请求来生成新作业。
  2. 在此视图中,作业被添加到队列中,并且作业ID被发送回客户端。
  3. 当作业本身在后台运行时,客户端使用AJAX继续查询服务器以检查作业的状态。



项目创建


克隆从项目Django的芹菜储存库,并做了结帐V1标签分支

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

由于总共需要使用三个进程(Django,Redis,worker),因此我们使用Docker简化了工作,将它们连接在一起,以便我们可以在一个终端窗口中使用一个命令运行所有内容。

从项目根目录创建映像并启动Docker容器:

$ docker-compose up -d --build

构建完成后,请转到localhost:1337:



验证测试是否成功通过:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

在继续之前,让我们看一下项目的结构:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

职位发布


事件处理程序project/static/main.js订阅了单击按钮的操作。通过点击服务器发送带有适当的作业类型的AJAX POST请求:123

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

在服务器端,已经配置了一个视图来处理请求project/tasks/views.py

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

现在,乐趣开始了:我们与芹菜并列!

芹菜设置


首先,将Celery和Redis添加到文件中project/requirements.txt

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Celery使用消息代理RabbitMQRedisAWS Simple Queue Service(SQS))来简化Celery worker与Web应用程序之间的通信。消息发送到代理,然后由工作人员处理。之后,结果将发送到后端。

Redis将既是经纪人又是后端。将Redis和Celery Worker添加到文件中docker-compose.yml,如下所示:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

注意celery worker --app=core --loglevel=info

  1. celery worker用来开办芹菜工人;
  2. --app=core用于启动Celery core 应用程序(稍后将对其进行定义);
  3. --loglevel=info确定信息记录级别

将以下内容添加到项目设置模块,以便Celery将Redis用作代理和后端:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

然后sample_tasks.py 以下位置创建文件project/tasks在这里,使用shared_task装饰器我们定义了一个名为的新Celery任务函数 请记住,任务本身不会从Django进程中执行,而是由Celery工作者执行。 现在将文件添加

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

这里发生了什么?

  1. 首先,您需要为环境设置默认值,DJANGO_SETTINGS_MODULE以便Celery知道如何查找Django项目。
  2. 然后,我们用名称创建了Celery实例,core并将其放在变量中app
  3. 然后我们从中的设置对象加载了Celery配置值django.conf我们使用命名空间=“ CELERY”来防止与其他Django设置冲突。因此,Celery的所有配置设置都必须以prefix开头CELERY_
  4. 最后,app.autodiscover_tasks()告诉Celery从中定义的应用程序中查找作业settings.INSTALLED_APPS

进行更改,project/core/__init__.py以便在启动Django时自动导入Celery应用程序:

from .celery import app as celery_app


__all__ = ("celery_app",)

职位发布


刷新视图以开始作业并发送ID:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

不要忘记导入任务:

from tasks.sample_tasks import create_task

收集映像并部署新容器:

$ docker-compose up -d --build

要开始新任务,请执行以下操作:

$ curl -F type=0 http://localhost:1337/tasks/

您将看到如下内容:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

工作现状


返回客户端事件处理程序:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

当响应从AJAX请求返回时,我们将getStatus()每秒从作业ID 发送一次:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

如果答案是肯定的,那么新行将被添加到DOM表中。刷新视图get_status以返回状态:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

导入AsyncResult

from celery.result import AsyncResult

更新容器:

$ docker-compose up -d --build

运行一个新任务:

$ curl -F type=1 http://localhost:1337/tasks/

然后task_id从响应中提取并调用更新get_status以查看状态:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

您可以在浏览器中看到相同的信息:



原木芹菜


更新服务celerydocker-compose.yml这样一种方式,西芹中一个单独的文件注销:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

“项目”中添加一个新目录,并将其命名为“ logs”然后将文件添加到此新目录celery.log

更新:

$ docker-compose up -d --build

设置音量后,您应该看到如何在本地填充日志文件

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

花仪表板


Flower是一个基于Web的轻量级工具,用于实时监控Celery。例如,您可以跟踪正在运行的任务,增加或减少工作人员池,显示图形和统计信息。

将其添加到requirements.txt

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

然后将新服务添加到docker-compose.yml

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

并测试:

$ docker-compose up -d --build

转到localhost:5555查看仪表板。您应该看到一个工作人员:



运行更多任务以测试仪表板:



尝试添加更多工作人员,看看它如何影响性能:

$ docker-compose up -d --build --scale celery=3

测验


让我们从最简单的测试开始:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

将上面的测试用例添加到project/tests/test_tasks.py并添加以下导入:

from tasks import sample_tasks


运行此测试:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

该测试大约需要一分钟:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

值得注意的是,在断言的上面我们使用的方法.run,而不是.delay直接启动任务,而无需使用芹菜工人。
是否想使用模拟插件来加快速度?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

进口:

from unittest.mock import patch, call

测试:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

看到?现在更快了!

全面集成测试呢?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

请记住,此测试使用与开发中相同的代理和后端。您可以创建一个新的Celery实例进行测试:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

添加导入:

import json

并确保测试成功。

结论


今天,我们介绍了Celery的基本配置,以在Django的应用程序中执行长期任务。您应将任何可能会减慢用户端代码速度的进程发送到处理队列。

Celery还可以用于执行重复性任务并分解复杂的资源密集型任务,以便在多台计算机上分配计算负载并减少执行时间和处理客户端请求的计算机上的负载。

您可以在此存储库中找到所有代码



上课

All Articles