Usando RabbitMQ con MonsterMQ Parte 2

imagen
(Imagen tomada del sitio web oficial de RabbitMQ )

En el primer artículo, escribimos dos programas PHP que usan RabbitMQ: uno envió mensajes y el otro recibió. En este artículo, discutiremos cómo crear una cola que distribuirá tareas que consuman una cantidad significativa de tiempo entre muchos trabajadores (manejadores de mensajes).

La idea principal no es completar tareas que consuman una cantidad significativa de tiempo inmediatamente, sino agregarlas a la cola en forma de mensaje. Más tarde, el trabajador trabajador recibe un mensaje de la cola, que consume y realiza la tarea como si estuviera en segundo plano.
Este concepto es muy relevante para las aplicaciones web, donde, por ejemplo, es posible que deba realizar algunas tareas que requieren mucho tiempo, cuyo resultado no se requiere de inmediato, por ejemplo, enviar un correo electrónico o realizar una solicitud HTTP a una aplicación de terceros (por ejemplo, a través de libcurl a PHP).

Cocinando


En esta parte, enviaremos mensajes que representarán tareas para los trabajadores. Como no tenemos tareas reales, como procesar imágenes o generar archivos pdf, simulamos que estamos ocupados usando la función sleep () . Tomaremos el número de puntos en el mensaje como la complejidad de la tarea. Cada punto significará un segundo de trabajo, por ejemplo, el mensaje Hola ... tomará tres segundos de trabajo. Modificaremos

ligeramente nuestro script send.php de la última lección para que pueda enviar mensajes arbitrarios.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Nuestro script recibo.php también necesita ser cambiado. Debe simular la operación durante un segundo para cada punto en el mensaje recibido. Cambiemos el nombre del archivo a worker.php y escriba el siguiente código:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

A continuación, ejecute ambos scripts en diferentes terminales:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

Una de las ventajas de usar colas de tareas es la capacidad de distribuir el trabajo entre muchos trabajadores. Intentemos ejecutar dos scripts worker.php a la vez en dos terminales diferentes. Y en el tercero enviaremos mensajes con el script send.php

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

A continuación, veamos qué aportaron nuestros trabajadores:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

Por defecto, RabbitMQ enviará cada mensaje subsiguiente al siguiente destinatario consumidor a su vez. En promedio, cada destinatario recibirá la misma cantidad de mensajes, este método de distribución de mensajes se llama round-robin (en un círculo). Intente esto con tres o más trabajadores.

Mensaje de confirmación


Las tareas pueden llevar cierto tiempo. Quizás esté interesado en lo que sucederá con el mensaje si finaliza el trabajo del destinatario que no logró procesar este mensaje hasta el final. Con nuestro código actual, el mensaje volverá a la cola, ya que el reconocimiento de mensajes está habilitado de forma predeterminada en MonsterMQ.

Cuando usamos la confirmación del mensaje, le decimos a RabbitMQ que el mensaje ha sido procesado y que tiene el derecho de eliminarlo de la cola. Si el destinatario completó su trabajo sin enviar confirmación (por ejemplo, como resultado de una terminación inesperada de la conexión TCP), RabbitMQ comprenderá que el mensaje no se procesó y lo devolverá a la cola, intentando entregarlo a otros trabajadores disponibles. Por lo tanto, puede estar seguro de que los mensajes no se perderán, incluso en el caso de un cierre inesperado de cualquiera de los trabajadores.

Para deshabilitar el reconocimiento de mensajes en MonsterMQ, puede pasar true como el segundo argumento para consumir ()

$consumer->consume('test-queue', true);

Olvidar la confirmación de los mensajes recibidos es un error bastante común y fácil de tolerar, que puede tener graves consecuencias. Si esto sucede, los mensajes se entregarán una y otra vez y también se acumularán en las colas y ocuparán cada vez más memoria. Para depurar este error, use rabbitmqctl para mostrar el campo messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

En windows, suelta sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Seguridad de cola


A pesar del hecho de que nuestros mensajes no se perderán si alguno de los trabajadores termina inesperadamente su trabajo, sin embargo, aún podemos perder las colas creadas si RabbitMQ se cierra.

Para protegerse de la pérdida de colas, debe declarar la cola como duradera (persistente, duradera). Dado que las colas son idempotentes, es decir, no podemos cambiarlo ni recrearlo llamando al método de declaración con el mismo nombre, tendremos que declarar una nueva cola. Hagámoslo de la siguiente manera

$consumer->queue('new-queue')->setDurable()->declare();

Recuerde cambiar también el código de anuncio de la cola en el código del remitente.

Distribución justa de mensajes


Es posible que haya notado que los mensajes entre nuestros dos trabajadores todavía no se distribuyen con honestidad. Si, por ejemplo, cada mensaje par lleva mucho tiempo, y cada mensaje impar se procesa rápidamente, entonces un trabajador que recibe mensajes largos siempre estará ocupado, mientras que el segundo estará inactivo. Para evitar esto, puede utilizar la calidad del servicio . Agreguemos la siguiente línea a nuestro código

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

Esta línea le dice a RabbitMQ que no envíe mensajes al destinatario hasta que procese y reconozca el mensaje actual. perConsumer () aplica la calidad de servicio a todos los canales del destinatario, use el método perChannel () si desea aplicar la calidad de servicio solo al canal actual.

imagen
(Imagen tomada del sitio oficial de RabbitMQ )

Poniendo todo el código juntos


Esto se verá como nuestro send.php (remitente)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Y así, el destinatario trabajador.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Eso es todo, en la próxima lección aprenderemos cómo enviar mensajes a muchos destinatarios desde la misma cola.

All Articles