باستخدام RabbitMQ مع MonsterMQ الجزء 3

في مقالنا السابق ، أنشأنا قائمة انتظار المهام. يفترض أنه يتم تسليم مهمة واحدة في شكل رسالة إلى مستلم واحد. في هذه المقالة سنقوم بشيء آخر ، وسوف نرسل رسالة واحدة إلى عدة مستلمين في وقت واحد. سننشئ نظام تسجيل يتألف من برنامجين: الأول يرسل الرسائل ، والثاني يستقبلها ويعرضها. في نظامنا ، سيتلقى جميع المستلمين قيد التشغيل رسالة مرسلة من المرسل.

التبادلات (المبادلات)


في المقالات السابقة ، أرسلنا واستلمنا رسائل من قوائم الانتظار ، والآن حان الوقت لتقديم نموذج كامل لإعادة توجيه الرسائل في RabbitMQ.

دعنا نتناول بسرعة ما تناولناه في المقالات السابقة:

  • Producer - تطبيق يرسل الرسائل
  • قائمة انتظار - مخزن مؤقت يقوم بتخزين الرسائل
  • المستهلك - تطبيق يتلقى الرسائل ويعالجها

الفكرة الرئيسية لـ RabbitMQ (أو بالأحرى AMQP) هي أن المرسل (المنتج) لا يرسل رسالة مباشرة إلى قائمة الانتظار. علاوة على ذلك ، لا يعرف حتى ما إذا كان في قائمة الانتظار. بدلاً من ذلك ، يرسل المرسل رسائل إلى المبادلات (Exchange). المبادل هو شيء بسيط للغاية ، يقوم بأمرين: يتلقى رسالة من المرسلين ويعيد توجيههم إلى قائمة الانتظار. المبادلات من أنواع مختلفة: البعض يرسل الرسائل إلى قائمة انتظار معينة (النوع المباشر) ، والبعض الآخر يرسل نفس الرسالة إلى عدة قوائم انتظار في وقت واحد (نوع fanout) ، والبعض الآخر يعيد توجيه الرسائل إلى قائمة الانتظار بناءً على قواعد إعادة توجيه محددة ومحددة (نوع الموضوع).

صورة
(الصورة مأخوذة من موقع RabbitMQ الرسمي )

دعونا نلقي نظرة على مبادل من نوع fanout. لإنشائه ، اكتب الرمز التالي:

$producer->newFanoutExchange('logs');

يعمل مبادل Fanout وفقًا لمخطط بسيط للغاية ، فهو ببساطة يرسل الرسالة المستلمة إلى جميع قوائم الانتظار المرفقة به.

لسرد جميع المبادلات الموجودة على الخادم ، اتصل ب rabbitmqctl

sudo rabbitmqctl list_exchanges

ستحتوي هذه القائمة على amq. * مبادلات ومبدل قياسي بدون اسم (سطر فارغ). لا تنتبه لهم نحن لسنا بحاجة لهم بعد.

في المقالة الأخيرة ، لم نكن نعرف شيئًا عن المبادلات ، ولكن مع ذلك كنا قادرين على إرسال الرسائل. حدث هذا لأنه إذا لم تحدد في MonsterMQ بشكل صريح المبادل كوسيطة ثالثة لأسلوب Producer :: Publish () ، فستستخدم المكتبة المبادل القياسي بدون اسم RabbitMQ ، الذي يرسل الرسائل في قوائم الانتظار التي يتم تمرير أسمائها كوسيطة ثانية إلى الأسلوب Producer :: Publish () .

الآن يمكننا إرسال رسائل إلى مبادلنا الجديد عن طريق تحديد اسمه كوسيطة ثالثة لطريقة Producer :: Publish () :

$producer->publish($message, '', 'logs');

الآن دعنا ننشئ عاملين بقسمين في طابور سيكونون موجودين طالما أن العمال الذين أعلنوا عنهم نشطون. يمكنك القيام بذلك على النحو التالي:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

الآن عندما ينتهي العامل الذي أعلن عن قائمة الانتظار من الجلسة ويفصل ، سيتم حذفه تلقائيًا. إذا أكمل العاملان عملهما ، فسيتم حذف كل من قوائم الانتظار.

نربط قائمة الانتظار مع المبادل


لقد أنشأنا بالفعل بورصة وقائمة انتظار. الآن علينا فقط إخبار المبادل بإرسال الرسائل المستلمة إلى قوائم الانتظار لدينا. نحن بحاجة إلى ربط المبادل والصفوف. للقيام بذلك ، اكتب الرمز التالي:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

من الآن فصاعدا ، فإن مبادل السجلات لدينا متصل بقوائم الانتظار الخاصة بنا وسيعيد توجيه الرسائل المستلمة إليهم.

يمكنك سرد جميع الروابط الموجودة بالأمر التالي على لينكس

rabbitmqctl list_bindings

وضع الكود معًا


لم يتغير نص المرسل كثيرًا مقارنة بالدرس السابق. الفرق الرئيسي هو أنه الآن يرسل الرسائل إلى المبادل المعلن عنه مسبقًا ، وليس إلى القياسي (يمكن الوصول إليه من الصندوق). هنا هو رمز send.php

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

ومن الجدير بالذكر أنه من خلال إرسال رسالة إلى مبادل غير مرتبط بأي قائمة انتظار ، ستفقد الرسالة. ولكن الآن يناسبنا ، لأننا لم نطلق متلقينا بعد.

هذا هو الرمز الخاص بعاملنا العامل الأول

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

هذا هو الرمز الخاص بعامل worker-2.php الثاني

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

إذا كنت تريد حفظ الرسائل المرسلة إلى العامل الأول في ملف سجل ، فقط اتصل بالأمر التالي في وحدة التحكم:

php worker-1.php > logs_from_rabbit.log

لبدء كل من العاملين وعرض الرسائل في النوافذ الطرفية ، اتصل بالأوامر التالية ، كل منها في نافذته الخاصة:

php worker-1.php

php worker-2.php

ولإرسال الرسائل ، اتصل بالأمر التالي:

php send.php

في الدرس التالي ، سنلقي نظرة على كيفية استقبال مجموعة فرعية معينة من الرسائل من جميع الرسائل المرسلة.

All Articles