Verwenden von RabbitMQ mit MonsterMQ Teil 2

Bild
(Bild von der offiziellen RabbitMQ-Website )

Im ersten Artikel haben wir zwei PHP-Programme geschrieben, die RabbitMQ verwenden: eine gesendete Nachricht, die zweite empfangene. In diesem Artikel wird erlÀutert, wie Sie eine Warteschlange erstellen, in der Aufgaben, die viel Zeit in Anspruch nehmen, auf viele Mitarbeiter (Nachrichtenhandler) verteilt werden.

Die Hauptidee besteht nicht darin, Aufgaben, die viel Zeit in Anspruch nehmen, sofort zu erledigen, sondern sie in Form einer Nachricht zur Warteschlange hinzuzufĂŒgen. SpĂ€ter erhĂ€lt der Arbeiter eine Nachricht aus der Warteschlange, die er verbraucht und die Aufgabe wie im Hintergrund ausfĂŒhrt.
Dieses Konzept ist sehr relevant fĂŒr Webanwendungen, bei denen Sie beispielsweise einige zeitaufwĂ€ndige Aufgaben ausfĂŒhren mĂŒssen, deren Ergebnis nicht sofort erforderlich ist, z. B. das Senden von E-Mails oder das Senden einer HTTP-Anfrage an eine Drittanbieteranwendung (z. B. ĂŒber libcurl to) PHP).

Kochen


In diesem Teil senden wir Nachrichten, die Aufgaben fĂŒr Mitarbeiter darstellen. Da wir keine wirklichen Aufgaben haben, wie das Verarbeiten von Bildern oder das Generieren von PDF-Dateien, tun wir so, als wĂ€ren wir mit der Funktion sleep () beschĂ€ftigt . Wir werden die Anzahl der Punkte in der Nachricht als die KomplexitĂ€t der Aufgabe betrachten. Jeder Punkt bedeutet eine Sekunde Arbeit, zum Beispiel dauert die Meldung Hallo ... drei Sekunden Arbeit.

Wir werden unser send.php- Skript aus der letzten Lektion leicht Àndern , damit es beliebige Nachrichten senden kann.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Unser Skript " receive.php" muss ebenfalls geĂ€ndert werden. Es sollte den Betrieb fĂŒr eine Sekunde fĂŒr jeden Punkt in der empfangenen Nachricht simulieren. Benennen wir die Datei in worker.php um und schreiben den folgenden Code hinein:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

FĂŒhren Sie als NĂ€chstes beide Skripte in verschiedenen Terminals aus:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

Einer der Vorteile der Verwendung von Aufgabenwarteschlangen ist die Möglichkeit, die Arbeit auf viele Mitarbeiter zu verteilen. Versuchen wir, zwei worker.php- Skripte gleichzeitig in zwei verschiedenen Terminals auszufĂŒhren . Und im dritten werden wir Nachrichten mit dem Skript send.php senden

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

Als nÀchstes wollen wir sehen, was unsere Arbeiter herausgebracht haben:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

StandardmĂ€ĂŸig sendet RabbitMQ jede nachfolgende Nachricht nacheinander an den nĂ€chsten verbrauchenden EmpfĂ€nger. Im Durchschnitt erhĂ€lt jeder EmpfĂ€nger die gleiche Anzahl von Nachrichten. Diese Methode zum Verteilen von Nachrichten wird als Round-Robin (in einem Kreis) bezeichnet. Versuchen Sie dies mit drei oder mehr Arbeitern.

NachrichtenbestÀtigung


Aufgaben können eine gewisse Zeit in Anspruch nehmen. Vielleicht interessiert Sie, was mit der Nachricht passieren wird, wenn Sie die Arbeit des EmpfĂ€ngers beenden, der es nicht geschafft hat, diese Nachricht bis zum Ende zu verarbeiten. Mit unserem aktuellen Code wird die Nachricht an die Warteschlange zurĂŒckgegeben, da die NachrichtenbestĂ€tigung in MonsterMQ standardmĂ€ĂŸig aktiviert ist.

Wenn wir die NachrichtenbestĂ€tigung verwenden, teilen wir RabbitMQ mit, dass die Nachricht verarbeitet wurde und das Recht hat, sie aus der Warteschlange zu entfernen. Wenn der EmpfĂ€nger seine Arbeit ohne Senden einer BestĂ€tigung abgeschlossen hat (z. B. aufgrund einer unerwarteten Beendigung der TCP-Verbindung), erkennt RabbitMQ, dass die Nachricht nicht verarbeitet wurde, und gibt sie an die Warteschlange zurĂŒck und versucht, sie an andere verfĂŒgbare Mitarbeiter weiterzuleiten. So können Sie sicher sein, dass Nachrichten auch bei einem unerwarteten Herunterfahren eines der Mitarbeiter nicht verloren gehen.

Um die NachrichtenbestĂ€tigung in MonsterMQ zu deaktivieren, können Sie true als zweites zu konsumierendes Argument ĂŒbergeben ()

$consumer->consume('test-queue', true);

Das Vergessen der BestÀtigung empfangener Nachrichten ist ein ziemlich hÀufiger, leicht tolerierbarer Fehler, der schwerwiegende Folgen haben kann. In diesem Fall werden Nachrichten immer wieder zugestellt und sammeln sich auch in den Warteschlangen an, die immer mehr Speicherplatz beanspruchen. Um diesen Fehler zu debuggen, verwenden rabbitmqctl anzuzeigen das messages_unacknowledged Feld

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Lassen Sie unter Windows Sudo fallen

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Warteschlangensicherheit


Trotz der Tatsache, dass unsere Nachrichten nicht verloren gehen, wenn einer der Mitarbeiter seine Arbeit unerwartet beendet, können wir die erstellten Warteschlangen dennoch verlieren, wenn RabbitMQ heruntergefahren wird.

Um sich vor dem Verlust von Warteschlangen zu schĂŒtzen, mĂŒssen Sie die Warteschlange als dauerhaft (dauerhaft, dauerhaft) deklarieren . Da Warteschlangen idempotent sind, dh wir können sie nicht Ă€ndern oder neu erstellen, indem wir die Deklarationsmethode mit demselben Namen aufrufen, mĂŒssen wir eine neue Warteschlange deklarieren. Machen wir es wie folgt

$consumer->queue('new-queue')->setDurable()->declare();

Denken Sie daran, den Warteschlangenansagecode auch im Absendercode zu Àndern.

Faire Verteilung von Nachrichten


Sie haben vielleicht bemerkt, dass die Nachrichten zwischen unseren beiden Mitarbeitern immer noch nicht ganz ehrlich verteilt sind. Wenn zum Beispiel jede gerade Nachricht zeitaufwĂ€ndig ist und jede ungerade Nachricht schnell verarbeitet wird, ist ein Mitarbeiter, der zeitaufwĂ€ndige Nachrichten empfĂ€ngt, immer beschĂ€ftigt, wĂ€hrend der zweite inaktiv ist. Um dies zu vermeiden, können Sie die ServicequalitĂ€t nutzen . FĂŒgen wir unserem Code die folgende Zeile hinzu

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

Diese Zeile weist RabbitMQ an, keine Nachrichten an den EmpfÀnger zu senden, bis die aktuelle Nachricht verarbeitet und bestÀtigt wurde. perConsumer () wendet die DienstqualitÀt auf alle KanÀle des EmpfÀngers an. Verwenden Sie die Methode perChannel () , wenn Sie die DienstqualitÀt nur auf den aktuellen Kanal anwenden möchten.

Bild
(Bild von der offiziellen RabbitMQ-Website )

Den gesamten Code zusammenfĂŒgen


Dies sieht aus wie unsere send.php (Absender)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Und so der EmpfÀnger worker.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

In der nÀchsten Lektion lernen wir, wie Sie Nachrichten aus derselben Warteschlange an viele EmpfÀnger senden.

All Articles