将RabbitMQ与MonsterMQ结合使用第3部分

上一篇文章中,我们创建了一个任务队列。它假定将一项消息形式的任务传递给一个收件人。在本文中,我们将做其他事情,我们将一次向多个收件人发送一条消息。我们将创建一个包含两个程序的日志记录系统:第一个将发送消息,第二个将接收并显示它们。在我们的系统中,所有正在运行的收件人都会收到发件人发送的消息。

交易所(交易所)


在先前的文章中,我们从队列发送和接收消息,现在是时候在RabbitMQ中引入完整的消息转发模型了。

让我们快速浏览上一篇文章中介绍的内容:

  • 生产者-发送消息的应用程序
  • 队列-存储消息的缓冲区
  • 使用者-接收和处理消息的应用程序

RabbitMQ(或更确切地说是AMQP)的关键思想是发送者(生产者)永远不会直接将消息发送到队列。而且,他甚至都不知道队列中是否有队列。而是,发件人将消息发送到交换器(Exchange)。交换器是一件非常简单的事情,它完成了两件事:它接收来自发送者的消息并将其重定向到队列。交换器的类型不同:一些将消息发送到特定队列(直接类型),另一些一次将同一消息发送到多个队列(扇出类型),其他则根据特定的,指定的重定向规则将消息重定向到队列(主题类型)。

图片
(图片取自RabbitMQ官方网站

让我们看一下扇出型交换器。为了创建它,请编写以下代码:

$producer->newFanoutExchange('logs');

扇出交换器按照非常简单的方案工作,它只是将接收到的消息发送到与其连接的所有队列。

要列出服务器上现有的所有交换器,请调用rabbitmqctl

sudo rabbitmqctl list_exchanges

此列表将包含amq。*交换器和标准的无名(空行)交换器。不要关注它们,我们还不需要它们。

在上一篇文章中,我们对交换器一无所知,但是仍然能够发送消息。发生这种情况的原因是,如果在MonsterMQ中没有将交换器明确指定为Producer :: publish()方法的第三个参数,则库将使用标准的无名交换器RabbitMQ,该交换器在队列中发送消息,消息的名称将作为第二个参数传递给Producer :: publish()方法

现在,我们可以通过将其名称指定为Producer :: publish()方法的第三个参数,将消息发送到新的交换器

$producer->publish($message, '', 'logs');

现在,让我们创建两个带有两个队列的工人,只要宣布他们的工人处于活动状态即可。您可以按照以下步骤进行操作:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

现在,当宣布队列的工作人员结束会话并断开连接时,它将自动被删除。如果两个工作人员都完成工作,则两个队列都将被删除。

我们将队列与交换器连接


我们已经创建了一个交换和一个队列。现在,我们只需要告诉交换器将接收到的消息发送到我们的队列即可。我们需要绑定交换器和队列。为此,请编写以下代码:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

从现在开始,我们的日志交换器将连接到我们的队列,并将接收到的消息转发给它们。

您可以在linux上使用以下命令列出所有现有链接

rabbitmqctl list_bindings

将代码放在一起


与上一课相比,我们的发送者脚本没有太大变化。主要区别在于,它现在将消息发送到先前宣布的交换器,而不是标准消息(可从包装箱中访问)。这是send.php代码

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

值得一提的是,通过将消息发送到与任何队列都不相关的交换器,消息将丢失。但现在它很适合我们,因为我们还没有启动我们的接收者。

这是我们第一个worker-1.php worker的代码

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

这是第二个worker-2.php worker的代码

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

如果要将发送给第一个工作程序的消息保存在日志文件中,只需在控制台中调用以下命令:

php worker-1.php > logs_from_rabbit.log

要同时启动工作程序并在终端窗口中显示消息,请在各自的窗口中调用以下命令:

php worker-1.php

php worker-2.php

要发送消息,请调用以下命令:

php send.php

下一课中,我们将研究如何从所有已发送消息中仅获取消息的特定子集。

All Articles