Verwenden von RabbitMQ mit MonsterMQ Teil 3

In unserem vorherigen Artikel haben wir eine Aufgabenwarteschlange erstellt. Es wird davon ausgegangen, dass eine Aufgabe in Form einer Nachricht an einen Empfänger übermittelt wird. In diesem Artikel werden wir etwas anderes tun, wir werden eine Nachricht an mehrere Empfänger gleichzeitig senden. Wir werden ein Protokollierungssystem erstellen, das aus zwei Programmen besteht: Das erste sendet Nachrichten und das zweite empfängt und zeigt sie an. In unserem System erhalten alle laufenden Empfänger eine vom Absender gesendete Nachricht.

Börsen (Austauscher)


In früheren Artikeln haben wir Nachrichten aus Warteschlangen gesendet und empfangen. Jetzt ist es an der Zeit, ein vollständiges Nachrichtenweiterleitungsmodell in RabbitMQ einzuführen.

Lassen Sie uns kurz durchgehen, was wir in früheren Artikeln behandelt haben:

  • Produzent - eine Anwendung, die Nachrichten sendet
  • Warteschlange - ein Puffer, in dem Nachrichten gespeichert werden
  • Consumer - eine Anwendung, die Nachrichten empfängt und verarbeitet

Die Schlüsselidee von RabbitMQ (oder besser AMQP) ist, dass der Absender (Produzent) niemals eine Nachricht direkt an die Warteschlange sendet. Außerdem weiß er nicht einmal, ob es in der Warteschlange steht. Stattdessen sendet der Absender Nachrichten an Austauscher (Exchange). Der Austauscher ist eine sehr einfache Sache, er macht zwei Dinge: Er empfängt eine Nachricht von Absendern und leitet sie an die Warteschlange weiter. Es gibt verschiedene Arten von Austauschern: Einige senden Nachrichten an eine bestimmte Warteschlange (direkter Typ), andere senden dieselbe Nachricht gleichzeitig an mehrere Warteschlangen (Fanout-Typ), andere leiten Nachrichten basierend auf bestimmten festgelegten Umleitungsregeln (Thementyp) an eine Warteschlange weiter.

Bild
(Bild von der offiziellen RabbitMQ-Website )

Schauen wir uns einen Fanout-Wärmetauscher an. Um es zu erstellen, schreiben Sie den folgenden Code:

$producer->newFanoutExchange('logs');

Der Fanout-Austauscher arbeitet nach einem sehr einfachen Schema. Er sendet die empfangene Nachricht einfach an alle angeschlossenen Warteschlangen.

Rufen Sie rabbitmqctl auf, um alle auf dem Server vorhandenen Austauscher aufzulisten

sudo rabbitmqctl list_exchanges

Diese Liste enthält Amq. * Exchanger und einen standardmäßigen namenlosen (Leerzeilen-) Austauscher. Achten Sie nicht auf sie, wir brauchen sie noch nicht.

Im letzten Artikel wussten wir nichts über Austauscher, konnten aber trotzdem Nachrichten senden. Dies geschah, weil, wenn Sie in MonsterMQ den Austauscher nicht explizit als drittes Argument für die Producer :: Publish () -Methode angeben, die Bibliothek den standardmäßigen namenlosen Austauscher RabbitMQ verwendet, der Nachrichten in Warteschlangen sendet, deren Namen als zweites Argument an die Producer :: Publish () -Methode übergeben werden .

Jetzt können wir Nachrichten an unseren neuen Austauscher senden, indem wir seinen Namen als drittes Argument für die Producer :: Publish () -Methode angeben :

$producer->publish($message, '', 'logs');

Erstellen wir nun zwei Worker mit zwei Warteschlangen, die existieren, solange die Worker, die sie angekündigt haben, aktiv sind. Sie können dies wie folgt tun:

//Worker 1
$producer->queue('queue-1')->setExclusive()->declare();

//Worker 2
$producer->queue('queue-2')->setExclusive()->declare();

Wenn der Mitarbeiter, der die Warteschlange angekündigt hat, die Sitzung beendet und die Verbindung trennt, wird sie automatisch gelöscht. Wenn beide Mitarbeiter ihre Arbeit beenden, werden beide Warteschlangen gelöscht.

Wir verbinden die Warteschlange mit dem Wärmetauscher


Wir haben bereits einen Austausch und eine Warteschlange erstellt. Jetzt müssen wir nur noch den Austauscher anweisen, die empfangenen Nachrichten an unsere Warteschlangen zu senden. Wir müssen den Austauscher und die Warteschlangen binden . Schreiben Sie dazu den folgenden Code:

//Worker 1
$producer->queue('queue-1')->bind('logs');

//Worker 2
$producer->queue('queue-2')->bind('logs');

Von nun an ist unser Protokollaustauscher mit unseren Warteschlangen verbunden und leitet die empfangenen Nachrichten an diese weiter.

Sie können alle vorhandenen Links mit dem folgenden Befehl unter Linux auflisten

rabbitmqctl list_bindings

Code zusammenfügen


Unser Absenderskript hat sich im Vergleich zur vorherigen Lektion nicht wesentlich geändert. Der Hauptunterschied besteht darin, dass jetzt Nachrichten an den zuvor angekündigten Austauscher gesendet werden und nicht an den Standardaustauscher (über die Box zugänglich). Hier ist der send.php Code

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newFanoutExchange('logs');

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, '', 'logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Es ist erwähnenswert, dass durch das Senden einer Nachricht an einen Austauscher, der keiner Warteschlange zugeordnet ist, die Nachricht verloren geht. Aber jetzt passt es zu uns, da wir unsere Empfänger noch nicht gestartet haben.

Hier ist der Code für unseren ersten Arbeiter-1.php-Arbeiter

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();
   $producer->queue('queue-1')->bind('logs');

   $consumer->consume('queue-1');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Hier ist der Code für den zweiten Worker-2.php-Worker

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();
   $producer->queue('queue-2')->bind('logs');

   $consumer->consume('queue-2');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Wenn Sie an den ersten Mitarbeiter gesendete Nachrichten in einer Protokolldatei speichern möchten, rufen Sie einfach den folgenden Befehl in der Konsole auf:

php worker-1.php > logs_from_rabbit.log

Rufen Sie die folgenden Befehle auf, um beide Worker zu starten und Nachrichten in Terminalfenstern anzuzeigen, und zwar jeweils in einem eigenen Fenster:

php worker-1.php

php worker-2.php

Rufen Sie zum Senden von Nachrichten den folgenden Befehl auf:

php send.php

In der nächsten Lektion werden wir uns ansehen, wie nur eine bestimmte Teilmenge von Nachrichten von allen gesendeten Nachrichten abgerufen wird.

All Articles