将RabbitMQ与MonsterMQ结合使用第2部分

图片
(图片取自RabbitMQ官方网站

第一篇文章中,我们编写了两个使用RabbitMQ的PHP程序:一个发送消息,第二个接收消息。在本文中,我们将讨论如何创建一个队列,该队列将在许多工作人员(消息处理程序)之间分配消耗大量时间的任务。

主要思想不是立即完成消耗大量时间的任务,而是以消息的形式将它们添加到队列中。后来,工作人员从队列中接收到一条消息,他使用该消息并像在后台一样执行任务。
这个概念与Web应用程序非常相关,例如,您可能需要执行一些耗时的任务,而这些任务的结果并不需要立即执行,例如,向第三方应用程序发送电子邮件或发出HTTP请求(例如,通过libcurl到PHP)。

烹饪


在这一部分中,我们将发送代表工人任务的消息。由于我们没有诸如处理图像或生成pdf文件之类的实际任务,因此我们会假装我们正忙于使用sleep()函数。我们将消息中的点数视为任务的复杂性。每个点表示工作需要一秒钟,例如,消息Hello ...将需要三秒钟的工作。

上一课中,我们将稍微修改send.php脚本,以便它可以发送任意消息。

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

我们的receive.php脚本也需要更改。它应该为接收到的消息中的每个点模拟一秒钟的操作。让我们将文件重命名为worker.php并在其中编写以下代码:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

接下来,在不同的终端上运行两个脚本:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

使用任务队列的优点之一是能够在许多工作人员之间分配工作。让我们尝试在两个不同的终端一次运行两个worker.php脚本第三,我们将使用send.php脚本发送消息

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

接下来,让我们看看我们的工人带来了什么:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

默认情况下,RabbitMQ会将每个后续消息依次发送给下一个使用方。平均而言,每个收件人将收到相同数量的消息,这种分发消息的方法称为循环(循环)。与三个或更多工人一起尝试。

留言确认


任务可能需要一定的时间。如果您完成了未能成功处理此消息的收件人的工作,那么您可能会对消息会发生什么感兴趣。使用我们当前的代码,由于MonsterMQ中默认启用了消息确认,因此消息将返回队列。

当我们使用消息确认时,我们告诉RabbitMQ消息已被处理,他有权将其从队列中删除。如果接收者在未发送确认的情况下完成了工作(例如,由于TCP连接的意外终止),RabbitMQ将理解该消息未得到处理,并将其返回到队列,尝试将其传递给其他可用的工作程序。因此,即使在任何工作程序意外关闭的情况下,您也可以确保不会丢失消息。

要在MonsterMQ中禁用消息确认,您可以将true作为第二个参数传递消耗()

$consumer->consume('test-queue', true);

忘记确认收到的消息是一个相当普遍的,容易容忍的错误,它可能导致严重的后果。如果发生这种情况,则消息将一次又一次地传递,并且还将累积在队列中,从而占用越来越多的内存。要调试此错误,请使用rabbitmqctl显示messages_unacknowledged字段

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,放下sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

队列安全


尽管事实上,如果有任何一个工人意外终止他们的工作,我们的消息也不会丢失,但是,如果RabbitMQ关闭,我们仍然会丢失创建的队列。

为了防止丢失队列,您需要将队列声明为持久(持久,持久)。由于队列是幂等的,也就是说,我们无法通过调用具有相同名称的声明方法来更改或重新创建它,因此我们必须声明一个新队列。让我们做如下

$consumer->queue('new-queue')->setDurable()->declare();

请记住,还要在发送者代码中更改队列公告代码。

信息的公平分配


您可能已经注意到,我们两个工作人员之间的消息仍然没有诚实地分发。例如,如果每条偶数消息都很耗时,并且每条奇数消息都得到了快速处理,则接收耗时消息的一个工作人员将总是很忙,而第二个工作人员将处于空闲状态。为避免这种情况,您可以使用服务质量。让我们在代码中添加以下行

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

此行告诉RabbitMQ在处理并确认当前消息之前不要将消息发送给收件人。perConsumer()将服务质量应用于接收者的所有通道,如果只想将服务质量应用于当前通道,请使用perChannel()方法

图片
(图片取自RabbitMQ官方网站

将整个代码放在一起


这看起来像我们的send.php(发送方)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

所以接收者worker.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

就是这样,在下一课中,我们将学习如何从同一队列将消息发送给许多收件人。

All Articles