Usando RabbitMQ con MonsterMQ Parte 3

En nuestro artículo anterior, creamos una cola de tareas. Se supone que una tarea en forma de mensaje se entrega a un destinatario. En este artículo haremos algo más, enviaremos un mensaje a varios destinatarios a la vez. Crearemos un sistema de registro que constará de dos programas: el primero enviará mensajes y el segundo los recibirá y mostrará. En nuestro sistema, todos los destinatarios en ejecución recibirán un mensaje enviado por el remitente.

Intercambios (intercambiadores)


En artículos anteriores, enviamos y recibimos mensajes de las colas, ahora es el momento de presentar un modelo completo de reenvío de mensajes en RabbitMQ.

Repasemos rápidamente lo que cubrimos en artículos anteriores:

  • Productor: una aplicación que envía mensajes
  • Cola: un búfer que almacena mensajes
  • Consumidor: una aplicación que recibe y procesa mensajes

La idea clave de RabbitMQ (o más bien AMQP) es que el remitente (Productor) nunca envía un mensaje directamente a la cola. Además, él ni siquiera sabe si está en la cola. En cambio, el remitente envía mensajes a los intercambiadores (Exchange). El intercambiador es algo muy simple, hace dos cosas: recibe un mensaje de los remitentes y los redirige a la cola. Los intercambiadores son de diferentes tipos: algunos envían mensajes a una cola específica (tipo directo), otros envían el mismo mensaje a varias colas a la vez (tipo de despliegue), otros redirigen los mensajes a una cola en función de reglas específicas y específicas de redirección (tipo de tema).

imagen
(imagen tomada del sitio oficial de RabbitMQ )

Veamos un intercambiador de tipo fanout. Para crearlo, escriba el siguiente código:

$producer->newFanoutExchange('logs');

El intercambiador Fanout funciona de acuerdo con un esquema muy simple, simplemente envía el mensaje recibido a todas las colas adjuntas.

Para enumerar todos los intercambiadores existentes en el servidor, llame a rabbitmqctl

sudo rabbitmqctl list_exchanges

Esta lista contendrá amq. * Intercambiadores y un intercambiador estándar sin nombre (línea vacía). No les preste atención, todavía no los necesitamos.

En el último artículo, no sabíamos nada acerca de los intercambiadores, pero aun así pudimos enviar mensajes. Esto sucedió porque si en MonsterMQ no especifica explícitamente el intercambiador como el tercer argumento para el método Producer :: Publish () , la biblioteca utilizará el intercambiador estándar sin nombre RabbitMQ, que envía mensajes en colas cuyos nombres se pasan como el segundo argumento para el método Producer :: Publish () .

Ahora podemos enviar mensajes a nuestro nuevo intercambiador especificando su nombre como el tercer argumento para el método Producer :: Publish () :

$producer->publish($message, '', 'logs');

Ahora creemos dos trabajadores con dos colas que existirán mientras los trabajadores que los anunciaron estén activos. Puede hacer esto de la siguiente manera:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

Ahora, cuando el trabajador que ha anunciado la cola finaliza la sesión y se desconecta, se eliminará automáticamente. Si ambos trabajadores completan su trabajo, se eliminarán ambas colas.

Conectamos la cola con el intercambiador


Ya hemos creado un intercambio y una cola. Ahora solo tenemos que decirle al intercambiador que envíe los mensajes recibidos a nuestras colas. Necesitamos vincular el intercambiador y las colas. Para hacer esto, escriba el siguiente código:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

A partir de ahora, nuestro intercambiador de registros está conectado a nuestras colas y les enviará los mensajes recibidos.

Puede enumerar todos los enlaces existentes con el siguiente comando en Linux

rabbitmqctl list_bindings

Poniendo el código juntos


Nuestro script de remitente no ha cambiado mucho en comparación con la lección anterior. La principal diferencia es que ahora envía mensajes al intercambiador anunciado previamente, y no al estándar (accesible desde la caja). Aquí está el código send.php

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Vale la pena mencionar que al enviar un mensaje a un intercambiador que no está asociado con ninguna cola, el mensaje se perderá. Pero ahora nos conviene, ya que aún no hemos lanzado a nuestros destinatarios.

Aquí está el código para nuestro primer trabajador worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Aquí está el código para el segundo trabajador trabajador-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Si desea guardar los mensajes enviados al primer trabajador en un archivo de registro, simplemente llame al siguiente comando en la consola:

php worker-1.php > logs_from_rabbit.log

Para iniciar ambos trabajadores y mostrar mensajes en las ventanas de terminal, llame a los siguientes comandos, cada uno en su propia ventana:

php worker-1.php

php worker-2.php

Y para enviar mensajes, llame al siguiente comando:

php send.php

En la próxima lección, veremos cómo obtener solo un cierto subconjunto de mensajes de todos los enviados.

All Articles