Utilisation de RabbitMQ avec MonsterMQ Partie 2

image
(Image prise sur le site officiel de RabbitMQ )

Dans le premier article, nous avons écrit deux programmes PHP qui utilisent RabbitMQ: l'un envoyé des messages, le second reçu. Dans cet article, nous expliquerons comment créer une file d'attente qui répartira les tâches qui consomment beaucoup de temps entre de nombreux travailleurs (gestionnaires de messages).

L'idée principale n'est pas de terminer immédiatement des tâches qui consomment beaucoup de temps, mais de les ajouter à la file d'attente sous forme de message. Plus tard, le travailleur actif reçoit un message de la file d'attente, qu'il consomme et exécute la tâche comme s'il était en arrière-plan.
Ce concept est très pertinent pour les applications Web, où, par exemple, vous devrez peut-être effectuer des tâches fastidieuses, dont le résultat n'est pas immédiatement requis, telles que l'envoi d'un e-mail ou l'envoi d'une demande HTTP à une application tierce (par exemple, via libcurl à PHP).

Cuisine


Dans cette partie, nous enverrons des messages qui représenteront des tâches pour les travailleurs. Comme nous n'avons pas de tâches réelles, comme le traitement d'images ou la génération de fichiers pdf, nous prétendons que nous sommes occupés à utiliser la fonction sleep () . Nous prendrons le nombre de points dans le message comme la complexité de la tâche. Chaque point signifiera une seconde de travail, par exemple, le message Bonjour ... prendra trois secondes de travail.

Nous allons légèrement modifier notre script send.php de la dernière leçon afin qu'il puisse envoyer des messages arbitraires.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Notre script receive.php doit également être modifié. Il doit simuler le fonctionnement pendant une seconde pour chaque point du message reçu. Renommons le fichier en worker.php et écrivons-y le code suivant:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Ensuite, exécutez les deux scripts dans différents terminaux:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

L'un des avantages de l'utilisation des files d'attente de tâches est la possibilité de partager le travail entre de nombreux travailleurs. Essayons d'exécuter deux scripts worker.php à la fois dans deux terminaux différents. Et dans le troisième, nous enverrons des messages avec le script send.php

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

Voyons ensuite ce que nos travailleurs ont fait ressortir:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

Par défaut, RabbitMQ enverra tour à tour chaque message suivant au prochain destinataire consommateur. En moyenne, chaque destinataire recevra le même nombre de messages, cette méthode de distribution des messages est appelée round-robin (en cercle). Essayez ceci avec trois travailleurs ou plus.

Confirmation du message


Les tâches peuvent prendre un certain temps. Peut-être que vous êtes intéressé par ce qui arrivera au message si vous terminez le travail du destinataire qui n'a pas réussi à traiter ce message à la fin. Avec notre code actuel, le message sera renvoyé dans la file d'attente, car l'accusé de réception du message est activé par défaut dans MonsterMQ.

Lorsque nous utilisons la confirmation de message, nous informons RabbitMQ que le message a été traité et qu'il a le droit de le supprimer de la file d'attente. Si le destinataire a terminé son travail sans envoyer de confirmation (par exemple, à la suite d'une interruption inattendue de la connexion TCP), RabbitMQ comprendra que le message n'a pas été traité et le renverra dans la file d'attente, en essayant de le livrer à d'autres travailleurs disponibles. Ainsi, vous pouvez être sûr que les messages ne seront pas perdus, même en cas d'arrêt inattendu de l'un des employés.

Pour désactiver l'accusé de réception de message dans MonsterMQ, vous pouvez passer true comme deuxième argument à consume ()

$consumer->consume('test-queue', true);

Oublier la confirmation des messages reçus est une erreur assez courante et facilement tolérée, qui peut entraîner de graves conséquences. Si cela se produit, les messages seront remis encore et encore et s'accumuleront également dans les files d'attente occupant de plus en plus de mémoire. Pour déboguer cette erreur, utilisez rabbitmqctl pour afficher le champ messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Sous Windows, déposez sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Sécurité des files d'attente


Malgré le fait que nos messages ne seront pas perdus si l'un des employés met fin inopinément à son travail, nous pouvons néanmoins perdre les files d'attente créées si RabbitMQ est fermé.

Pour vous protéger contre la perte de files d'attente, vous devez déclarer la file d'attente comme durable (persistante, durable). Étant donné que les files d'attente sont idempotentes, c'est-à-dire que nous ne pouvons pas les modifier ou les recréer en appelant la méthode de déclaration avec le même nom, nous devrons déclarer une nouvelle file d'attente. Faisons-le comme suit

$consumer->queue('new-queue')->setDurable()->declare();

N'oubliez pas de modifier également le code d'annonce de file d'attente dans le code de l'expéditeur.

Distribution équitable des messages


Vous avez peut-être remarqué que les messages entre nos deux employés ne sont toujours pas distribués assez honnêtement. Si, par exemple, chaque message pair prend du temps et que chaque message impair est traité rapidement, alors un travailleur recevant des messages chronophages sera toujours occupé, tandis que le second sera inactif. Pour éviter cela, vous pouvez utiliser la qualité de service . Ajoutons la ligne suivante à notre code

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

Cette ligne indique à RabbitMQ de ne pas envoyer de messages au destinataire tant qu'il n'a pas traité et accusé réception du message actuel. perConsumer () applique la qualité de service à tous les canaux du destinataire, utilisez la méthode perChannel () si vous souhaitez appliquer la qualité de service uniquement au canal actuel.

image
(Image prise sur le site officiel de RabbitMQ )

Assembler tout le code


Cela ressemblera à notre send.php (expéditeur)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Et donc le destinataire worker.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

C'est tout, dans la prochaine leçon, nous apprendrons comment envoyer des messages à de nombreux destinataires à partir de la même file d'attente.

All Articles