Verwenden von RabbitMQ mit MonsterMQ Teil 4

In einem früheren Artikel haben wir ein einfaches Protokollierungssystem geschrieben. In dem wir Nachrichten an mehrere Empfänger gleichzeitig gesendet haben. In diesem Artikel fügen wir eine neue Funktion hinzu, mit der Empfänger nur eine bestimmte Teilmenge von Nachrichten empfangen können. Aus diesem Grund können wir beispielsweise nur kritische Nachrichten an eine Protokolldatei senden und gleichzeitig alle gesendeten Nachrichten in einem Terminalfenster anzeigen.

Verbindungen zwischen der Warteschlange und dem Austauscher


Im vorherigen Artikel haben wir gelernt, wie eine Warteschlange mit einem Austauscher verbunden wird. Über diesen Code:

$producer->queue('queue-1')->bind('logs');

Darüber hinaus kann die Bindungsmethode einen zweiten Argument- Routing-Schlüssel (Routing-Schlüssel) akzeptieren . Um es nicht mit dem Routing-Schlüssel zu verwechseln, der die Methode Producer :: Publish () als zweiten Parameter verwendet, nennen wir ihn einen Bindungsschlüssel (Bindungsschlüssel). So können wir den Austauscher mit dem Bindungsschlüssel an die Warteschlange binden:

$producer->queue('queue-2')->bind('logs', 'binding-key');

Der Wert des Bindungsschlüssels hängt vom Austauschertyp ab. Der Fanout-Austauscher aus der vorherigen Lektion ignoriert diesen Wert einfach.

Direktaustauscher


Unser Protokollierungssystem aus dem letzten Artikel sendet alle Nachrichten an alle Empfänger.

Lassen Sie es uns leicht ändern, damit Nachrichten basierend auf dem Schweregrad gefiltert werden. Zum Beispiel, damit sie nur kritische Nachrichten in die Protokolldatei schreibt und keinen Speicherplatz für Warn- und Infomeldungen verschwendet.

Der Fanout-Austauscher bietet uns keine solche Gelegenheit.

Wir können jedoch einen direkten Austauscher verwenden, der Nachrichten nur an die Warteschlangen sendet, deren Bindungsschlüssel mit dem Routing-Schlüssel der gesendeten Nachricht übereinstimmt (das zweite Argument für die Methode publish ()). Dieser Wirkungsmechanismus wird durch das folgende Bild veranschaulicht:

Bild
(Bild von der offiziellen Website von RabbitMQ )

Dieses Bild zeigt den Austauscher X.vom direkten Typ und zwei Warteschlangen - eine mit einem orangefarbenen Bindungsschlüssel und die zweite mit zwei schwarzen und grünen Bindungsschlüsseln .

In dieser Installation werden zuerst Nachrichten mit dem orangefarbenen Routing-Schlüssel und Nachrichten mit dem schwarzen oder grünen Routing-Schlüssel an den zweiten gesendet. Alle anderen Nachrichten werden verworfen.

Mehrfachbindung


Bild
(Bild von der offiziellen RabbitMQ-Website )

Es ist durchaus akzeptabel, den Austauscher mit einem Bindungsschlüssel an mehrere Warteschlangen zu binden. In unserem Beispiel können wir mithilfe des schwarzen Bindungsschlüssels eine Verbindung zwischen dem Austauscher X und der Q1- Warteschlange hinzufügen . In diesem Fall verhält sich unser direkter Typaustauscher wie ein Fanout-Typaustauscher und sendet mit dem passenden Bindungsschlüssel Nachrichten an alle Warteschlangen. Eine Nachricht mit dem schwarzen Routing-Schlüssel wird sowohl an Q1 als auch an Q2 gesendet .

Nachrichtenfreigabe


Wir werden das oben beschriebene Modell für unser Protokollierungssystem verwenden. Anstatt Nachrichten an den Fanout-Austauscher zu senden, senden wir sie an den direkten Austauscher. Wir geben den Schweregrad der Nachricht als Routing-Schlüssel an. Somit kann das Empfängerskript Nachrichten mit dem gewünschten Schweregrad auswählen. Aber zuerst konzentrieren wir uns auf das Freigeben von Nachrichten. Dafür müssen wir wie immer zuerst einen Austauscher erstellen.


$producer->newDirectExchange('my-logs');

Jetzt können wir Nachrichten senden:


$producer->newDirectExchange('my-logs');
$producer->publish($message, $severity, 'my-logs');

Um die Sache nicht zu komplizieren, nehmen wir an, dass der Schweregrad ($ Schweregrad) nur 'info', 'warning', 'error' sein kann.

Abonnement


Wie im vorherigen Teil werden wir zwei Mitarbeiter erstellen, die die Warteschlangen ankündigen und sie wie folgt mit dem Austauscher verknüpfen:


// Worker 1
foreach ($severities as $severity) {
   $consumer->queue('queue-1')->bind('my-logs', $severity);
}

Alles zusammenfügen


Bild
(Bild von der offiziellen RabbitMQ-Website ) send.php

code :


try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newDirectExchange('my-logs');

   $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $severity, 'my-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Code worker-1.php :


try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $severities = array_slice($argv, 1);
   if (empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
   }

   foreach ($severities as $severity) {
      $producer->queue('queue-1')->bind('my-logs', $severity);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Der Worker-2.php-Code unterscheidet sich vom Worker-1.php-Code nur im Namen der Warteschlange, die dem Austauscher zugeordnet und an die Consume () -Methode übergeben wird . Ersetzen Sie 'Warteschlange-1' durch 'Warteschlange-2'.

Wenn Sie nun möchten, dass der erste Mitarbeiter Nachrichten der Fehler- und Warnstufe in einer Datei speichert, führen Sie den folgenden Befehl im Terminal aus:

php worker-1.php warning error > logs_from_rabbit.log

Wenn der zweite Mitarbeiter alle Nachrichten im Terminalfenster anzeigen soll, gehen Sie wie folgt vor:

php worker-2.php info warning error
# => Waiting for logs. To exit press CTRL+C

Und um eine Fehlermeldung auszugeben, schreiben Sie Folgendes:

php send.php error "Run. Run. Or it will explode."

Das ist alles. Im nächsten Teil lernen wir, wie Sie Nachrichten basierend auf einem bestimmten Muster empfangen.

All Articles