将RabbitMQ与MonsterMQ结合使用第4部分

上一篇文章中,我们编写了一个简单的日志记录系统。在其中,我们一次向多个收件人发送邮件。在本文中,我们将向其添加一个新功能,该功能将使收件人仅接收特定的邮件子集。因此,例如,我们可以仅将关键消息发送到日志文件,同时在终端窗口中显示所有已发送的消息。

队列与交换器之间的链接


在上一篇文章中,我们学习了如何将队列与交换器连接。关于此代码:

$producer->queue('queue-1')->bind('logs');

此外,绑定方法可以接受第二个自变量路由键(路由键)。为了不将其与路由键混淆,路由键采用了Producer :: publish()作为第二个参数,我们将其称为绑定键(binding key)。这是我们可以使用绑定键将交换器与队列绑定的方法:

$producer->queue('queue-2')->bind('logs', 'binding-key');

绑定键的值取决于交换器的类型。上一课中的扇出交换器只是忽略了该值。

直接交换器


我们上一篇文章的日志记录系统会将所有消息发送给所有收件人。

让我们对其稍作更改,以便它根据严重性级别过滤邮件。例如,以便她仅将关键消息写入日志文件,而不会浪费警告和信息消息上的磁盘空间。

扇出交换器不会给我们这样的机会。

但是我们可以使用直接交换器,该交换器仅将消息发送到绑定键与所发送消息的路由键匹配的那些队列(publish()方法的第二个参数)。下图说明了这种作用机理:(

图片
图片取自RabbitMQ官方网站

此图显示了交换器X属于直接类型,有两个队列-一个带有橙色绑定键,第二个带有两个黑色绿色绑定键

在此安装中,具有橙色路由键的消息将首先发送,而具有黑色绿色路由键的消息将被发送到第二。任何其他消息将被丢弃。

多重装订


图片
(图片取自RabbitMQ官方网站

用一个绑定密钥将交换器绑定到多个队列是完全可以接受的。在我们的示例中,我们可以使用黑色绑定键在交换器XQ1队列之间添加连接在这种情况下,我们的直接类型交换器的行为类似于扇出类型交换器,它将使用匹配的绑定键将消息发送到所有队列。带有黑色路由键的消息将同时传递给Q1Q2

讯息发布


我们将对我们的日志记录系统使用上述模型。与其发送消息到扇出交换器,不如将它们发送到直接交换器。我们将消息的严重性级别指示为路由密钥。因此,接收者脚本将能够选择所需严重性级别的消息。但是首先,让我们集中精力发布消息。与往常一样,为此,我们首先需要创建一个交换器。


$producer->newDirectExchange('my-logs');

现在我们可以发送消息了:


$producer->newDirectExchange('my-logs');
$producer->publish($message, $severity, 'my-logs');

为了不使事情复杂化,我们假定严重性级别($严重性)只能是“ info”,“ warning”,“ error”。

订阅


与上一部分一样,我们将创建两个工作人员,他们将宣布队列并将它们与交换器关联,如下所示:


// Worker 1
foreach ($severities as $severity) {
   $consumer->queue('queue-1')->bind('my-logs', $severity);
}

放在一起


图片
(图片取自RabbitMQ官方网站send.php

代码


try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newDirectExchange('my-logs');

   $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $severity, 'my-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

代码worker-1.php


try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $severities = array_slice($argv, 1);
   if (empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
   }

   foreach ($severities as $severity) {
      $producer->queue('queue-1')->bind('my-logs', $severity);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

工人2.PHP代码将不同于工人1.PHP仅与相关联的换热器并传送到所述队列的名称消耗()方法将“ queue-1”替换为“ queue-2”。

现在,如果您希望第一个工作程序将“错误”和“警告”级别的消息保存到文件中,请在终端中运行以下命令:

php worker-1.php warning error > logs_from_rabbit.log

并且如果您希望第二个工作人员在终端窗口中显示所有消息,请执行以下操作:

php worker-2.php info warning error
# => Waiting for logs. To exit press CTRL+C

为了发出错误消息,请编写以下内容:

php send.php error "Run. Run. Or it will explode."

就这样。接下来的部分,我们将学习如何接收基于特定模式的消息。

All Articles