وظائف غير متزامنة في جانغو مع الكرفس

تم إعداد ترجمة للمقال قبل بدء دورة Python Web Developer .



إذا كان التطبيق الخاص بك يحتوي على أي عملية طويلة ، فيمكنك معالجته ليس في تدفق الطلب / الاستجابة القياسي ، ولكن في الخلفية.

على سبيل المثال ، في التطبيق الخاص بك ، يجب على المستخدم إرسال صورة مصغرة (والتي ، على الأرجح ، ستحتاج إلى تعديل) وتأكيد عنوان البريد الإلكتروني. إذا قام تطبيقك بمعالجة الصورة ثم أرسل بريدًا إلكترونيًا للتأكيد في معالج الطلب ، فسيتعين على المستخدم النهائي الانتظار لسبب ما لإكمال كلتا المهمتين قبل إعادة تحميل الصفحة أو إغلاقها. بدلاً من ذلك ، يمكنك نقل هذه العمليات إلى قائمة انتظار المهام وتركها لعملية منفصلة للمعالجة لإرسال استجابة إلى المستخدم على الفور. في هذه الحالة ، سيتمكن المستخدم النهائي من القيام بأشياء أخرى من جانب العميل أثناء المعالجة في الخلفية. في هذه الحالة ، سيكون تطبيقك قادرًا أيضًا على الاستجابة بحرية للطلبات الواردة من المستخدمين والعملاء الآخرين.

سنتحدث اليوم عن عملية إعداد وتكوين Celery و Redis للتعامل مع العمليات طويلة الأمد في تطبيق Django لحل هذه المشاكل. سنستخدم أيضًا Docker و Docker Compose لربط جميع القطع معًا ومعرفة كيفية اختبار وظائف الكرفس مع اختبارات الوحدة والتكامل.

في نهاية هذا الدليل ، سنتعلم:

  • دمج الكرفس في Django لخلق وظائف الخلفية.
  • احزم Django و Celery و Redis مع Docker.
  • قم بتشغيل العمليات في الخلفية باستخدام سير عمل منفصل.
  • حفظ سجلات الكرفس في ملف.
  • قم بإعداد زهرة لرصد وإدارة وظائف والعمال الكرفس.
  • اختبار وظائف الكرفس مع اختبارات الوحدة والتكامل.

مهام فى الخلفيه


لتحسين تجربة المستخدم ، يجب تشغيل العمليات المطولة في الخلفية خارج تيار طلب / استجابة HTTP العادي.

على سبيل المثال:

  • إرسال رسائل للتأكيد ؛
  • تجديف الويب والزحف ؛
  • تحليل البيانات؛
  • معالجة الصورة؛
  • إنشاء تقرير.

عند إنشاء تطبيق ، حاول فصل المهام التي يجب القيام بها خلال عمر الطلب / الاستجابة ، على سبيل المثال ، عمليات CRUD ، من المهام التي يجب القيام بها في الخلفية.

عملية العمل


هدفنا هو تطوير تطبيق Django يستخدم الكرفس للتعامل مع العمليات المطولة خارج دورة الطلب / الاستجابة.

  1. ينشئ المستخدم النهائي مهمة جديدة عن طريق إرسال طلب POST إلى الخادم.
  2. في طريقة العرض هذه ، تتم إضافة المهمة إلى قائمة الانتظار ، ويتم إرسال معرف الوظيفة إلى العميل.
  3. باستخدام AJAX ، يستمر العميل في الاستعلام عن الخادم للتحقق من حالة المهمة ، بينما تعمل المهمة نفسها في الخلفية.



إنشاء المشروع


قم باستنساخ المشروع من مستودع django-celery وقم بالخروج على علامة v1 في الفرع الرئيسي :

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

نظرًا لأننا نحتاج بشكل عام إلى العمل مع ثلاث عمليات (Django ، Redis ، worker) ، فإننا نستخدم Docker لتبسيط العمل ، وربطهم حتى نتمكن من تشغيل كل شيء باستخدام أمر واحد في نافذة طرفية واحدة.

من جذر المشروع ، قم بإنشاء الصور وتشغيل حاويات Docker:

$ docker-compose up -d --build

عند اكتمال البناء ، انتقل إلى localhost : 1337:



تحقق من نجاح الاختبارات بنجاح:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

دعونا نلقي نظرة على هيكل المشروع قبل المضي قدما:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

بدء العمل


معالج الحدث project/static/main.jsمشترك في نقرة زر. بالضغط على الخادم يرسل AJAX ما بعد الطلب مع نوع العمل المناسب: 1، 2أو 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

على جانب الخادم ، تم تكوين طريقة عرض بالفعل لمعالجة الطلب في project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

والآن يبدأ المرح: نحن نربط الكرفس!

إعداد الكرفس


لنبدأ بإضافة Celery و Redis إلى الملف project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

يستخدم الكرفس و سيط رسالة - RabbitMQ ، رديس، أو أوس بسيط خدمة انتظار (SQS) -to تبسيط التواصل بين العمال والكرفس تطبيق ويب. يتم إرسال الرسائل إلى الوسيط ، ثم تتم معالجتها بواسطة العامل. بعد ذلك ، يتم إرسال النتائج إلى الواجهة الخلفية.

سيكون Redis وسيطًا وخلفية. أضف Redis and Celery Worker إلى الملف docker-compose.ymlكما يلي:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

انتبه إلى celery worker --app=core --loglevel=info:

  1. celery workerتستخدم لبدء عامل الكرفس .
  2. --app=coreتستخدم لإطلاق core تطبيق الكرفس (الذي سنعرفه قريبًا) ؛
  3. --loglevel=infoيحدد مستوى تسجيل المعلومات.

أضف ما يلي إلى وحدة إعدادات المشروع بحيث تستخدم كرفس Redis كوسيط وخلفية:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

ثم قم بإنشاء الملف sample_tasks.pyفي project/tasks: هنا ، باستخدام decor_task decorator ، قمنا بتعريف وظيفة مهمة كرفس جديدة تسمى . تذكر أن المهمة نفسها لن يتم تنفيذها من عملية Django ، بل سيتم تنفيذها بواسطة عامل الكرفس. أضف الملف الآن إلى :

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


create_task



celery.py"project/core"

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

ماذا يجري هنا؟

  1. تحتاج أولاً إلى تعيين القيمة الافتراضية للبيئة DJANGO_SETTINGS_MODULEحتى يعرف Celery كيفية العثور على مشروع Django.
  2. ثم أنشأنا نسخة من الكرفس باسم coreووضعناها في متغير app.
  3. ثم قمنا بتحميل قيم تكوين الكرفس من كائن الإعدادات من django.conf. استخدمنا مساحة الاسم = "CELERY" لمنع التعارضات مع إعدادات Django الأخرى. لذلك ، يجب أن تبدأ جميع إعدادات التكوين لـ Celery ببادئة CELERY_.
  4. أخيرًا ، app.autodiscover_tasks()تخبر Celery أن تبحث عن وظائف من التطبيقات المحددة في settings.INSTALLED_APPS.

التغيير project/core/__init__.pyبحيث يتم استيراد تطبيق Celery تلقائيًا عند بدء Django:

from .celery import app as celery_app


__all__ = ("celery_app",)

بدء العمل


قم بتحديث العرض لبدء المهمة وإرسال المعرّف:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

لا تنس استيراد المهمة:

from tasks.sample_tasks import create_task

اجمع الصور وانشر حاويات جديدة:

$ docker-compose up -d --build

لبدء مهمة جديدة ، قم بما يلي:

$ curl -F type=0 http://localhost:1337/tasks/

سترى شيئا مثل هذا:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

الحالة الوظيفية


العودة إلى معالج الأحداث من جانب العميل:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

عندما تعود الاستجابة من طلب AJAX ، سنرسل getStatus()من معرف الوظيفة كل ثانية:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

إذا كانت الإجابة بنعم ، فسيتم إضافة صف جديد إلى جدول DOM. قم بتجديد العرض get_statusلإرجاع الحالة:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

استيراد AsyncResult :

from celery.result import AsyncResult

حاويات التحديث:

$ docker-compose up -d --build

شغّل مهمة جديدة:

$ curl -F type=1 http://localhost:1337/tasks/

ثم استخرج task_idمن الرد واتصل بالمكالمة get_statusلمعرفة الحالة:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

يمكنك مشاهدة نفس المعلومات في المتصفح:



سجلات الكرفس


تحديث الخدمة celeryفي docker-compose.ymlحال من الأحوال من النوع الذي الكرفس بتسجيل الخروج في ملف منفصل:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

أضف دليلًا جديدًا إلى "مشروع" واسمه "سجلات" . ثم أضف إلى هذا الدليل الجديد وضع الملف celery.log.

تحديث:

$ docker-compose up -d --build

يجب أن ترى كيف يتم تعبئة ملف السجل محليًا بعد تعيين حجم الصوت :

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

لوحة زهرة


زهرة هي أداة خفيفة الوزن على شبكة الإنترنت لرصد الكرفس في الوقت الحقيقي. يمكنك تتبع مهام التشغيل ، وزيادة أو تقليل مجموعة العمال ، وعرض الرسوم البيانية والإحصاءات ، على سبيل المثال.

أضفه إلى requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

ثم أضف الخدمة الجديدة إلى docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

واختبر:

$ docker-compose up -d --build

انتقل إلى localhost : 5555 لعرض لوحة القيادة. من المفترض أن ترى عاملاً واحدًا: قم



بتشغيل بعض المهام الإضافية لاختبار لوحة التحكم:



حاول إضافة المزيد من العمال وانظر كيف يؤثر ذلك على الأداء:

$ docker-compose up -d --build --scale celery=3

الاختبارات


لنبدأ بأبسط اختبار:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

أضف حالة الاختبار أعلاه project/tests/test_tasks.pyوأضف الاستيراد التالي:

from tasks import sample_tasks


قم بإجراء هذا الاختبار:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

سيستغرق هذا الاختبار حوالي دقيقة:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

تجدر الإشارة إلى أنه في التأكيد أعلاه استخدمنا الطريقة .runبدلاً من ذلك .delayلإطلاق المهام مباشرة ، دون استخدام عامل الكرفس.
هل تريد استخدام الإضافات الوهمية لتسريع الأمور؟

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

استيراد:

from unittest.mock import patch, call

اختبار:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

نرى؟ الآن أسرع بكثير!

ماذا عن اختبار التكامل الكامل؟

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

تذكر أن هذا الاختبار يستخدم نفس الوسيط والخلفية كما في التطوير. يمكنك إنشاء نسخة جديدة من الكرفس للاختبار:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

إضافة استيراد:

import json

وتأكد من نجاح الاختبارات.

استنتاج


قدمنا ​​اليوم التكوين الأساسي لـ Celery لأداء مهام طويلة الأمد في تطبيق على Django. يجب عليك إرسال أي عمليات إلى قائمة انتظار المعالجة التي قد تبطئ الشفرة من جانب المستخدم.

يمكن أيضًا استخدام الكرفس لأداء المهام المتكررة وتفكيك المهام المعقدة كثيفة الموارد من أجل توزيع الحمل الحسابي على عدة أجهزة وتقليل وقت التنفيذ والحمل على الجهاز الذي يعالج طلبات العميل.

يمكنك العثور على جميع التعليمات البرمجية في هذا المستودع .



احصل على الدورة

All Articles