Verwenden von RabbitMQ mit MonsterMQ Teil 5

Im vorherigen Teil haben wir unser Protokollierungssystem verbessert. Anstatt den Fanout-Typ-Austauscher zu verwenden, haben wir den direkten Typ-Austauscher verwendet, mit dem wir selektiv Nachrichten empfangen konnten. Trotz der Verbesserungen weist unser System immer noch Einschränkungen auf. Beispielsweise können wir keine Nachrichten empfangen, die auf mehreren Kriterien basieren. In unserem System möchten wir beispielsweise Nachrichten möglicherweise nicht nur basierend auf dem Schweregrad der Nachricht, sondern auch basierend auf der Nachrichtenquelle umleiten. Zum Beispiel wie im Syslog- Unix-Tool, der Nachrichten nicht nur abhängig vom Schweregrad (info / warn / krit ..), sondern auch abhängig von der Quelle (auth / cron / kern ...) umleitet. Dies kann uns zusätzliche Flexibilität geben, zum Beispiel können wir nur kritische Nachrichten von 'cron' empfangen, aber auch alle Nachrichten von 'kern'. Um ein solches System zu implementieren, haben wir mit einer komplexeren Art von Tauscher kennen lernen - Thema .

Themenaustauscher


An den Thementyp-Austauscher gesendete Nachrichten sollten einen Routing-Schlüssel haben, bei dem es sich um eine Reihe von Wörtern handelt, die durch Punkte getrennt sind. Wörter können beliebig sein, sind jedoch normalerweise mit bestimmten Merkmalen der Nachricht verknüpft. Hier einige Beispiele für gültige Routing-Schlüssel: "stock.usd.nyse" , "nyse.vmw" , "quick.orange.rabbit" . Der Schlüssel ist auf 255 Bytes begrenzt.

Der Bindungsschlüssel sollte auf ähnliche Weise angegeben werden. Das Routing von Nachrichten im Thementypaustauscher ähnelt dem Routing des direkten Typaustauschers - die Nachricht wird an die Warteschlange gesendet, wobei der Bindungsschlüssel mit dem Routingschlüssel übereinstimmt. Es gibt jedoch zwei Unterschiede:

  1. * (Sternchen) im Bindungsschlüssel kann nur durch ein Wort ersetzt werden
  2. # (Pfund) im Bindungsschlüssel kann durch null oder mehr Wörter ersetzt werden

Dies kann durch das folgende Bild veranschaulicht werden:

Bild
(Bild von der offiziellen RabbitMQ-Website )

In diesem Beispiel senden wir Nachrichten, die Tiere darstellen. Nachrichtenroutingschlüssel bestehen aus drei Wörtern (und zwei Punkten). Das erste Wort steht für Geschwindigkeit, das zweite für Farbe und das dritte für Arten.

In unserem Beispiel sind zwei Warteschlangen mit dem Austauscher verbunden: Q1 mit dem Bindungsschlüssel "* .orange. *" Und Q2 mit zwei Bindungsschlüsseln "*. *. Rabbit" und "faul. #" .

Diese Links bedeuten folgendes Routing:

  • Q1 interessiert sich für alle orangefarbenen Tiere.
  • Q2 (rabbit) (lazy)

In beiden Warteschlangen wird eine Nachricht mit dem Routing-Schlüssel "quick.orange.rabbit" zugestellt . Eine Nachricht mit dem Schlüssel "lazy.orange.elephant" wird ebenfalls an beide gesendet .

Die Nachricht mit dem Schlüssel "quick.orange.fox" wird jedoch nur an erster Stelle und mit dem Schlüssel "lazy.brown.fox" nur an zweiter Stelle angezeigt . "Lazy.pink.rabbit" wird zum zweiten Mal ausgeliefert, obwohl der Routing-Schlüssel mit beiden Bindungsschlüsseln übereinstimmt.

"Quick.brown.fox" wird nicht in eine Warteschlange gestellt, da der Routing-Schlüssel keinem der Bindungsschlüssel entspricht. Wenn Sie versuchen, Nachrichten mit einer kleineren oder größeren Anzahl von Wörtern im Routing-Schlüssel zu senden (z. B. "orange")oder "quick.orange.male.rabbit" ) als im Bindungsschlüssel wird die Nachricht verworfen.

Der Thementyp-Austauscher kann sich wie ein Fanout-Typ-Austauscher verhalten, wenn Sie # als Bindungsschlüssel angeben . Oder als direkt, wenn Sie im Bindungsschlüssel nicht * * oder # angeben , sondern einfach ein Wort angeben.

Code zusammenfügen


Wir werden den Thementyp-Austauscher für unser Protokollierungssystem verwenden. Beginnen wir mit der Annahme, dass unsere Nachrichtenrouting-Schlüssel wie "source.strict" aussehen . Der Code ist fast der gleiche wie im vorherigen Teil, hier ist send.php :

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newTopicExchange('topic-logs');

   $routingKey = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $routingKey, 'topic-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Code worker-1.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-1')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Code worker-2.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-2')->setExclusive()->declare();

   $consumer->newTopicExchange('topic-logs');

   $bindingKeys = array_slice($argv, 1);
   if (empty($bindingKeys)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
      exit(1);
   }

   foreach ($bindingKeys as $key) {
      $producer->queue('queue-2')->bind('topic-logs', $key);
   }

   $consumer->consume('queue-2');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Rufen Sie an, um nur kritische Nachrichten vom ersten Mitarbeiter zu erhalten

php worker-1.php "*.critical"

Rufen Sie den Austauscher mit zwei Bindeschlüsseln auf, um die vom zweiten Worker verwendete Warteschlange zu binden:

php worker-2.php "kern.*" "*.critical"

Um eine Nachricht zu senden, gehen Sie wie folgt vor:

php send.php "kern.critical" "A critical kernel error"

Experimentieren Sie mit diesen Programmen.

All Articles