Utilisation de RabbitMQ avec MonsterMQ Partie 4

Dans un article précédent, nous avons écrit un système de journalisation simple. Dans lequel nous avons envoyé des messages à plusieurs destinataires à la fois. Dans cet article, nous y ajouterons une nouvelle fonction qui permettra aux destinataires de ne recevoir qu'un sous-ensemble spécifique de messages. Pour cette raison, nous pouvons, par exemple, envoyer uniquement des messages critiques dans un fichier journal, tout en affichant tous les messages envoyés dans une fenêtre de terminal.

Liens entre la file d'attente et l'échangeur


Dans l'article précédent, nous avons appris à connecter une file d'attente à un échangeur. À propos de ce code:

$producer->queue('queue-1')->bind('logs');

De plus, la méthode de liaison peut accepter une deuxième clé de routage d' argument (clé de routage). Afin de ne pas la confondre avec la clé de routage, qui adopte la méthode Producer :: publish () comme deuxième paramètre, appelons-la une clé de liaison (clé de liaison). Voici comment lier l'échangeur à la file d'attente à l'aide de la clé de liaison:

$producer->queue('queue-2')->bind('logs', 'binding-key');

La valeur de la clé de liaison dépend du type d'échangeur. L'échangeur de fanout de la leçon précédente ignore simplement cette valeur.

Échangeur direct


Notre système d'enregistrement du dernier article envoie tous les messages à tous les destinataires.

Modifions-le légèrement afin qu'il filtre les messages en fonction du niveau de gravité. Par exemple, pour qu'elle n'écrive que les messages critiques dans le fichier journal et qu'elle ne gaspille pas d'espace disque sur les messages d'avertissement et d'information.

L'échangeur Fanout ne nous offre pas une telle opportunité.

Mais nous pouvons utiliser un échangeur direct qui envoie des messages uniquement aux files d'attente dont la clé de liaison correspond à la clé de routage du message envoyé (le deuxième argument de la méthode publish ()). Ce mécanisme d'action est illustré par l'image suivante:

image
(image tirée du site officiel de RabbitMQ )

Cette image montre l'échangeur Xde type direct et deux files d'attente - une avec une clé de reliure orange et la seconde avec deux clés de reliure noire et verte .

Dans cette installation, les messages avec la clé de routage orange seront envoyés en premier, et les messages avec les clés de routage noires ou vertes seront envoyés à la seconde. Tout autre message sera ignoré.

Reliure multiple


image
(image prise sur le site officiel de RabbitMQ )

Il est parfaitement acceptable de lier l'échangeur avec plusieurs files d'attente avec une seule clé de liaison. Dans notre exemple, nous pouvons ajouter une connexion entre l'échangeur X et la file d'attente Q1 à l'aide de la clé de liaison noire . Dans ce cas, notre échangeur de type direct se comportera comme un échangeur de type fanout, il enverra des messages à toutes les files d'attente avec la clé de liaison correspondante. Un message avec la clé de routage noire sera remis à Q1 et Q2 .

Communiqué de message


Nous utiliserons le modèle décrit ci-dessus pour notre système d'enregistrement. Au lieu d'envoyer des messages à l'échangeur fanout, nous les enverrons à l'échangeur direct. Nous indiquerons le niveau de gravité du message sous forme de clé de routage. Ainsi, le script destinataire pourra sélectionner des messages du niveau de gravité souhaité. Mais d'abord, concentrons-nous sur la diffusion des messages. Comme toujours, pour cela, nous devons d'abord créer un échangeur.


$producer->newDirectExchange('my-logs');

Maintenant, nous pouvons envoyer des messages:


$producer->newDirectExchange('my-logs');
$producer->publish($message, $severity, 'my-logs');

Afin de ne pas compliquer les choses, supposons que le niveau de gravité ($ gravity) ne peut être que 'info', 'warning', 'error'.

Abonnement


Comme dans la partie précédente, nous allons créer deux ouvriers qui annonceront les files d'attente et les associeront à l'échangeur comme suit:


// Worker 1
foreach ($severities as $severity) {
   $consumer->queue('queue-1')->bind('my-logs', $severity);
}

Mettre tous ensemble


image
(image tirée du site officiel de RabbitMQ )

code send.php :


try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newDirectExchange('my-logs');

   $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $severity, 'my-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Code travailleur-1.php :


try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $severities = array_slice($argv, 1);
   if (empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
   }

   foreach ($severities as $severity) {
      $producer->queue('queue-1')->bind('my-logs', $severity);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Le code worker-2.php ne diffère de worker-1.php que par le nom de la file d'attente associée à l'échangeur et transmis à la méthode consume () . Remplacez 'queue-1' par 'queue-2'.

Maintenant, si vous souhaitez que le premier travailleur enregistre les messages de niveau "erreur" et "avertissement" dans un fichier, exécutez la commande suivante dans le terminal:

php worker-1.php warning error > logs_from_rabbit.log

Et si vous souhaitez que le deuxième travailleur affiche tous les messages dans la fenêtre du terminal, procédez comme suit:

php worker-2.php info warning error
# => Waiting for logs. To exit press CTRL+C

Et pour émettre un message d'erreur, écrivez ce qui suit:

php send.php error "Run. Run. Or it will explode."

C'est tout. Dans la partie suivante, nous apprendrons à recevoir des messages en fonction d'un modèle spécifique.

All Articles