Using RabbitMQ with MonsterMQ Part 4

In a previous article, we wrote a simple logging system. In which we sent messages to several recipients at once. In this article, we will add a new function to it that will allow recipients to receive only a specific subset of messages. Due to this, we can, for example, send only critical messages to a log file, at the same time displaying all sent messages in a terminal window.

Links between the queue and the exchanger


In the previous article, we learned how to connect a queue with an exchanger. About this code:

$producer->queue('queue-1')->bind('logs');

Furthermore, the binding method can accept a second argument routing key (routing key). In order not to confuse it with the routing key, which adopts the method Producer :: publish () as the second parameter, let's call it a binding key (binding key). Here's how we can bind the exchanger with the queue using the bind key:

$producer->queue('queue-2')->bind('logs', 'binding-key');

The value of the binding key depends on the type of exchanger. The fanout exchanger from the previous lesson simply ignores this value.

Direct exchanger


Our logging system from the last article sends all messages to all recipients.

Let's change it slightly so that it filters messages based on the severity level. For example, so that she writes only critical messages to the log file, and does not waste disk space on warning and info messages.

Fanout exchanger does not give us such an opportunity.

But we can use a direct exchanger that sends messages only to those queues whose binding key matches the routing key of the sent message (the second argument to the publish () method). This mechanism of action is illustrated by the following picture:

image
(image taken from the official site of RabbitMQ )

This picture shows the exchanger Xof direct type and two queues - one with an orange binding key and the second with two black and green binding keys .

In this installation, messages with the orange routing key will be sent first, and messages with the black or green routing keys will be sent to the second. Any other messages will be discarded.

Multiple binding


image
(image taken from the official RabbitMQ website )

It is perfectly acceptable to bind the exchanger with several queues with one bind key. In our example, we can add a connection between the exchanger X and the Q1 queue using the black binding key . In this case, our direct type exchanger will behave like a fanout type exchanger, it will send messages to all queues with the matching binding key. A message with the black routing key will be delivered to both Q1 and Q2 .

Message Release


We will use the model described above for our logging system. Instead of sending messages to the fanout exchanger, we will send them to the direct exchanger. We will indicate the severity level of the message as a routing key. Thus, the recipient script will be able to select messages of the desired level of severity. But first, let's focus on releasing messages. As always, for this we first need to create an exchanger.


$producer->newDirectExchange('my-logs');

Now we can send messages:


$producer->newDirectExchange('my-logs');
$producer->publish($message, $severity, 'my-logs');

In order not to complicate things, let's assume that the level of severity ($ severity) can only be 'info', 'warning', 'error'.

Subscription


As in the previous part, we will create two workers who will announce the queues and associate them with the exchanger as follows:


// Worker 1
foreach ($severities as $severity) {
   $consumer->queue('queue-1')->bind('my-logs', $severity);
}

Putting it all together


image
(image taken from the official RabbitMQ website ) send.php

code :


try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newDirectExchange('my-logs');

   $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $severity, 'my-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Code worker-1.php :


try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $severities = array_slice($argv, 1);
   if (empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
   }

   foreach ($severities as $severity) {
      $producer->queue('queue-1')->bind('my-logs', $severity);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

The worker-2.php code will differ from worker-1.php only in the name of the queue associated with the exchanger and passed to the consume () method . Replace 'queue-1' with 'queue-2'.

Now, if you want the first worker to save messages of the 'error' and 'warning' level to a file, run the following command in the terminal:

php worker-1.php warning error > logs_from_rabbit.log

And if you want the second worker to display all messages in the terminal window, do the following:

php worker-2.php info warning error
# => Waiting for logs. To exit press CTRL+C

And in order to issue error message write the following:

php send.php error "Run. Run. Or it will explode."

That's all. In the next part, we will learn how to receive messages based on a specific pattern.

All Articles