In the previous article, we created a task queue, in which each task, in the form of a message, is delivered to exactly one recipient. In this article we will do something different: we will send one message to several recipients at once. We will build a logging system made of two programs: the first sends messages, and the second receives and displays them. In our system, every running recipient will receive each message sent by the sender.
Exchanges (exchangers)
In the previous articles, we sent messages to queues and received messages from them. Now it is time to introduce the complete message forwarding model in RabbitMQ.
Let's quickly go over what we covered in the previous articles:
- Producer - an application that sends messages
- Queue - a buffer that stores messages
- Consumer - an application that receives and processes messages
The key idea of RabbitMQ (or rather AMQP) is that the sender (Producer) never sends a message directly to a queue. Moreover, it does not even know whether the message will end up in any queue at all. Instead, the sender sends messages to an exchange. An exchange is a very simple thing that does two jobs: it receives messages from senders and routes them to queues. Exchanges come in different types: some deliver a message to a specific queue (direct type), others deliver the same message to several queues at once (fanout type), and others route messages to queues according to specified routing rules (topic type).
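To make the three exchange types more concrete, here is a minimal sketch in plain PHP of how each type could pick the target queues. It is an in-memory illustration, not MonsterMQ or RabbitMQ code: the functions `topicMatches()` and `selectQueues()`, the queue names, and the binding keys are all hypothetical. For the topic type it assumes the AMQP wildcard rules, where `*` stands for exactly one dot-separated word and `#` for zero or more words.

```php
<?php
// Illustrative model only; none of these functions belong to MonsterMQ.

// Compares a topic pattern with a routing key, both already split into words.
function topicMatches(array $pattern, array $key): bool
{
    if ($pattern === []) {
        return $key === [];
    }
    $head = $pattern[0];
    $rest = array_slice($pattern, 1);
    if ($head === '#') {
        // '#' stands for zero or more words: try every possible split point
        for ($i = 0; $i <= count($key); $i++) {
            if (topicMatches($rest, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($key === []) {
        return false;
    }
    // '*' stands for exactly one word; anything else must match literally
    if ($head === '*' || $head === $key[0]) {
        return topicMatches($rest, array_slice($key, 1));
    }
    return false;
}

// Returns the names of the queues that would receive a message.
function selectQueues(string $type, array $bindings, string $routingKey): array
{
    $selected = [];
    foreach ($bindings as $binding) {
        switch ($type) {
            case 'fanout': // every bound queue, the routing key is ignored
                $matches = true;
                break;
            case 'direct': // only queues whose binding key equals the routing key
                $matches = $binding['key'] === $routingKey;
                break;
            case 'topic':  // queues whose binding pattern matches the routing key
                $matches = topicMatches(explode('.', $binding['key']), explode('.', $routingKey));
                break;
            default:
                throw new InvalidArgumentException("Unknown exchange type: {$type}");
        }
        if ($matches) {
            $selected[] = $binding['queue'];
        }
    }
    return array_values(array_unique($selected));
}

// Hypothetical bindings used only for this example
$bindings = [
    ['queue' => 'q1', 'key' => 'error'],
    ['queue' => 'q2', 'key' => 'kern.*'],
    ['queue' => 'q3', 'key' => '#'],
];

$fanout = selectQueues('fanout', $bindings, 'anything');
$direct = selectQueues('direct', $bindings, 'error');
$topic  = selectQueues('topic', $bindings, 'kern.critical');
```

With these bindings, the fanout call selects all three queues, the direct call selects only `q1`, and the topic call selects `q2` and `q3`.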
(image taken from the official RabbitMQ site)
Let's look at the fanout exchange type. To create one, write the following code:
$producer->newFanoutExchange('logs');
A fanout exchange works according to a very simple scheme: it sends every message it receives to all queues bound to it.
To list all exchanges that exist on the server, run rabbitmqctl:
sudo rabbitmqctl list_exchanges
This list will contain the amq.* exchanges and the default nameless (empty string) exchange. Ignore them for now; we do not need them yet.
In the last article, we knew nothing about exchanges but were nonetheless able to send messages. That worked because, if you do not explicitly pass an exchange as the third argument to the Producer::publish() method in MonsterMQ, the library uses RabbitMQ's default nameless exchange, which routes each message to the queue whose name is passed as the second argument to Producer::publish().
Now we can send messages to our new exchange by passing its name as the third argument to the Producer::publish() method:
$producer->publish($message, '', 'logs');
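The difference between publishing through the default nameless exchange and through a named fanout exchange can be modelled in a few lines of plain PHP. This is only a sketch under stated assumptions: `routeSketch()` is a hypothetical helper rather than a MonsterMQ method, and the `tasks` queue name is made up for the example. The argument order simply mirrors the message, second argument, third argument convention described above.

```php
<?php
// Illustrative model only; routeSketch() is a hypothetical helper, not a MonsterMQ call.

// Returns the names of the queues that would receive the message.
function routeSketch(array $existingQueues, array $bindings, string $routingKey, string $exchange = ''): array
{
    if ($exchange === '') {
        // Default nameless exchange: the second argument is treated as a queue name
        return in_array($routingKey, $existingQueues, true) ? [$routingKey] : [];
    }
    // Named fanout exchange: the second argument is ignored, every bound queue gets a copy
    return $bindings[$exchange] ?? [];
}

// Hypothetical broker state used only for this example
$queues   = ['tasks', 'queue-1', 'queue-2'];
$bindings = ['logs' => ['queue-1', 'queue-2']];

$viaDefault = routeSketch($queues, $bindings, 'tasks');    // exchange omitted
$viaFanout  = routeSketch($queues, $bindings, '', 'logs'); // exchange given, key empty
$nowhere    = routeSketch($queues, $bindings, 'missing');  // no queue with that name
```

In this model the first call reaches only `tasks`, the second reaches both queues bound to `logs`, and the third reaches no queue at all.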
Now let's create two workers with two queues that exist only while the workers that declared them are active. You can do this as follows:
$producer->queue('queue-1')->setExclusive()->declare();
$producer->queue('queue-2')->setExclusive()->declare();
Now, when the worker that declared a queue ends its session and disconnects, the queue is deleted automatically. If both workers finish their work, both queues are deleted.
Binding the queues to the exchange
We have already created an exchange and two queues. Now we just need to tell the exchange to send the messages it receives to our queues. In other words, we need to bind the queues to the exchange. To do this, write the following code:
$producer->queue('queue-1')->bind('logs');
$producer->queue('queue-2')->bind('logs');
From now on, our logs exchange is bound to our queues and will forward the messages it receives to them.
To list all existing bindings, run the following command on Linux:
rabbitmqctl list_bindings
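The interplay of exclusive queues, bindings, and a fanout exchange can be sketched as a small in-memory model. The class `FanoutBrokerSketch` and all of its methods are hypothetical and exist only for illustration; they are not part of MonsterMQ or RabbitMQ. The sketch assumes that a message published while no queue is bound is simply dropped, and that an exclusive queue disappears together with its binding when its owner disconnects.

```php
<?php
// In-memory model only: this is NOT MonsterMQ or RabbitMQ code.
final class FanoutBrokerSketch
{
    /** @var array<string, string[]> queue name => stored messages */
    private array $queues = [];
    /** @var array<string, string> queue name => owning connection */
    private array $owners = [];
    /** @var array<string, bool> names of queues bound to the exchange */
    private array $bound = [];

    public function declareExclusiveQueue(string $queue, string $connection): void
    {
        $this->queues[$queue] = [];
        $this->owners[$queue] = $connection;
    }

    public function bind(string $queue): void
    {
        if (!isset($this->queues[$queue])) {
            throw new InvalidArgumentException("Unknown queue: {$queue}");
        }
        $this->bound[$queue] = true;
    }

    /** Copies the message into every bound queue and returns the number of copies. */
    public function publish(string $message): int
    {
        foreach (array_keys($this->bound) as $queue) {
            $this->queues[$queue][] = $message;
        }
        return count($this->bound);
    }

    /** Deletes every exclusive queue (and its binding) owned by the connection. */
    public function disconnect(string $connection): void
    {
        foreach ($this->owners as $queue => $owner) {
            if ($owner === $connection) {
                unset($this->queues[$queue], $this->owners[$queue], $this->bound[$queue]);
            }
        }
    }

    public function messagesIn(string $queue): array
    {
        return $this->queues[$queue] ?? [];
    }
}

$broker = new FanoutBrokerSketch();
$lost = $broker->publish('too early');          // nothing is bound yet, so the message is dropped

$broker->declareExclusiveQueue('queue-1', 'worker-1');
$broker->declareExclusiveQueue('queue-2', 'worker-2');
$broker->bind('queue-1');
$broker->bind('queue-2');
$both = $broker->publish('first log line');     // both queues receive a copy

$broker->disconnect('worker-1');                // queue-1 is deleted with its owner
$one = $broker->publish('second log line');     // only queue-2 is left
```

Here `publish()` returns the number of queues that received a copy, which makes the three situations easy to compare: no recipients, two recipients, and one recipient after the first worker has left.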
Putting the code together
Our sender script has not changed much since the previous lesson. The main difference is that it now sends messages to the exchange we declared earlier rather than to the default one (available out of the box). Here is the code of send.php:
try {
    $producer = new \MonsterMQ\Client\Producer();
    $producer->connect('127.0.0.1', 5672);
    $producer->logIn('guest', 'guest');
    // Declare the fanout exchange that the workers bind their queues to
    $producer->newFanoutExchange('logs');
    // Build the message from the command-line arguments, with a fallback text
    $message = implode(' ', array_slice($argv, 1));
    $message = empty($message) ? 'Hello world!' : $message;
    // The second argument is left empty; the third argument names the exchange
    $producer->publish($message, '', 'logs');
    echo "\n Sent {$message} \n";
} catch(\Exception $e) {
    var_dump($e);
}
It is worth mentioning that a message sent to an exchange that has no queues bound to it will be lost. That suits us for now, since we have not started our recipients yet.
Here is the code of our first worker, worker-1.php:
try {
    $consumer = new \MonsterMQ\Client\Consumer();
    $consumer->connect('127.0.0.1', 5672);
    $consumer->logIn('guest', 'guest');
    // Declare the exclusive queue and bind it to the logs exchange from the worker's own connection
    $consumer->queue('queue-1')->setExclusive()->declare();
    $consumer->queue('queue-1')->bind('logs');
    $consumer->consume('queue-1');
    // Print every message delivered to this worker
    $consumer->wait(function ($message, $channelNumber) use ($consumer){
        echo "\n $message \n";
    });
} catch(\Exception $e) {
    var_dump($e);
}
Here is the code of the second worker, worker-2.php:
try {
    $consumer = new \MonsterMQ\Client\Consumer();
    $consumer->connect('127.0.0.1', 5672);
    $consumer->logIn('guest', 'guest');
    // Declare the exclusive queue and bind it to the logs exchange from the worker's own connection
    $consumer->queue('queue-2')->setExclusive()->declare();
    $consumer->queue('queue-2')->bind('logs');
    $consumer->consume('queue-2');
    // Print every message delivered to this worker
    $consumer->wait(function ($message, $channelNumber) use ($consumer){
        echo "\n $message \n";
    });
} catch(\Exception $e) {
    var_dump($e);
}
If you want to save the messages received by the first worker to a log file, run the following command in the console:
php worker-1.php > logs_from_rabbit.log
To start both workers and display messages in terminal windows, run the following commands, each in its own window:
php worker-1.php
php worker-2.php
And to send messages, run the following command:
php send.php
In the next lesson, we will look at how to receive only a certain subset of the messages that are sent.