Using RabbitMQ with MonsterMQ Part 3

In our previous article, we created a task queue. It assumes that one task in the form of a message is delivered to one recipient. In this article we will do something else, we will send one message to several recipients at once. We will create a logging system that will consist of two programs: the first will send messages, and the second will receive and display them. In our system, all running recipients will receive a message sent by the sender.

Exchanges (exchangers)


In previous articles, we sent and received messages from queues, now it's time to introduce a complete message forwarding model in RabbitMQ.

Let's quickly go through what we covered in previous articles:

  • Producer - an application that sends messages
  • Queue - a buffer that stores messages
  • Consumer - an application that receives and processes messages

The key idea of ​​RabbitMQ (or rather AMQP) is that the sender (Producer) never sends a message directly to the queue. Moreover, he doesn’t even know if it is in the queue. Instead, the sender sends messages to exchangers (Exchange). The exchanger is a very simple thing, it does two things: receives a message from senders and redirects them to the queue. Exchangers are of different types: some send messages to a specific queue (direct type), others send the same message to several queues at once (fanout type), others redirect messages to the queue based on specific, specified redirection rules (topic type).

image
(image taken from RabbitMQ official site )

Let's look at a fanout type exchanger. In order to create it, write the following code:

$producer->newFanoutExchange('logs');

The Fanout exchanger works according to a very simple scheme, it simply sends the received message to all queues attached to it.

To list all exchangers existing on the server, call rabbitmqctl

sudo rabbitmqctl list_exchanges

This list will contain amq. * Exchangers and a standard nameless (empty line) exchanger. Do not pay attention to them we do not need them yet.

In the last article, we did not know anything about exchangers, but nonetheless were able to send messages. This happened because if in MonsterMQ you do not explicitly specify the exchanger as the third argument to the Producer :: publish () method, the library will use the standard nameless exchanger RabbitMQ, which sends messages in queues whose names are passed as the second argument to the Producer :: publish () method .

Now we can send messages to our new exchanger by specifying its name as the third argument to the Producer :: publish () method :

$producer->publish($message, '', 'logs');

Now let's create two workers with two queues that will exist as long as the workers who announced them are active. You can do this as follows:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

Now when the worker who has announced the queue ends the session and disconnects, it will automatically be deleted. If both workers complete their work, both queues will be deleted.

We connect the queue with the exchanger


We have already created an exchange and a queue. Now we just have to tell the exchanger to send the received messages to our queues. We need to bind the exchanger and queues. To do this, write the following code:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

From now on, our logs exchanger is connected to our queues and will forward the received messages to them.

You can list all existing links with the following command on linux

rabbitmqctl list_bindings

Putting the code together


Our sender script has not changed much compared to the previous lesson. The main difference is that now it sends messages to the previously announced exchanger, and not to the standard one (accessible from the box). Here is the send.php code

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

It is worth mentioning that by sending a message to an exchanger that is not associated with any queue, the message will be lost. But now it suits us, since we have not yet launched our recipients.

Here is the code for our first worker-1.php worker

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Here is the code for the second worker-2.php worker

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

If you want to save messages sent to the first worker in a log file, just call the following command in the console:

php worker-1.php > logs_from_rabbit.log

To start both workers and display messages in terminal windows, call the following commands, each in its own window:

php worker-1.php

php worker-2.php

And to send messages, call the following command:

php send.php

In the next lesson, we will look at how to get only a certain subset of messages from all sent.

All Articles