Using RabbitMQ with MonsterMQ Part 5

In the previous part, we improved our logging system. Instead of a fanout exchange, we used a direct exchange, which allowed us to receive messages selectively. Despite that improvement, the system still has a limitation: it cannot route messages based on several criteria at once. We might want to route messages not only by severity level but also by source. The Unix syslog tool works this way: it routes messages by severity (info / warn / crit ...) and by source (auth / cron / kern ...). This gives us additional flexibility: for example, we could receive only critical messages from 'cron' but all messages from 'kern'. To implement such a system, we need a more complex type of exchange: topic.

Topic exchange


Messages sent to a topic exchange must have a routing key made up of words separated by dots. The words can be arbitrary, but they usually describe some feature of the message. Some examples of valid routing keys: stock.usd.nyse, nyse.vmw, quick.orange.rabbit. The routing key is limited to 255 bytes.

The binding key has the same form. Routing in a topic exchange is similar to routing in a direct exchange: a message is delivered to every queue whose binding key matches the routing key. However, binding keys support two special characters:

  1. * (asterisk) in the binding key stands for exactly one word
  2. # (pound) in the binding key stands for zero or more words
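
The two wildcard rules can be sketched as a small word-by-word matcher. This is only a local illustration: the real matching is done by the RabbitMQ broker, and topicMatches() and matchWords() are hypothetical helpers, not part of MonsterMQ.

```php
<?php
// Local illustration only: RabbitMQ performs this matching on the broker.
// topicMatches() and matchWords() are hypothetical helpers, not MonsterMQ API.

function topicMatches(string $bindingKey, string $routingKey): bool
{
    // Treat an empty key as zero words
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($pattern, $words);
}

function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // The pattern is used up: match only if no words are left
        return $words === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // '#' may absorb zero, one, or many words: try every split point
        for ($skip = 0; $skip <= count($words); $skip++) {
            if (matchWords($pattern, array_slice($words, $skip))) {
                return true;
            }
        }
        return false;
    }

    if ($words === []) {
        return false;
    }

    $word = array_shift($words);

    // '*' accepts any single word; anything else must match literally
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

var_dump(topicMatches('*.critical', 'kern.critical'));      // bool(true)
var_dump(topicMatches('*.critical', 'kern.disk.critical')); // bool(false)
var_dump(topicMatches('kern.#', 'kern'));                   // bool(true)
var_dump(topicMatches('kern.#', 'kern.disk.critical'));     // bool(true)
```

The second and third calls show the difference between the two characters: * never spans more than one word, while # also matches when no words remain.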

This can be illustrated by the following image:

[image: a topic exchange bound to queues Q1 and Q2, taken from the official RabbitMQ website]

In this example, we will send messages that represent animals. Message routing keys consist of three words (and two dots). The first word represents speed, the second represents color, and the third represents species.

In our example, two queues are bound to the exchange: Q1 with the binding key "*.orange.*", and Q2 with two binding keys, "*.*.rabbit" and "lazy.#".

These bindings can be summarized as follows:

  • Q1 is interested in all orange animals.
  • Q2 wants to hear everything about rabbits and everything about lazy animals.

A message with the routing key "quick.orange.rabbit" will be delivered to both queues. A message with the key "lazy.orange.elephant" will also be delivered to both.

However, a message with the key "quick.orange.fox" will go only to the first queue, and one with the key "lazy.brown.fox" only to the second. "lazy.pink.rabbit" will be delivered to the second queue only once, even though it matches both of that queue's binding keys.

"quick.brown.fox" will not reach any queue, since it does not match any of the binding keys. Routing keys with fewer or more than three words, such as "orange" or "quick.orange.male.rabbit", match none of these bindings, so those messages are discarded. The word count alone is not decisive, though: "lazy.orange.male.rabbit" has four words but still matches "lazy.#" and is delivered to the second queue.
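
The routing outcomes for this example can be checked with a small local simulation. This is a sketch under stated assumptions, not how the broker is implemented: bindingToRegex() and routeTo() are hypothetical helpers, every word in a key is assumed to be non-empty, and the last key in the list is an extra four-word key that starts with "lazy".

```php
<?php
// Local simulation of the Q1/Q2 bindings; the real matching happens inside RabbitMQ.
// bindingToRegex() and routeTo() are hypothetical helpers, not MonsterMQ API.

function bindingToRegex(string $bindingKey): string
{
    // Each word is matched together with a trailing dot
    $regex = '';
    foreach (explode('.', $bindingKey) as $word) {
        if ($word === '*') {
            $regex .= '[^.]+\.';       // exactly one word
        } elseif ($word === '#') {
            $regex .= '(?:[^.]+\.)*';  // zero or more words
        } else {
            $regex .= preg_quote($word, '/') . '\.';
        }
    }
    return '/^' . $regex . '$/';
}

function routeTo(array $bindings, string $routingKey): array
{
    $queues = [];
    foreach ($bindings as $queue => $bindingKeys) {
        foreach ($bindingKeys as $bindingKey) {
            // Append a dot so that every word of the routing key ends with '.'
            if (preg_match(bindingToRegex($bindingKey), $routingKey . '.') === 1) {
                $queues[] = $queue;
                break; // one copy per queue, even if several bindings match
            }
        }
    }
    return $queues;
}

$bindings = [
    'Q1' => ['*.orange.*'],
    'Q2' => ['*.*.rabbit', 'lazy.#'],
];

$keys = [
    'quick.orange.rabbit', 'lazy.orange.elephant', 'quick.orange.fox',
    'lazy.brown.fox', 'lazy.pink.rabbit', 'quick.brown.fox',
    'orange', 'quick.orange.male.rabbit', 'lazy.orange.male.rabbit',
];

foreach ($keys as $key) {
    $queues = routeTo($bindings, $key);
    printf("%-26s -> %s\n", $key, $queues ? implode(', ', $queues) : '(discarded)');
}
```

The break inside routeTo() models the rule that a queue receives a message once, no matter how many of its bindings match.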

A topic exchange can behave like a fanout exchange: a queue bound with the binding key # receives every message, regardless of the routing key. It behaves like a direct exchange when the binding keys contain neither * nor #.

Putting the code together


We will use a topic exchange for our logging system. Let's assume that our routing keys have the form "source.severity". The code is almost the same as in the previous part. Here is send.php:

try {
   // Create the producer and open a session with the local broker
   $producer = new \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newTopicExchange('topic-logs');

   // First argument: routing key (default: 'anonymous.info')
   $routingKey = !empty($argv[1]) ? $argv[1] : 'anonymous.info';

   // Remaining arguments: message text
   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $routingKey, 'topic-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}
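
send.php takes the routing key as a ready-made string. If the source and severity are available separately, the key could be assembled and checked before publishing. A minimal sketch, assuming a hypothetical helper buildRoutingKey() that is not part of MonsterMQ:

```php
<?php
// buildRoutingKey() is a hypothetical helper, not part of MonsterMQ.

function buildRoutingKey(string $source, string $severity): string
{
    foreach ([$source, $severity] as $word) {
        // A dot inside a word would silently add an extra word to the key
        if ($word === '' || strpos($word, '.') !== false) {
            throw new InvalidArgumentException("Invalid routing key word: '{$word}'");
        }
    }

    $routingKey = $source . '.' . $severity;

    // Routing keys are limited to 255 bytes; strlen() counts bytes
    if (strlen($routingKey) > 255) {
        throw new InvalidArgumentException('Routing key exceeds 255 bytes');
    }

    return $routingKey;
}

echo buildRoutingKey('kern', 'critical'), "\n"; // kern.critical
```

The result can be passed to publish() in place of the command-line argument.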

Here is worker-1.php:

try {
   $consumer = new \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('queue-1')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   // Every command-line argument is treated as a binding key
   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   // Bind the queue once per binding key
   foreach ($bindingKeys as $key) {
      $consumer->queue('queue-1')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-1');

   echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Here is worker-2.php:

try {
   $consumer = new \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('queue-2')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   // Every command-line argument is treated as a binding key
   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   // Bind the queue once per binding key
   foreach ($bindingKeys as $key) {
      $consumer->queue('queue-2')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-2');

   echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

To have the first worker receive only critical messages, run:

php worker-1.php "*.critical"

To bind the second worker's queue to the exchange with two binding keys, run:

php worker-2.php "kern.*" "*.critical"

To send a message, run something like:

php send.php "kern.critical" "A critical kernel error"

Experiment with these programs.
