在上一篇文章中,我们创建了一个任务队列。它假定将一项消息形式的任务传递给一个收件人。在本文中,我们将做其他事情,我们将一次向多个收件人发送一条消息。我们将创建一个包含两个程序的日志记录系统:第一个将发送消息,第二个将接收并显示它们。在我们的系统中,所有正在运行的收件人都会收到发件人发送的消息。交易所(交易所)
在先前的文章中,我们从队列发送和接收消息,现在是时候在RabbitMQ中引入完整的消息转发模型了。让我们快速浏览上一篇文章中介绍的内容:- 生产者-发送消息的应用程序
- 队列-存储消息的缓冲区
- 使用者-接收和处理消息的应用程序
RabbitMQ(或更确切地说是AMQP)的关键思想是发送者(生产者)永远不会直接将消息发送到队列。而且,他甚至都不知道队列中是否有队列。而是,发件人将消息发送到交换器(Exchange)。交换器是一件非常简单的事情,它完成了两件事:它接收来自发送者的消息并将其重定向到队列。交换器的类型不同:一些将消息发送到特定队列(直接类型),另一些一次将同一消息发送到多个队列(扇出类型),其他则根据特定的,指定的重定向规则将消息重定向到队列(主题类型)。
(图片取自RabbitMQ官方网站)让我们看一下扇出型交换器。为了创建它,请编写以下代码:$producer->newFanoutExchange('logs');
扇出交换器按照非常简单的方案工作,它只是将接收到的消息发送到与其连接的所有队列。要列出服务器上现有的所有交换器,请调用rabbitmqctlsudo rabbitmqctl list_exchanges
此列表将包含amq。*交换器和标准的无名(空行)交换器。不要关注它们,我们还不需要它们。在上一篇文章中,我们对交换器一无所知,但是仍然能够发送消息。发生这种情况的原因是,如果在MonsterMQ中没有将交换器明确指定为Producer :: publish()方法的第三个参数,则库将使用标准的无名交换器RabbitMQ,该交换器在队列中发送消息,消息的名称将作为第二个参数传递给Producer :: publish()方法。现在,我们可以通过将其名称指定为Producer :: publish()方法的第三个参数,将消息发送到新的交换器:$producer->publish($message, '', 'logs');
现在,让我们创建两个带有两个队列的工人,只要宣布他们的工人处于活动状态即可。您可以按照以下步骤进行操作:
$producer->queue('queue-1')->setExclusive()->declare();
$producer->queue('queue-2')->setExclusive()->declare();
现在,当宣布队列的工作人员结束会话并断开连接时,它将自动被删除。如果两个工作人员都完成工作,则两个队列都将被删除。我们将队列与交换器连接
我们已经创建了一个交换和一个队列。现在,我们只需要告诉交换器将接收到的消息发送到我们的队列即可。我们需要绑定交换器和队列。为此,请编写以下代码:
$producer->queue('queue-1')->bind('logs');
$producer->queue('queue-2')->bind('logs');
从现在开始,我们的日志交换器将连接到我们的队列,并将接收到的消息转发给它们。您可以在linux上使用以下命令列出所有现有链接rabbitmqctl list_bindings
将代码放在一起
与上一课相比,我们的发送者脚本没有太大变化。主要区别在于,它现在将消息发送到先前宣布的交换器,而不是标准消息(可从包装箱中访问)。这是send.php代码try {
$producer = \MonsterMQ\Client\Producer();
$producer->connect('127.0.0.1', 5672);
$producer->logIn('guest', 'guest');
$producer->newFanoutExchange('logs');
$message = implode(' ', array_slice($argv, 1));
$message = empty($message) ? 'Hello world!' : $message;
$producer->publish($message, '', 'logs');
echo "\n Sent {$message} \n";
} catch(\Exception $e) {
var_dump($e);
}
值得一提的是,通过将消息发送到与任何队列都不相关的交换器,消息将丢失。但现在它很适合我们,因为我们还没有启动我们的接收者。这是我们第一个worker-1.php worker的代码try {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-1')->setExclusive()->declare();
$producer->queue('queue-1')->bind('logs');
$consumer->consume('queue-1');
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
这是第二个worker-2.php worker的代码try {
$consumer = \MonsterMQ\Client\Consumer();
$consumer->connect('127.0.0.1', 5672);
$consumer->logIn('guest', 'guest');
$producer->queue('queue-2')->setExclusive()->declare();
$producer->queue('queue-2')->bind('logs');
$consumer->consume('queue-2');
$consumer->wait(function ($message, $channelNumber) use ($consumer){
echo "\n $message \n";
});
} catch(\Exception $e) {
var_dump($e);
}
如果要将发送给第一个工作程序的消息保存在日志文件中,只需在控制台中调用以下命令:php worker-1.php > logs_from_rabbit.log
要同时启动工作程序并在终端窗口中显示消息,请在各自的窗口中调用以下命令:php worker-1.php
php worker-2.php
要发送消息,请调用以下命令:php send.php
在下一课中,我们将研究如何从所有已发送消息中仅获取消息的特定子集。