将RabbitMQ与MonsterMQ结合使用第5部分

上一部分中,我们改进了日志记录系统。我们使用直接类型交换器,而不是使用扇出类型交换器,这使我们能够选择性地接收消息。尽管有所改进,但是我们的系统仍然存在局限性,例如,我们无法基于多个条件接收消息。例如,在我们的系统中,我们可能不仅要根据消息的严重性级别,还要根据消息源来重定向消息。例如,如syslog unix工具中所示,它不仅根据严重性级别(info / warn / crit ..),还根据消息源(auth / cron / kern ...)重定向消息。这可以给我们带来更大的灵活性,例如,我们可以只接收来自“ cron”的关键消息,也可以接收来自“ kern”的所有消息。要实现这样的系统,我们必须熟悉更复杂类型的交换器主题

主题类型交换器


发送到主题类型交换器的消息应具有路由密钥,该路由密钥是一组由点分隔的单词。单词可以是任意的,但通常与消息的某些功能相关。以下是一些有效路由键的示例:“ stock.usd.nyse”“ nyse.vmw”“ quick.orange.rabbit”密钥限制为255个字节。

绑定密钥应以类似的方式指定。主题类型交换器中消息的路由类似于直接类型交换器的路由-将消息与绑定键匹配的路由发送到队列。但是,有两个区别:

  1. 绑定键中的*(星号)只能替换为一个单词
  2. 绑定键中的#(磅)可以用零个或多个单词代替

这可以通过以下图像进行说明:(

图片
图像取自RabbitMQ官方网站

在此示例中,我们将发送代表动物的消息。邮件路由键由三个词(和两个点)组成。第一个单词代表速度,第二个单词代表颜色,第三个单词代表物种。

在我们的示例中,两个队列连接到交换器:带有绑定键“ * .orange。*”的 Q1 和带有两个绑定键“ *。*。Rabbit”“ lazy。#”的 Q2

这些链接将意味着以下路由:

  • Q1对所有橙色动物都感兴趣。
  • Q2 (rabbit) (lazy)

带有路由密钥“ quick.orange.rabbit”的消息将在两个队列中传递。带有密钥“ lazy.orange.elephant”的消息也将同时传递给这两个消息。

但是,带有键“ quick.orange.fox”的消息只会放在第一位,而带有键“ lazy.brown.fox”的消息只会放在第二位。尽管路由密钥与两个绑定密钥都匹配,但“ Lazy.pink.rabbit”将再次发送一次。

“ Quick.brown.fox”将不会进入任何队列,因为路由密钥与任何绑定密钥都不匹配。如果您尝试发送的消息中路由键中的字数较小或较大(例如,“橙色”“ quick.orange.male.rabbit”),则该消息将被丢弃。

如果将指定为绑定键,则主题类型交换器的行为类似于扇出类型交换器或者,如果您未在绑定键中指定* *,而直接指定一个单词,则为直接显示。

将代码放在一起


我们将在我们的日志记录系统中使用主题类型交换器。让我们从一个假设开始,我们的消息路由键看起来像“ source.strict”该代码与上一部分几乎相同,这里是send.php

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newTopicExchange('topic-logs');

   $routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $routingKey, 'topic-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

代码worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-1')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

代码worker-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-2')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-2');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

要仅接收第一个工作人员的关键消息,请致电

php worker-1.php "*.critical"

要绑定第二个工作线程使用的队列,请使用两个绑定键调用交换器:

php worker-2.php "kern.*" "*.critical"

要发送消息,请执行以下操作:

php send.php "kern.critical" "A critical kernel error"

试用这些程序。

All Articles