Using RabbitMQ with MonsterMQ Part 2

(Image taken from the official RabbitMQ website)

In the first article, we wrote two PHP programs that use RabbitMQ: one sent messages and the other received them. In this article, we will create a queue that distributes time-consuming tasks among several workers (message handlers).

The main idea is to avoid performing a time-consuming task immediately and instead add it to a queue as a message. Later, a worker receives the message from the queue and performs the task in the background.
This concept is especially relevant for web applications, which often need to perform time-consuming tasks whose result is not required immediately, such as sending email or making an HTTP request to a third-party application (for example, via libcurl in PHP).

Preparation


In this part, we will send messages that represent tasks for workers. Since we do not have real tasks, such as processing images or generating PDF files, we will pretend to be busy by calling the sleep() function. We will take the number of dots in the message as the complexity of the task. Each dot means one second of work; for example, the message Hello... takes three seconds of work.
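
The dot-counting rule can be tried in isolation before any broker is involved. This is a minimal sketch; the helper name taskSeconds is hypothetical, chosen only for illustration, and is not part of MonsterMQ.

```php
<?php
// Hypothetical helper: derive the simulated work time from a message.
// Each '.' in the message stands for one second of work.
function taskSeconds(string $message): int
{
    return substr_count($message, '.');
}

// Three dots, so the worker would sleep for three seconds.
echo taskSeconds('Hello...'), "\n";
```

A message without dots, such as the default Hello world!, yields zero and is therefore processed immediately.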

We will slightly modify our send.php script from the last lesson so that it can send arbitrary messages.

try {
   // Instantiate the producer (the "new" keyword is required)
   $producer = new \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   // Build the message from the command-line arguments, with a default
   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Our receive.php script also needs to be changed. It should simulate one second of work for each dot in the received message. Let's rename the file to worker.php and write the following code in it:

try {
   // Instantiate the consumer (the "new" keyword is required)
   $consumer = new \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      // Simulate work: one second per dot in the message
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Next, run both scripts in different terminals:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

One of the advantages of using task queues is the ability to distribute work among many workers. Let's run two worker.php scripts at once in two different terminals, and send messages with the send.php script from a third:

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php send.php First message.
php send.php Second message..
php send.php Third message...
php send.php Fourth message....
php send.php Fifth message.....

Next, let's see what our workers printed:

# Shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# Shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

By default, RabbitMQ sends each message to the next consumer in turn. On average, each consumer receives the same number of messages. This method of distributing messages is called round-robin. Try this with three or more workers.
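
The distribution seen above can be reproduced with a small model. This is only a sketch of the observable behavior, assuming every worker is connected before the first message arrives; the function name roundRobin is hypothetical and says nothing about how the broker is implemented internally.

```php
<?php
// Simplified model of round-robin dispatch: message i goes to worker i mod N.
function roundRobin(array $messages, int $workerCount): array
{
    $assigned = array_fill(0, $workerCount, []);
    foreach (array_values($messages) as $i => $message) {
        $assigned[$i % $workerCount][] = $message;
    }
    return $assigned;
}

$messages = [
    'First message.',
    'Second message..',
    'Third message...',
    'Fourth message....',
    'Fifth message.....',
];

// Print which messages each of the two workers would receive.
foreach (roundRobin($messages, 2) as $worker => $received) {
    echo 'Worker ', $worker + 1, ': ', implode(' | ', $received), "\n";
}
```

With two workers the first one receives the first, third and fifth messages, which matches the terminal output shown earlier. Changing the second argument to 3 shows how the same messages would be spread over three workers.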

Message acknowledgment


Tasks can take some time to complete. You may wonder what happens to a message if you stop a worker before it has finished processing that message. With our current code, the message will be returned to the queue, since message acknowledgment is enabled by default in MonsterMQ.

When we use message acknowledgment, we tell RabbitMQ that the message has been processed and that it may remove the message from the queue. If a worker stops without sending an acknowledgment (for example, because the TCP connection was terminated unexpectedly), RabbitMQ understands that the message was not processed and returns it to the queue, trying to deliver it to another available worker. Thus, you can be sure that messages will not be lost, even if one of the workers shuts down unexpectedly.
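
The life cycle of a message under acknowledgment can be illustrated with a toy in-memory model. This is a sketch under simplifying assumptions (a single connection, no network); the class ToyQueue and all of its methods are hypothetical and are not MonsterMQ or RabbitMQ code.

```php
<?php
// Toy in-memory model of acknowledgments (hypothetical, for illustration only).
final class ToyQueue
{
    private array $ready = [];    // messages waiting for delivery
    private array $unacked = [];  // delivered but not yet acknowledged, keyed by tag
    private int $nextTag = 1;

    public function publish(string $message): void
    {
        $this->ready[] = $message;
    }

    // Hand the next ready message to a worker; returns [tag, message] or null.
    public function deliver(): ?array
    {
        if ($this->ready === []) {
            return null;
        }
        $tag = $this->nextTag++;
        $this->unacked[$tag] = array_shift($this->ready);
        return [$tag, $this->unacked[$tag]];
    }

    // The worker finished the task: the message can be forgotten.
    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]);
    }

    // The worker's connection dropped: return unacknowledged messages to the queue.
    public function connectionLost(): void
    {
        $this->ready = array_merge(array_values($this->unacked), $this->ready);
        $this->unacked = [];
    }

    public function counts(): array
    {
        return [
            'ready' => count($this->ready),
            'unacknowledged' => count($this->unacked),
        ];
    }
}

$queue = new ToyQueue();
$queue->publish('Task one...');
[$tag, $message] = $queue->deliver();  // a worker starts processing
$queue->connectionLost();              // the worker dies before calling ack()
[$tag, $message] = $queue->deliver();  // the same task is delivered again
$queue->ack($tag);                     // this time it completes
print_r($queue->counts());
```

The two counters in this model play the same role as the ready and unacknowledged message counts that the broker keeps for each queue.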

To disable message acknowledgment in MonsterMQ, pass true as the second argument to consume():

$consumer->consume('test-queue', true);

Forgetting to acknowledge received messages is a common and easy mistake to make, and it can have serious consequences. If this happens, messages will be delivered again and again and will accumulate in the queues, taking up more and more memory. To debug this kind of error, use rabbitmqctl to display the messages_unacknowledged field:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Queue durability


Although our messages will not be lost if a worker terminates unexpectedly, we can still lose the queues we created if RabbitMQ itself shuts down.

To protect yourself from losing queues, declare the queue as durable (persistent). Queue declaration is idempotent: declaring a queue under an existing name does not change or recreate it, so we have to declare a new queue under a different name. Let's do it as follows:

$consumer->queue('new-queue')->setDurable()->declare();

Remember to change the queue declaration in the sender code as well.
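
Why a new queue name is needed can be shown with a toy model of declaration. This is a sketch that assumes the broker rejects a redeclaration whose durable flag differs from the existing queue's, which is how RabbitMQ reports a precondition failure; the function declareQueue and the array used as a registry are hypothetical and are not MonsterMQ code.

```php
<?php
// Toy model of queue declaration (hypothetical, for illustration only).
function declareQueue(array &$queues, string $name, bool $durable): void
{
    if (!isset($queues[$name])) {
        // First declaration creates the queue.
        $queues[$name] = ['durable' => $durable];
        return;
    }
    if ($queues[$name]['durable'] !== $durable) {
        // Redeclaring with different parameters is refused.
        throw new RuntimeException(
            "PRECONDITION_FAILED: queue '{$name}' exists with different parameters"
        );
    }
    // Same name and same parameters: nothing happens.
}

$queues = [];
declareQueue($queues, 'test-queue', false);  // creates the queue
declareQueue($queues, 'test-queue', false);  // no effect, declaration is idempotent

try {
    declareQueue($queues, 'test-queue', true);  // tries to change durability
} catch (RuntimeException $e) {
    echo $e->getMessage(), "\n";
}

declareQueue($queues, 'new-queue', true);  // a new name works
```

In this model the sender and the worker must both use the same name and the same durable flag, which is why the declaration has to be changed in both scripts.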

Fair distribution of messages


You may have noticed that messages are still not distributed fairly between our two workers. If, for example, every even message is time-consuming and every odd message is processed quickly, then the worker receiving the time-consuming messages will always be busy, while the other will mostly be idle. To avoid this, you can use quality of service. Let's add the following line to our code:

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

This line tells RabbitMQ not to send a new message to the recipient until it has processed and acknowledged the current one. perConsumer() applies quality of service to all channels of the recipient. Use the perChannel() method if you want to apply quality of service only to the current channel.
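
The effect of a prefetch count of one can be compared with plain round-robin in a small simulation. This is a sketch under simplifying assumptions (all tasks are queued up front, delivery itself takes no time); both function names are hypothetical and model only the scheduling, not the broker.

```php
<?php
// Round-robin: tasks are assigned in turn regardless of how busy a worker is.
function roundRobinFinishTimes(array $durations, int $workers): array
{
    $busyUntil = array_fill(0, $workers, 0);
    foreach (array_values($durations) as $i => $seconds) {
        $busyUntil[$i % $workers] += $seconds;
    }
    return $busyUntil;
}

// Prefetch count of 1: each task goes to whichever worker becomes free first.
function prefetchOneFinishTimes(array $durations, int $workers): array
{
    $busyUntil = array_fill(0, $workers, 0);
    foreach ($durations as $seconds) {
        // Index of the worker that finishes its current task earliest.
        $free = array_search(min($busyUntil), $busyUntil, true);
        $busyUntil[$free] += $seconds;
    }
    return $busyUntil;
}

// Odd tasks are quick (1 second), even tasks are slow (5 seconds).
$durations = [1, 5, 1, 5, 1, 5];

print_r(roundRobinFinishTimes($durations, 2));
print_r(prefetchOneFinishTimes($durations, 2));
```

In this model round-robin leaves one worker busy for 15 seconds while the other is done after 3. With a prefetch count of one the workers finish after 7 and 11 seconds, so the whole batch completes sooner.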

(Image taken from the official RabbitMQ website)

Putting the whole code together


This is what our send.php (sender) looks like:

try {
   $producer = new \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   // Declare the durable queue on the producer side as well
   $producer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   // Publish to the same queue that was declared above
   $producer->publish($message, 'new-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

And this is the recipient, worker.php:

try {
   $consumer = new \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   // Deliver at most one unacknowledged message at a time
   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   // Consume from the same durable queue that was declared above
   $consumer->consume('new-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      // Simulate work: one second per dot in the message
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

That's all. In the next lesson, we will learn how to send messages to many recipients from the same queue.
