Utilisation de RabbitMQ avec MonsterMQ Partie 5

Dans la partie précédente, nous avons amélioré notre système d'enregistrement. Au lieu d'utiliser l'échangeur de type fanout, nous avons utilisé l'échangeur de type direct, ce qui nous a permis de recevoir sélectivement des messages. Malgré les améliorations, notre système a encore des limites, par exemple, nous ne pouvons pas recevoir de messages basés sur plusieurs critères. Par exemple, dans notre système, nous pourrions vouloir rediriger les messages en fonction non seulement du niveau de gravité du message, mais également de la source du message. Par exemple, comme dans l'outil syslog unix, qui redirige les messages non seulement en fonction du niveau de gravité (info / warn / crit ..), mais aussi en fonction de la source (auth / cron / kern ...). Cela peut nous donner une flexibilité supplémentaire, par exemple, nous ne pouvons recevoir que les messages critiques de «cron», mais aussi tous les messages de «kern». Pour mettre en œuvre un tel système, nous devons nous familiariser avec un type d'échangeur plus complexe - le sujet .

Échangeur à thème


Les messages envoyés à l'échangeur de type de rubrique doivent avoir une clé de routage, qui est un ensemble de mots séparés par des points. Les mots peuvent être arbitraires, mais ils sont généralement associés à une caractéristique du message. Voici quelques exemples de clés de routage valides: stock.usd.nyse , nyse.vmw , quick.orange.rabbit . La clé est limitée à 255 octets.

La clé de liaison doit être spécifiée de manière similaire. Le routage des messages dans l'échangeur de type de rubrique est similaire au routage direct de l'échangeur - le message est envoyé à la file d'attente avec la clé de liaison correspondant à la clé de routage. Cependant, il existe deux différences:

  1. * (astérisque) dans la clé de liaison ne peut être remplacé que par un seul mot
  2. # (livre) dans la clé de liaison peut être remplacé par zéro ou plusieurs mots

Cela peut être illustré par l'image suivante:

image
(image tirée du site officiel de RabbitMQ )

Dans cet exemple, nous enverrons des messages représentant des animaux. Les clés de routage des messages sont composées de trois mots (et de deux points). Le premier mot représente la vitesse, le second représente la couleur et le troisième représente les espèces.

Dans notre exemple, deux files d'attente sont connectées à l'échangeur: Q1 avec la clé de liaison "* .orange. *" Et Q2 avec deux clés de liaison "*. *. Rabbit" et "paresseux. #" .

Ces liens signifieront le routage suivant:

  • Q1 s'intéresse à tous les animaux oranges.
  • Q2 (rabbit) (lazy)

Un message avec la clé de routage "quick.orange.rabbit" sera remis dans les deux files d'attente. Un message avec la clé "lazy.orange.elephant" sera également remis aux deux.

Cependant, le message avec la clé "quick.orange.fox" ne sera obtenu qu'en premier lieu, et avec la clé "lazy.brown.fox" qu'en deuxième. «Lazy.pink.rabbit» sera livré une deuxième fois une fois, malgré le fait que la clé de routage correspond aux deux clés de liaison.

"Quick.brown.fox" n'entrera dans aucune file d'attente, car la clé de routage ne correspond à aucune des clés de liaison. Si vous essayez d'envoyer des messages avec un nombre de mots dans la clé de routage plus petit ou plus grand (par exemple, "orange"ou "quick.orange.male.rabbit" ) que dans la clé de liaison, le message sera rejeté.

L'échangeur de type de rubrique peut se comporter comme un échangeur de type fanout si vous spécifiez # comme clé de liaison . Ou comme direct si vous ne spécifiez pas * * ou # dans la clé de liaison , mais indiquez simplement un mot.

Assembler le code


Nous utiliserons l'échangeur de type de sujet pour notre système de journalisation. Commençons par l'hypothèse que nos clés de routage des messages ressembleront à "source.strict" . Le code sera presque le même que dans la partie précédente, voici send.php :

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newTopicExchange('topic-logs');

   $routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $routingKey, 'topic-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Code travailleur-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-1')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Code travailleur-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-2')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-2');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Pour recevoir uniquement les messages critiques du premier travailleur, appelez

php worker-1.php "*.critical"

Pour lier la file d'attente utilisée par le deuxième travailleur, appelez l'échangeur avec deux clés de liaison:

php worker-2.php "kern.*" "*.critical"

Pour envoyer un message, faites quelque chose comme:

php send.php "kern.critical" "A critical kernel error"

Expérimentez avec ces programmes.

All Articles