تم إعداد ترجمة للمقال قبل بدء دورة Python Web Developer .
إذا كان التطبيق الخاص بك يحتوي على أي عملية طويلة ، فيمكنك معالجته ليس في تدفق الطلب / الاستجابة القياسي ، ولكن في الخلفية.على سبيل المثال ، في التطبيق الخاص بك ، يجب على المستخدم إرسال صورة مصغرة (والتي ، على الأرجح ، ستحتاج إلى تعديل) وتأكيد عنوان البريد الإلكتروني. إذا قام تطبيقك بمعالجة الصورة ثم أرسل بريدًا إلكترونيًا للتأكيد في معالج الطلب ، فسيتعين على المستخدم النهائي الانتظار لسبب ما لإكمال كلتا المهمتين قبل إعادة تحميل الصفحة أو إغلاقها. بدلاً من ذلك ، يمكنك نقل هذه العمليات إلى قائمة انتظار المهام وتركها لعملية منفصلة للمعالجة لإرسال استجابة إلى المستخدم على الفور. في هذه الحالة ، سيتمكن المستخدم النهائي من القيام بأشياء أخرى من جانب العميل أثناء المعالجة في الخلفية. في هذه الحالة ، سيكون تطبيقك قادرًا أيضًا على الاستجابة بحرية للطلبات الواردة من المستخدمين والعملاء الآخرين.سنتحدث اليوم عن عملية إعداد وتكوين Celery و Redis للتعامل مع العمليات طويلة الأمد في تطبيق Django لحل هذه المشاكل. سنستخدم أيضًا Docker و Docker Compose لربط جميع القطع معًا ومعرفة كيفية اختبار وظائف الكرفس مع اختبارات الوحدة والتكامل.في نهاية هذا الدليل ، سنتعلم:- دمج الكرفس في Django لخلق وظائف الخلفية.
- احزم Django و Celery و Redis مع Docker.
- قم بتشغيل العمليات في الخلفية باستخدام سير عمل منفصل.
- حفظ سجلات الكرفس في ملف.
- قم بإعداد زهرة لرصد وإدارة وظائف والعمال الكرفس.
- اختبار وظائف الكرفس مع اختبارات الوحدة والتكامل.
مهام فى الخلفيه
لتحسين تجربة المستخدم ، يجب تشغيل العمليات المطولة في الخلفية خارج تيار طلب / استجابة HTTP العادي.على سبيل المثال:- إرسال رسائل للتأكيد ؛
- تجديف الويب والزحف ؛
- تحليل البيانات؛
- معالجة الصورة؛
- إنشاء تقرير.
عند إنشاء تطبيق ، حاول فصل المهام التي يجب القيام بها خلال عمر الطلب / الاستجابة ، على سبيل المثال ، عمليات CRUD ، من المهام التي يجب القيام بها في الخلفية.عملية العمل
هدفنا هو تطوير تطبيق Django يستخدم الكرفس للتعامل مع العمليات المطولة خارج دورة الطلب / الاستجابة.- ينشئ المستخدم النهائي مهمة جديدة عن طريق إرسال طلب POST إلى الخادم.
- في طريقة العرض هذه ، تتم إضافة المهمة إلى قائمة الانتظار ، ويتم إرسال معرف الوظيفة إلى العميل.
- باستخدام AJAX ، يستمر العميل في الاستعلام عن الخادم للتحقق من حالة المهمة ، بينما تعمل المهمة نفسها في الخلفية.

إنشاء المشروع
قم باستنساخ المشروع من مستودع django-celery وقم بالخروج على علامة v1 في الفرع الرئيسي :$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master
نظرًا لأننا نحتاج بشكل عام إلى العمل مع ثلاث عمليات (Django ، Redis ، worker) ، فإننا نستخدم Docker لتبسيط العمل ، وربطهم حتى نتمكن من تشغيل كل شيء باستخدام أمر واحد في نافذة طرفية واحدة.من جذر المشروع ، قم بإنشاء الصور وتشغيل حاويات Docker:$ docker-compose up -d --build
عند اكتمال البناء ، انتقل إلى localhost : 1337:
تحقق من نجاح الاختبارات بنجاح:$ docker-compose exec web python -m pytest
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item
tests/test_tasks.py . [100%]
========================================= 1 passed in 0.47s =========================================
دعونا نلقي نظرة على هيكل المشروع قبل المضي قدما:├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── pytest.ini
├── requirements.txt
├── static
│ ├── bulma.min.css
│ ├── jquery-3.4.1.min.js
│ ├── main.css
│ └── main.js
├── tasks
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── templates
│ │ └── home.html
│ └── views.py
└── tests
├── __init__.py
└── test_tasks.py
بدء العمل
معالج الحدث project/static/main.js
مشترك في نقرة زر. بالضغط على الخادم يرسل AJAX ما بعد الطلب مع نوع العمل المناسب: 1
، 2
أو 3
.$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
على جانب الخادم ، تم تكوين طريقة عرض بالفعل لمعالجة الطلب في project/tasks/views.py
:def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
والآن يبدأ المرح: نحن نربط الكرفس!إعداد الكرفس
لنبدأ بإضافة Celery و Redis إلى الملف project/requirements.txt
:celery==4.4.1
Django==3.0.4
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
يستخدم الكرفس و سيط رسالة - RabbitMQ ، رديس، أو أوس بسيط خدمة انتظار (SQS) -to تبسيط التواصل بين العمال والكرفس تطبيق ويب. يتم إرسال الرسائل إلى الوسيط ، ثم تتم معالجتها بواسطة العامل. بعد ذلك ، يتم إرسال النتائج إلى الواجهة الخلفية.سيكون Redis وسيطًا وخلفية. أضف Redis and Celery Worker إلى الملف docker-compose.yml
كما يلي:version: '3.7'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:5-alpine
انتبه إلى celery worker --app=core --loglevel=info
:celery worker
تستخدم لبدء عامل الكرفس .--app=core
تستخدم لإطلاق core
تطبيق الكرفس (الذي سنعرفه قريبًا) ؛--loglevel=info
يحدد مستوى تسجيل المعلومات.
أضف ما يلي إلى وحدة إعدادات المشروع بحيث تستخدم كرفس Redis كوسيط وخلفية:CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
ثم قم بإنشاء الملف sample_tasks.py
في project/tasks
: هنا ، باستخدام decor_task decorator ، قمنا بتعريف وظيفة مهمة كرفس جديدة تسمى . تذكر أن المهمة نفسها لن يتم تنفيذها من عملية Django ، بل سيتم تنفيذها بواسطة عامل الكرفس. أضف الملف الآن إلى :# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
create_task
celery.py
"project/core"
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
ماذا يجري هنا؟- تحتاج أولاً إلى تعيين القيمة الافتراضية للبيئة
DJANGO_SETTINGS_MODULE
حتى يعرف Celery كيفية العثور على مشروع Django. - ثم أنشأنا نسخة من الكرفس باسم
core
ووضعناها في متغير app
. - ثم قمنا بتحميل قيم تكوين الكرفس من كائن الإعدادات من
django.conf
. استخدمنا مساحة الاسم = "CELERY" لمنع التعارضات مع إعدادات Django الأخرى. لذلك ، يجب أن تبدأ جميع إعدادات التكوين لـ Celery ببادئة CELERY_
. - أخيرًا ،
app.autodiscover_tasks()
تخبر Celery أن تبحث عن وظائف من التطبيقات المحددة في settings.INSTALLED_APPS
.
التغيير project/core/__init__.py
بحيث يتم استيراد تطبيق Celery تلقائيًا عند بدء Django:from .celery import app as celery_app
__all__ = ("celery_app",)
بدء العمل
قم بتحديث العرض لبدء المهمة وإرسال المعرّف:@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
لا تنس استيراد المهمة:from tasks.sample_tasks import create_task
اجمع الصور وانشر حاويات جديدة:$ docker-compose up -d --build
لبدء مهمة جديدة ، قم بما يلي:$ curl -F type=0 http://localhost:1337/tasks/
سترى شيئا مثل هذا:{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
الحالة الوظيفية
العودة إلى معالج الأحداث من جانب العميل:$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') },
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
عندما تعود الاستجابة من طلب AJAX ، سنرسل getStatus()
من معرف الوظيفة كل ثانية:function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
إذا كانت الإجابة بنعم ، فسيتم إضافة صف جديد إلى جدول DOM. قم بتجديد العرض get_status
لإرجاع الحالة:@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
استيراد AsyncResult :from celery.result import AsyncResult
حاويات التحديث:$ docker-compose up -d --build
شغّل مهمة جديدة:$ curl -F type=1 http://localhost:1337/tasks/
ثم استخرج task_id
من الرد واتصل بالمكالمة get_status
لمعرفة الحالة:$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": true
}
يمكنك مشاهدة نفس المعلومات في المتصفح:
سجلات الكرفس
تحديث الخدمة celery
في docker-compose.yml
حال من الأحوال من النوع الذي الكرفس بتسجيل الخروج في ملف منفصل:celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
أضف دليلًا جديدًا إلى "مشروع" واسمه "سجلات" . ثم أضف إلى هذا الدليل الجديد وضع الملف celery.log
.تحديث:$ docker-compose up -d --build
يجب أن ترى كيف يتم تعبئة ملف السجل محليًا بعد تعيين حجم الصوت :[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
/usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
succeeded in 10.027462100006233s: True
لوحة زهرة
زهرة هي أداة خفيفة الوزن على شبكة الإنترنت لرصد الكرفس في الوقت الحقيقي. يمكنك تتبع مهام التشغيل ، وزيادة أو تقليل مجموعة العمال ، وعرض الرسوم البيانية والإحصاءات ، على سبيل المثال.أضفه إلى requirements.txt
:celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1
pytest==5.4.1
pytest-django==3.8.0
ثم أضف الخدمة الجديدة إلى docker-compose.yml
:dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
واختبر:$ docker-compose up -d --build
انتقل إلى localhost : 5555 لعرض لوحة القيادة. من المفترض أن ترى عاملاً واحدًا: قم
بتشغيل بعض المهام الإضافية لاختبار لوحة التحكم:
حاول إضافة المزيد من العمال وانظر كيف يؤثر ذلك على الأداء:$ docker-compose up -d --build --scale celery=3
الاختبارات
لنبدأ بأبسط اختبار:def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
أضف حالة الاختبار أعلاه project/tests/test_tasks.py
وأضف الاستيراد التالي:from tasks import sample_tasks
قم بإجراء هذا الاختبار:$ docker-compose exec web python -m pytest -k "test_task and not test_home"
سيستغرق هذا الاختبار حوالي دقيقة:======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py . [100%]
============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================
تجدر الإشارة إلى أنه في التأكيد أعلاه استخدمنا الطريقة .run
بدلاً من ذلك .delay
لإطلاق المهام مباشرة ، دون استخدام عامل الكرفس.هل تريد استخدام الإضافات الوهمية لتسريع الأمور؟@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
استيراد:from unittest.mock import patch, call
اختبار:$ docker-compose exec web python -m pytest -k "test_mock_task"
======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected
tests/test_tasks.py . [100%]
================================== 1 passed, 2 deselected in 1.13s ==================================
نرى؟ الآن أسرع بكثير!ماذا عن اختبار التكامل الكامل؟def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
تذكر أن هذا الاختبار يستخدم نفس الوسيط والخلفية كما في التطوير. يمكنك إنشاء نسخة جديدة من الكرفس للاختبار:app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
إضافة استيراد:import json
وتأكد من نجاح الاختبارات.استنتاج
قدمنا اليوم التكوين الأساسي لـ Celery لأداء مهام طويلة الأمد في تطبيق على Django. يجب عليك إرسال أي عمليات إلى قائمة انتظار المعالجة التي قد تبطئ الشفرة من جانب المستخدم.يمكن أيضًا استخدام الكرفس لأداء المهام المتكررة وتفكيك المهام المعقدة كثيفة الموارد من أجل توزيع الحمل الحسابي على عدة أجهزة وتقليل وقت التنفيذ والحمل على الجهاز الذي يعالج طلبات العميل.يمكنك العثور على جميع التعليمات البرمجية في هذا المستودع .
→ احصل على الدورة