Usando RabbitMQ com MonsterMQ Parte 4

Em um artigo anterior, escrevemos um sistema de registro simples. Em que enviamos mensagens para vários destinatários ao mesmo tempo. Neste artigo, adicionaremos uma nova função que permitirá que os destinatários recebam apenas um subconjunto específico de mensagens. Por esse motivo, podemos, por exemplo, enviar apenas mensagens críticas para um arquivo de log, exibindo ao mesmo tempo todas as mensagens enviadas em uma janela do terminal.

Links entre a fila e o trocador


No artigo anterior, aprendemos como conectar uma fila a um trocador. Sobre este código:

$producer->queue('queue-1')->bind('logs');

Além disso, o método de ligação pode aceitar uma segunda chave de roteamento de argumento (chave de roteamento). Para não confundi-lo com a chave de roteamento, que adota o método Producer :: publish () como o segundo parâmetro, vamos chamá-lo de chave de ligação (chave de ligação). Aqui está como podemos ligar o trocador à fila usando a chave de ligação:

$producer->queue('queue-2')->bind('logs', 'binding-key');

O valor da chave de ligação depende do tipo de trocador. O trocador de fanout da lição anterior simplesmente ignora esse valor.

Trocador direto


Nosso sistema de registro em log do último artigo envia todas as mensagens para todos os destinatários.

Vamos mudar um pouco para filtrar as mensagens com base no nível de gravidade. Por exemplo, para que ela grave apenas mensagens críticas no arquivo de log e não perca espaço em disco em mensagens de aviso e informações.

O trocador de fanout não nos dá essa oportunidade.

Mas podemos usar um trocador direto que envia mensagens apenas para as filas cuja chave de ligação corresponde à chave de roteamento da mensagem enviada (o segundo argumento para o método publish ()). Esse mecanismo de ação é ilustrado pela figura a seguir:

imagem
(imagem tirada do site oficial do RabbitMQ )

Esta imagem mostra o trocador Xdo tipo direto e duas filas - uma com uma chave de ligação laranja e a segunda com duas chaves de ligação preto e verde .

Nesta instalação, as mensagens com a tecla de roteamento laranja serão enviadas primeiro e as mensagens com as teclas de roteamento em preto ou verde serão enviadas para a segunda. Quaisquer outras mensagens serão descartadas.

Ligação múltipla


imagem
(imagem retirada do site oficial do RabbitMQ )

É perfeitamente aceitável vincular o trocador a várias filas com uma chave de vinculação. Em nosso exemplo, podemos adicionar uma conexão entre o trocador X e a fila Q1 usando a chave de ligação preta . Nesse caso, nosso trocador de tipo direto se comportará como um trocador de fanout e enviará mensagens para todas as filas com a chave de ligação correspondente. Uma mensagem com a chave de roteamento preta será entregue ao Q1 e Q2 .

Liberação de mensagem


Usaremos o modelo descrito acima para o nosso sistema de registro. Em vez de enviar mensagens para o trocador fanout, nós as enviaremos para o trocador direto. Vamos indicar o nível de gravidade da mensagem como uma chave de roteamento. Assim, o script do destinatário poderá selecionar mensagens do nível de gravidade desejado. Mas primeiro, vamos nos concentrar em liberar mensagens. Como sempre, para isso, primeiro precisamos criar um trocador.


$producer->newDirectExchange('my-logs');

Agora podemos enviar mensagens:


$producer->newDirectExchange('my-logs');
$producer->publish($message, $severity, 'my-logs');

Para não complicar as coisas, vamos supor que o nível de gravidade ($ severity) possa ser apenas 'info', 'warning', 'error'.

Inscrição


Como na parte anterior, criaremos dois trabalhadores que anunciarão as filas e as associarão ao trocador da seguinte maneira:


// Worker 1
foreach ($severities as $severity) {
   $consumer->queue('queue-1')->bind('my-logs', $severity);
}

Juntando tudo


imagem
(imagem retirada do site oficial do RabbitMQ )

código send.php :


try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newDirectExchange('my-logs');

   $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $severity, 'my-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Código worker-1.php :


try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $severities = array_slice($argv, 1);
   if (empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
   }

   foreach ($severities as $severity) {
      $producer->queue('queue-1')->bind('my-logs', $severity);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

O código worker-2.php será diferente de worker-1.php apenas no nome da fila associada ao trocador e passado para o método consume () . Substitua 'fila-1' por 'fila-2'.

Agora, se você quiser que o primeiro trabalhador salve as mensagens do nível 'erro' e 'aviso' em um arquivo, execute o seguinte comando no terminal:

php worker-1.php warning error > logs_from_rabbit.log

E se você quiser que o segundo trabalhador exiba todas as mensagens na janela do terminal, faça o seguinte:

php worker-2.php info warning error
# => Waiting for logs. To exit press CTRL+C

E, para emitir uma mensagem de erro, escreva o seguinte:

php send.php error "Run. Run. Or it will explode."

Isso é tudo. Na próxima parte, aprenderemos como receber mensagens com base em um padrão específico.

All Articles