Travail intelligent avec RabbitMQ dans NestJS

Lors du développement de systèmes financiers, les mécanismes standard de traitement des tâches terminées dans la plupart des implémentations du protocole AMQP ne sont pas toujours adaptés. À un moment donné, nous avons rencontré un tel problème, mais tout d'abord.

L'implémentation standard de RabbitMQ dans NestJS facilite la réception de messages dans des fonctions décorées:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

Plus de détails sur la façon dont cela fonctionne sont décrits ici .
Le travail avec les files d'attente dans Nest a également été bien couvert dans cet article sur Habré .

Il semblerait que quelque chose d'autre pourrait être nécessaire. Cependant, la mise en œuvre actuelle présente un certain nombre d'inconvénients.

Problème 1


Pour envoyer le résultat d'acquittement à un canal, vous devez retirer manuellement le canal du contexte RmqContext et lui envoyer un signal.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

Décision


Utilisez le même modèle que celui utilisé lors de l'utilisation des gestionnaires http en renvoyant le résultat de l'exécution directement à partir de la fonction en tant qu'objet normal, Promise ou Observable.

Problème 2


Si vous devez envoyer le résultat dans une file d'attente distincte, dans chaque contrôleur qui utilise Rabbit, vous devez connaître son nom. En d'autres termes, il n'y a aucun moyen de configurer cela à 1 endroit pendant l'initialisation du module et de l'utiliser implicitement.

Décision


À l'étape de configuration du module, spécifiez les files d'attente pour les résultats des opérations réussis et pour les erreurs. Nous avons décidé d'envoyer les résultats des opérations réussies dans une file d'attente et les résultats des erreurs dans une autre. En utilisant la solution au problème 1, si la valeur de retour, Promise ou Observable réussit, le résultat est envoyé à la file d'attente d'opération réussie, sinon, le message est rejeté et tombe dans la file d'attente d'erreur, RabbitMQ facilite la tâche en utilisant les options x-dead -letter-exchange et x-dead-letter-routing-key lors de la création de la file d'attente.

Problème 3


En tant qu'utilisateur de bibliothèque, le développeur doit connaître les détails du protocole AMQP pour obtenir l'identifiant du message suivant, comprendre ce qu'est un accusé de réception et quand l'appeler, etc.

Décision


Ajoutez un décorateur pour obtenir l'ID du message. Au lieu d'ack, retournez le résultat de l'exécution à partir de la fonction de gestionnaire.

Problème 4


Peut-être le problème le plus important: transmettre un message au gestionnaire plus d'une fois. En ce qui concerne les transactions financières, c'est un point très important, car une situation peut se produire lorsque l'argent a déjà été envoyé et que l'opération a chuté à la dernière étape - lors de l'écriture dans la base de données ou de l'envoi d'un message d'accusé de réception au courtier de messages. L'une des solutions évidentes consiste à écrire l'ID de message généré par le producteur dans la base de données lorsque le message est reçu par le consommateur, avant de le traiter, s'il n'est pas déjà là, s'il y en a un, puis rejeter le message. Mais le protocole AMQP fournit un indicateur redistribué identifiant si ce message a déjà été remis à d'autres clients, que nous pouvons utiliser pour détecter les messages redistribués et les envoyer à la file d'attente avec des erreurs.Dans l'implémentation actuelle de Nest, il n'y a aucun moyen de ne pas remettre de tels messages.

Décision


Ne remettez pas ce message au gestionnaire, mais enregistrez l'erreur au stade de la réception du message du pilote. Bien sûr, ce comportement peut être configuré au stade de la décoration de la méthode pour indiquer explicitement que nous voulons toujours recevoir des messages pour ce type d'action.

Pour résoudre tous les problèmes ci-dessus, sa propre implémentation du protocole a été écrite. À quoi ressemble l'initialisation:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

Ici, nous indiquons les noms des files d'attente pour la remise des messages, des résultats et des erreurs, ainsi que les files d'attente individuelles qui ne sont pas sensibles à la redistribution.

Au niveau du contrôleur, le travail est aussi similaire que de travailler avec http

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

Le résultat de la tâche tombera dans la file d'attente des résultats dès que cet observable sera exécuté. Le paramètre décoré avec @AMQPRequest correspond au champ correlationId du protocole. Il s'agit d'un identifiant unique pour le message remis.

Le paramètre @AMQPParam correspond au corps du message lui-même. S'il s'agit de JSON, le message arrivera dans la fonction déjà convertie en objet. S'il s'agit d'un type simple, le message est envoyé tel quel.

Le message suivant sera dans la lettre morte:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

Quoi de neuf


Ajoutez la réflexion de type à AMQPParam pour que le corps du message soit converti en classe transmise. Maintenant, c'est juste une caste à taper.

Tous les codes et les instructions d'installation sont disponibles sur GitHub .

Toutes modifications et commentaires sont les bienvenus.

All Articles