Usando RabbitMQ com MonsterMQ Parte 5

Na parte anterior, melhoramos nosso sistema de registro. Em vez de usar o trocador de tipos fanout, usamos o trocador direto, o que nos permitiu receber mensagens seletivamente. Apesar das melhorias, nosso sistema ainda possui limitações, por exemplo, não podemos receber mensagens com base em vários critérios. Por exemplo, em nosso sistema, podemos redirecionar as mensagens com base não apenas no nível de gravidade da mensagem, mas também na origem da mensagem. Por exemplo, como na ferramenta syslog unix, que redireciona as mensagens não apenas dependendo do nível de gravidade (info / warn / crit ..), mas também dependendo da fonte (auth / cron / kern ...). Isso pode nos dar flexibilidade adicional, por exemplo, podemos receber apenas mensagens críticas do 'cron', mas também todas as mensagens do 'kern'. Para implementar esse sistema, precisamos nos familiarizar com um tipo mais complexo de trocador - tópico .

Trocador de tipo de tópico


As mensagens enviadas para o trocador de tipos de tópicos devem ter uma chave de roteamento, que é um conjunto de palavras separadas por pontos. As palavras podem ser arbitrárias, mas geralmente são associadas a algum recurso da mensagem. Aqui estão alguns exemplos de chaves de roteamento válidas: stock.usd.nyse , nyse.vmw , quick.orange.rabbit . A chave está limitada a 255 bytes.

A chave de ligação deve ser especificada de maneira semelhante. O roteamento de mensagens no tipo de tópico trocador é semelhante ao roteador direto do trocador - a mensagem é enviada para a fila com a chave de ligação correspondente à chave de roteamento. No entanto, existem duas diferenças:

  1. * (asterisco) na chave de encadernação só pode ser substituído por uma palavra
  2. # (libra) na chave de encadernação pode ser substituído por zero ou mais palavras

Isso pode ser ilustrado pela seguinte imagem:

imagem
(imagem obtida no site oficial do RabbitMQ )

Neste exemplo, enviaremos mensagens que representam animais. As teclas de roteamento de mensagens consistem em três palavras (e dois pontos). A primeira palavra representa velocidade, a segunda representa cor e a terceira representa espécies.

No nosso exemplo, duas filas são conectadas ao trocador: Q1 com a chave de ligação "* .orange. *" E Q2 com duas chaves de ligação "*. *. Rabbit" e "preguiçoso. #" .

Esses links significarão o seguinte roteamento:

  • O primeiro trimestre está interessado em todos os animais alaranjados.
  • Q2 (rabbit) (lazy)

Uma mensagem com a chave de roteamento “quick.orange.rabbit” será entregue nas duas filas. Uma mensagem com a chave "lazy.orange.elephant" também será entregue a ambos.

No entanto, a mensagem com a tecla "quick.orange.fox" será exibida apenas em primeiro lugar, e com a tecla "lazy.brown.fox" somente no segundo. “Lazy.pink.rabbit” será entregue uma segunda vez, apesar do fato de a chave de roteamento corresponder às duas chaves de ligação.

“Quick.brown.fox” não entra em nenhuma fila, pois a chave de roteamento não corresponde a nenhuma das chaves de ligação. Se você tentar enviar mensagens com o número de palavras na chave de roteamento menor ou maior (por exemplo, "laranja"ou “quick.orange.male.rabbit” ) que na chave de ligação a mensagem será descartada.

O trocador de tipos de tópicos pode se comportar como um trocador de fanout se você especificar # como a chave de ligação . Ou como direto, se você não especificar * * ou # na chave de ligação , mas simplesmente indicar uma palavra.

Juntando o código


Usaremos o permutador de tipo de tópico em nosso sistema de registro. Vamos começar com a suposição de que nossas chaves de roteamento de mensagens se parecerão com "source.strict" . O código será quase o mesmo da parte anterior, aqui está o send.php :

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newTopicExchange('topic-logs');

   $routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $routingKey, 'topic-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Código worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-1')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Código worker-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-2')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-2');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Para receber apenas mensagens críticas do primeiro trabalhador, ligue para

php worker-1.php "*.critical"

Para ligar a fila que o segundo trabalhador usa, chame o trocador com duas chaves de ligação:

php worker-2.php "kern.*" "*.critical"

Para enviar uma mensagem, faça algo como:

php send.php "kern.critical" "A critical kernel error"

Experimente com esses programas.

All Articles