Utilisation de RabbitMQ avec MonsterMQ Partie 3

Dans notre article précédent, nous avons créé une file d'attente de tâches. Il suppose qu'une tâche sous la forme d'un message est remise à un destinataire. Dans cet article, nous ferons autre chose, nous enverrons un message à plusieurs destinataires à la fois. Nous allons créer un système d'enregistrement qui se composera de deux programmes: le premier enverra des messages, et le second les recevra et les affichera. Dans notre système, tous les destinataires en cours d'exécution recevront un message envoyé par l'expéditeur.

Échanges (échangeurs)


Dans les articles précédents, nous avons envoyé et reçu des messages de files d'attente, il est maintenant temps d'introduire un modèle complet de transfert de messages dans RabbitMQ.

Passons rapidement en revue ce que nous avons couvert dans les articles précédents:

  • Producteur - une application qui envoie des messages
  • File d'attente - un tampon qui stocke les messages
  • Consumer - une application qui reçoit et traite les messages

L'idée clé de RabbitMQ (ou plutôt AMQP) est que l'expéditeur (producteur) n'envoie jamais de message directement à la file d'attente. De plus, il ne sait même pas s'il est dans la file d'attente. Au lieu de cela, l'expéditeur envoie des messages aux échangeurs (Exchange). L'échangeur est une chose très simple, il fait deux choses: il reçoit un message des expéditeurs et les redirige vers la file d'attente. Les échangeurs sont de différents types: certains envoient des messages à une file d'attente spécifique (type direct), d'autres envoient le même message à plusieurs files d'attente à la fois (type fanout), d'autres redirigent les messages vers la file d'attente en fonction de règles de redirection spécifiques et spécifiées (type de rubrique).

image
(image tirée du site officiel de RabbitMQ )

Regardons un échangeur de type fanout. Pour le créer, écrivez le code suivant:

$producer->newFanoutExchange('logs');

L'échangeur Fanout fonctionne selon un schéma très simple, il envoie simplement le message reçu à toutes les files d'attente qui lui sont attachées.

Pour répertorier tous les échangeurs existants sur le serveur, appelez rabbitmqctl

sudo rabbitmqctl list_exchanges

Cette liste contiendra amq. * Échangeurs et un échangeur standard sans nom (ligne vide). Ne faites pas attention à eux, nous n'en avons pas encore besoin.

Dans le dernier article, nous ne savions rien des échangeurs, mais nous avons néanmoins pu envoyer des messages. Cela s'est produit car si dans MonsterMQ vous ne spécifiez pas explicitement l'échangeur comme troisième argument de la méthode Producer :: publish () , la bibliothèque utilisera l'échangeur sans nom standard RabbitMQ, qui envoie des messages dans des files d'attente dont les noms sont passés en tant que deuxième argument à la méthode Producer :: publish () .

Nous pouvons maintenant envoyer des messages à notre nouvel échangeur en spécifiant son nom comme troisième argument de la méthode Producer :: publish () :

$producer->publish($message, '', 'logs');

Créons maintenant deux travailleurs avec deux files d'attente qui existeront tant que les travailleurs qui les auront annoncés seront actifs. Vous pouvez le faire comme suit:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

Désormais, lorsque le travailleur qui a annoncé la file d'attente met fin à la session et se déconnecte, elle sera automatiquement supprimée. Si les deux employés terminent leur travail, les deux files d'attente seront supprimées.

Nous connectons la file d'attente avec l'échangeur


Nous avons déjà créé un échange et une file d'attente. Il ne nous reste plus qu'à demander à l'échangeur d'envoyer les messages reçus à nos files d'attente. Nous devons lier l' échangeur et les files d'attente. Pour ce faire, écrivez le code suivant:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

Désormais, notre échangeur de journaux est connecté à nos files d'attente et leur transmettra les messages reçus.

Vous pouvez lister tous les liens existants avec la commande suivante sur linux

rabbitmqctl list_bindings

Assembler le code


Notre script d'expéditeur n'a pas beaucoup changé par rapport à la leçon précédente. La principale différence est qu'il envoie maintenant des messages à l'échangeur annoncé précédemment, et non à l'échangeur standard (accessible depuis la boîte). Voici le code send.php

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Il convient de mentionner qu'en envoyant un message à un échangeur qui n'est associé à aucune file d'attente, le message sera perdu. Mais maintenant cela nous convient, puisque nous n'avons pas encore lancé nos destinataires.

Voici le code de notre premier travailleur worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Voici le code du second travailleur worker-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Si vous souhaitez enregistrer les messages envoyés au premier travailleur dans un fichier journal, appelez simplement la commande suivante dans la console:

php worker-1.php > logs_from_rabbit.log

Pour démarrer les deux travailleurs et afficher les messages dans les fenêtres du terminal, appelez les commandes suivantes, chacune dans sa propre fenêtre:

php worker-1.php

php worker-2.php

Et pour envoyer des messages, appelez la commande suivante:

php send.php

Dans la prochaine leçon, nous verrons comment obtenir uniquement un certain sous-ensemble de messages de tous les messages envoyés.

All Articles