Clevere Arbeit mit RabbitMQ in NestJS

Bei der Entwicklung von Finanzsystemen sind Standardmechanismen für die Verarbeitung abgeschlossener Aufgaben in den meisten Implementierungen des AMQP-Protokolls nicht immer geeignet. Irgendwann stießen wir auf ein solches Problem, aber das Wichtigste zuerst.

Die Standardimplementierung von RabbitMQ in NestJS erleichtert das Empfangen von Nachrichten in dekorierten Funktionen:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

Weitere Details dazu finden Sie hier .
Auch die Arbeit mit Warteschlangen in Nest wurde in diesem Artikel über Habré ausführlich behandelt .

Es scheint, dass etwas anderes benötigt werden könnte. Die derzeitige Implementierung weist jedoch eine Reihe von Nachteilen auf.

Problem 1


Um das Bestätigungsergebnis an einen Kanal zu senden, müssen Sie den Kanal manuell aus dem RmqContext-Kontext ziehen und ein Signal an ihn senden.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

Entscheidung


Verwenden Sie dasselbe Muster, das bei der Arbeit mit http-Handlern verwendet wurde, indem Sie das Ergebnis der Ausführung direkt von der Funktion als reguläres Objekt, Promise oder Observable zurückgeben.

Problem 2


Wenn Sie das Ergebnis in einer separaten Warteschlange senden müssen, müssen Sie in jedem Controller, der Rabbit verwendet, dessen Namen kennen. Mit anderen Worten, es ist nicht möglich, dies während der Modulinitialisierung an einer Stelle zu konfigurieren und implizit zu verwenden.

Entscheidung


Geben Sie in der Modulkonfigurationsphase die Warteschlangen für erfolgreiche Vorgänge und für Fehler an. Wir haben beschlossen, die Ergebnisse erfolgreicher Vorgänge in eine Warteschlange und die Ergebnisse fehlerhafter Vorgänge in eine andere zu senden. Unter Verwendung der Lösung für Problem 1 wird das Ergebnis an die erfolgreiche Operationswarteschlange gesendet, wenn der Rückgabewert Promise oder Observable erfolgreich ist. Wenn dies nicht der Fall ist, wird die Nachricht abgelehnt und fällt in die Fehlerwarteschlange. RabbitMQ macht dies mit den Optionen x-dead einfach -letter-Austausch und x-Dead-Letter-Routing-Schlüssel beim Erstellen der Warteschlange.

Problem 3


Als Bibliotheksbenutzer muss der Entwickler die Details des AMQP-Protokolls kennen, um die ID der nächsten Nachricht zu erhalten, zu verstehen, was ack ist und wann er sie aufrufen muss usw.

Entscheidung


Fügen Sie einen Dekorateur hinzu, um die ID der Nachricht zu erhalten. Geben Sie anstelle von ack das Ergebnis der Ausführung von der Handlerfunktion zurück.

Problem 4


Vielleicht das wichtigste Problem: eine Nachricht mehr als einmal an den Handler senden. Wenn es um Finanztransaktionen geht, ist dies ein sehr wichtiger Punkt, da eine Situation auftreten kann, wenn das Geld bereits gesendet wurde und der Vorgang im letzten Schritt fehlgeschlagen ist - beim Schreiben in die Datenbank oder beim Senden einer Bestätigungsnachricht an den Broker. Eine der offensichtlichen Lösungen besteht darin, die vom Produzenten generierte Nachrichten-ID in die Datenbank zu schreiben, wenn die Nachricht vom Verbraucher empfangen wird, bevor die Nachricht verarbeitet wird. Wenn sie nicht bereits vorhanden ist, falls vorhanden, lehnen Sie die Nachricht ab. Das AMQP-Protokoll bietet jedoch ein erneut zugestelltes Flag, das angibt, ob diese Nachricht jemals an andere Clients übermittelt wurde. Mit diesem Flag können wir erneut übermittelte Nachrichten erkennen und sie mit Fehlern an die Warteschlange senden.In der aktuellen Implementierung in Nest gibt es keine Möglichkeit, solche Nachrichten nicht zuzustellen.

Entscheidung


Liefern Sie diese Nachricht nicht an den Handler, sondern protokollieren Sie den Fehler beim Empfang der Nachricht vom Treiber. Natürlich kann dieses Verhalten in der Phase des Dekorierens der Methode so konfiguriert werden, dass explizit angegeben wird, dass für diese Art von Aktion weiterhin Nachrichten empfangen werden sollen.

Um alle oben genannten Probleme zu lösen, wurde eine eigene Implementierung des Protokolls geschrieben. Wie die Initialisierung aussieht:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

Hier geben wir die Namen der Warteschlangen für die Zustellung von Nachrichten, Ergebnissen und Fehlern sowie einzelne Warteschlangen an, die nicht für eine erneute Zustellung empfindlich sind.

Auf Controller-Ebene ist die Arbeit genauso ähnlich wie die Arbeit mit http

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

Das Ergebnis der Aufgabe wird in die Ergebniswarteschlange gestellt, sobald diese Observable ausgeführt wird. Der mit @AMQPRequest dekorierte Parameter entspricht dem Korrelations-ID-Feld des Protokolls. Dies ist eine eindeutige Kennung für die zugestellte Nachricht.

Der Parameter @AMQPParam entspricht dem Nachrichtentext selbst. Wenn es sich um JSON handelt, kommt die Nachricht in der Funktion an, die bereits in das Objekt konvertiert wurde. Wenn es sich um einen einfachen Typ handelt, wird die Nachricht unverändert gesendet.

Die folgende Nachricht wird im toten Brief stehen:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

Wie geht's


Fügen Sie AMQPParam Type Reflection hinzu, damit der Nachrichtentext in die übergebene Klasse konvertiert wird. Jetzt ist es nur noch eine Kaste zum Tippen.

Alle Code- und Installationsanweisungen sind auf GitHub verfügbar .

Änderungen und Kommentare sind willkommen.

All Articles