Usando RabbitMQ com MonsterMQ Parte 2

imagem
(Imagem retirada do site oficial do RabbitMQ )

No primeiro artigo, escrevemos dois programas PHP que usam o RabbitMQ: um enviado mensagens, o segundo recebido. Neste artigo, discutiremos como criar uma fila que distribuirá tarefas que consomem uma quantidade significativa de tempo entre muitos trabalhadores (manipuladores de mensagens).

A idéia principal não é concluir tarefas que consomem uma quantidade significativa de tempo imediatamente, mas adicioná-las à fila na forma de uma mensagem. Posteriormente, o trabalhador que trabalha recebe uma mensagem da fila, que consome e executa a tarefa como se estivesse em segundo plano.
Esse conceito é muito relevante para aplicativos da Web, onde, por exemplo, você pode precisar executar algumas tarefas demoradas, cujo resultado não é imediatamente necessário, como enviar um email ou fazer uma solicitação HTTP para um aplicativo de terceiros (por exemplo, via libcurl para PHP).

Cozinhando


Nesta parte, enviaremos mensagens que representarão tarefas para os trabalhadores. Como não temos tarefas reais, como processar imagens ou gerar arquivos pdf, fingiremos que estamos ocupados usando a função sleep () . Tomaremos o número de pontos na mensagem como a complexidade da tarefa. Cada ponto significa um segundo de trabalho, por exemplo, a mensagem Olá ... levará três segundos de trabalho. Modificaremos

levemente nosso script send.php da última lição para que ele possa enviar mensagens arbitrárias.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Nosso script receive.php também precisa ser alterado. Ele deve simular a operação por um segundo para cada ponto na mensagem recebida. Vamos renomear o arquivo para worker.php e escrever o seguinte código:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Em seguida, execute os dois scripts em diferentes terminais:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

Uma das vantagens do uso de filas de tarefas é a capacidade de distribuir o trabalho entre muitos trabalhadores. Vamos tentar executar dois scripts worker.php de umavez em dois terminais diferentes. E no terceiro, enviaremos mensagens com o script send.php

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

A seguir, vamos ver o que nossos trabalhadores trouxeram:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

Por padrão, o RabbitMQ enviará cada mensagem subsequente para o próximo destinatário consumidor, por sua vez. Em média, cada destinatário receberá o mesmo número de mensagens, esse método de distribuição de mensagens é chamado round-robin (em um círculo). Tente isso com três ou mais trabalhadores.

Confirmação de mensagem


As tarefas podem levar um certo tempo. Talvez você esteja interessado no que acontecerá com a mensagem se terminar o trabalho do destinatário que não conseguiu processar essa mensagem até o fim. Com o nosso código atual, a mensagem será retornada para a fila, pois o reconhecimento de mensagens é ativado por padrão no MonsterMQ.

Quando usamos a confirmação da mensagem, informamos ao RabbitMQ que a mensagem foi processada e que ela tem o direito de removê-la da fila. Se o destinatário tiver concluído seu trabalho sem enviar confirmação (por exemplo, como resultado de um encerramento inesperado da conexão TCP), o RabbitMQ entenderá que a mensagem não foi processada e a retornará à fila, tentando entregá-la a outros trabalhadores disponíveis. Portanto, você pode ter certeza de que as mensagens não serão perdidas, mesmo no caso de um desligamento inesperado de qualquer um dos trabalhadores.

Para desativar o reconhecimento de mensagens no MonsterMQ, você pode passar true como o segundo argumento para consumir ()

$consumer->consume('test-queue', true);

Esquecer a confirmação das mensagens recebidas é um erro bastante comum e facilmente tolerável, que pode levar a sérias conseqüências. Se isso acontecer, as mensagens serão entregues repetidamente e também se acumularão nas filas, ocupando cada vez mais memória. Para depurar esse erro, use rabbitmqctl para exibir o campo messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

No windows, solte sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Segurança da fila


Apesar do fato de que nossas mensagens não serão perdidas se algum dos trabalhadores encerrar seu trabalho inesperadamente, ainda assim, ainda podemos perder as filas criadas se o RabbitMQ for desligado.

Para se proteger contra a perda de filas, é necessário declarar a fila como durável (persistente, durável). Como as filas são idempotentes, ou seja, não podemos alterá-lo ou recriá-lo chamando o método de declaração com o mesmo nome, teremos que declarar uma nova fila. Vamos fazer o seguinte

$consumer->queue('new-queue')->setDurable()->declare();

Lembre-se de alterar também o código de anúncio da fila no código do remetente.

Distribuição justa de mensagens


Você deve ter notado que as mensagens entre nossos dois funcionários ainda não são distribuídas com bastante honestidade. Se, por exemplo, todas as mensagens pares são demoradas e todas as mensagens ímpares são processadas rapidamente, um trabalhador que recebe mensagens demoradas estará sempre ocupado, enquanto o segundo ficará ocioso. Para evitar isso, você pode usar qualidade de serviço . Vamos adicionar a seguinte linha ao nosso código

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

Esta linha diz ao RabbitMQ para não enviar mensagens ao destinatário até que ele processe e reconheça a mensagem atual. perConsumer () aplica qualidade de serviço a todos os canais do destinatário, use o método perChannel () se desejar aplicar qualidade de serviço apenas ao canal atual.

imagem
(Imagem retirada do site oficial do RabbitMQ )

Reunindo todo o código


Será parecido com o nosso send.php (remetente)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

E assim o destinatário worker.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Isso é tudo: na próxima lição, aprenderemos como enviar mensagens para muitos destinatários da mesma fila.

All Articles