باستخدام RabbitMQ مع MonsterMQ الجزء 2

صورة
(الصورة مأخوذة من موقع RabbitMQ الرسمي )

في المقال الأول ، كتبنا برنامجين PHP يستخدمون RabbitMQ: أحدهما أرسل رسائل ، والثاني تم استلامه. في هذه المقالة ، سننظر في كيفية إنشاء قائمة انتظار توزع المهام التي تستهلك قدرًا كبيرًا من الوقت بين العديد من العاملين (معالجات الرسائل).

الفكرة الأساسية ليست إكمال المهام التي تستهلك وقتًا كبيرًا في وقت واحد ، ولكن إضافتها إلى قائمة الانتظار كرسالة. في وقت لاحق ، يتلقى العامل العامل رسالة من قائمة الانتظار ، يستهلكها وينفذ المهمة كما لو كان في الخلفية.
هذا المفهوم وثيق الصلة بتطبيقات الويب ، حيث قد تحتاج ، على سبيل المثال ، إلى تنفيذ بعض المهام التي تستغرق وقتًا طويلاً ، والتي لا تكون نتيجة ذلك مطلوبة على الفور ، مثل المهام مثل إرسال بريد إلكتروني أو تقديم طلب HTTP إلى تطبيق تابع لجهة خارجية (على سبيل المثال ، عبر libcurl إلى بي أتش بي).

طبخ


في هذا الجزء ، سوف نرسل رسائل تمثل المهام للعاملين. نظرًا لعدم وجود مهام حقيقية ، مثل معالجة الصور أو إنشاء ملفات pdf ، فسنتظاهر بأننا مشغولون باستخدام وظيفة sleep () . سنأخذ عدد النقاط في الرسالة على أنها تعقيد المهمة. ستعني كل نقطة ثانية من العمل ، على سبيل المثال ، ستستغرق الرسالة Hello ... ثلاث ثوانٍ من العمل.

سنقوم بتعديل نص Send.php قليلاً من الدرس الأخير حتى يتمكن من إرسال رسائل عشوائية.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

يحتاج البرنامج النصي Rece.php أيضًا إلى التغيير. يجب أن تحاكي العملية لمدة ثانية واحدة لكل نقطة في الرسالة المستلمة. دعنا نعيد تسمية الملف إلى worker.php ونكتب الكود التالي فيه:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

بعد ذلك ، قم بتشغيل كلا النصين في محطات طرفية مختلفة:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

تتمثل إحدى مزايا استخدام قوائم انتظار المهام في القدرة على توزيع العمل بين العديد من العمال. دعونا نحاول تشغيل نصين worker.php مرة واحدة في محطتين مختلفتين. وفي الثالث سنرسل رسائل مع البرنامج النصي send.php

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

بعد ذلك ، دعنا نرى ما جلبه عمالنا:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

بشكل افتراضي ، سيرسل RabbitMQ كل رسالة لاحقة إلى المستلم المستهلك التالي. في المتوسط ​​، سيتلقى كل مستلم نفس عدد الرسائل ، وتسمى هذه الطريقة لتوزيع الرسائل Round-robin (في دائرة). جرب هذا مع ثلاثة عمال أو أكثر.

تأكيد الرسالة


يمكن أن تستغرق المهام قدرًا معينًا من الوقت. ربما تكون مهتمًا بما سيحدث للرسالة إذا أنهيت عمل المستلم الذي لم يتمكن من معالجة هذه الرسالة حتى النهاية. باستخدام الكود الحالي ، سيتم إرجاع الرسالة إلى قائمة الانتظار ، حيث يتم تمكين التعرف على الرسالة افتراضيًا في MonsterMQ.

عندما نستخدم تأكيد الرسالة ، فإننا نخبر RabbitMQ أن الرسالة قد تمت معالجتها ولديه الحق في إزالتها من قائمة الانتظار. إذا أكمل المستلم عمله دون إرسال تأكيد (على سبيل المثال ، نتيجة إنهاء غير متوقع لاتصال TCP) ، سيتفهم RabbitMQ أن الرسالة لم تتم معالجتها وسيعيدها إلى قائمة الانتظار ، في محاولة لتسليمها إلى العمال الآخرين المتوفرين. وبالتالي ، يمكنك التأكد من أن الرسائل لن تضيع ، حتى في حالة إيقاف التشغيل غير المتوقع لأي من العمال.

لتعطيل إقرار الرسالة في MonsterMQ ، يمكنك تمرير true كوسيطة ثانية تستهلك ()

$consumer->consume('test-queue', true);

يعد نسيان تأكيد الرسائل المستلمة خطأ شائعًا إلى حد ما ويمكن تحمله بسهولة ، وقد يؤدي إلى عواقب وخيمة. إذا حدث ذلك ، فسيتم تسليم الرسائل مرارًا وتكرارًا وستتراكم أيضًا في قوائم الانتظار التي تستهلك المزيد والمزيد من الذاكرة. لتصحيح هذا الخطأ، واستخدام rabbitmqctl لعرض messages_unacknowledged الحقل

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

على النوافذ ، أسقط sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

أمان قائمة الانتظار


على الرغم من حقيقة أن رسائلنا لن تضيع إذا أنهى أي من العمال عملهم بشكل غير متوقع ، ومع ذلك ، لا يزال بإمكاننا فقدان قوائم الانتظار التي تم إنشاؤها إذا تم إغلاق RabbitMQ.

لحماية نفسك من فقدان قوائم الانتظار ، تحتاج إلى التصريح بأن قائمة الانتظار دائمة (مستمرة ، دائمة). نظرًا لأن قوائم الانتظار غير صالحة ، بمعنى أنه لا يمكننا تغييرها أو إعادة إنشائها عن طريق استدعاء طريقة الإعلان بالاسم نفسه ، سيتعين علينا الإعلان عن قائمة انتظار جديدة. دعونا نفعل ذلك على النحو التالي

$consumer->queue('new-queue')->setDurable()->declare();

تذكر تغيير رمز إعلان قائمة الانتظار في رمز المرسل أيضًا.

التوزيع العادل للرسائل


ربما لاحظت أن الرسائل بين عاملينا لا تزال غير موزعة بنزاهة. على سبيل المثال ، إذا كانت كل رسالة زوجية تستغرق وقتًا طويلاً ، وتتم معالجة كل رسالة فردية بسرعة ، فإن عاملًا واحدًا يتلقى رسائل مستهلكة للوقت سيكون دائمًا مشغولًا ، بينما تكون الرسالة الثانية خاملة. لتجنب ذلك ، يمكنك استخدام جودة الخدمة . دعنا نضيف السطر التالي إلى الكود الخاص بنا

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

يطلب هذا الخط من RabbitMQ عدم إرسال رسائل إلى المستلم حتى يقوم بمعالجة الرسالة الحالية والتعرف عليها. يطبق perConsumer () جودة الخدمة على جميع قنوات المستلم ، استخدم طريقة perChannel () إذا كنت تريد تطبيق جودة الخدمة على القناة الحالية فقط.

صورة
(الصورة مأخوذة من موقع RabbitMQ الرسمي )

وضع الشفرة بأكملها معًا


سيبدو هذا مثل send.php (المرسل)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

وهكذا العامل المتلقي. php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

هذا كل شيء ، في الدرس التالي سنتعلم كيفية إرسال رسائل إلى العديد من المستلمين من نفس قائمة الانتظار.

All Articles