Clever work with RabbitMQ in NestJS

When developing financial systems, the standard mechanisms for handling completed tasks in most AMQP implementations are not always suitable. At some point we ran into exactly this problem, but first things first.

The standard implementation of RabbitMQ in NestJS makes it easy to receive messages in decorated functions:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

More details on how this works are described here.
Working with queues in Nest is also covered well in this article on Habr.

It might seem that nothing else is needed. However, the current implementation has a number of drawbacks.

Problem 1


To acknowledge a message, you have to pull the channel out of the RmqContext manually and call ack on it.

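@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  // The underlying channel and the raw message are both needed to ack
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  // Manual acknowledgment, repeated in every handler
  channel.ack(originalMsg);
}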

Solution


Use the same pattern as in HTTP handlers: return the result of execution directly from the function as a regular object, a Promise or an Observable, and let the transport acknowledge the message based on that result.
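The idea can be sketched without any NestJS machinery. The example below is a minimal illustration, not the library's actual code: `ChannelLike`, `SubscribableLike` and `settle` are hypothetical names, and the only real API assumed is the `ack(message)` / `nack(message, allUpTo, requeue)` pair that amqplib channels expose. A result that succeeds leads to an ack; a handler that throws, rejects or errors leads to a nack without requeue.

```typescript
// Hypothetical minimal shapes; a real amqplib channel and an RxJS
// Observable both satisfy them structurally.
interface ChannelLike {
  ack(msg: unknown): void;
  nack(msg: unknown, allUpTo?: boolean, requeue?: boolean): void;
}

interface SubscribableLike {
  subscribe(observer: { error?: (e: unknown) => void; complete?: () => void }): unknown;
}

function hasMethod(x: unknown, name: string): boolean {
  return typeof x === 'object' && x !== null &&
    typeof (x as Record<string, unknown>)[name] === 'function';
}

const isSubscribable = (x: unknown): x is SubscribableLike => hasMethod(x, 'subscribe');
const isThenable = (x: unknown): x is PromiseLike<unknown> => hasMethod(x, 'then');

// Runs the handler and acknowledges or rejects the message based on
// how the returned value, Promise or Observable-like object settles.
function settle(channel: ChannelLike, msg: unknown, run: () => unknown): void {
  const ack = () => channel.ack(msg);
  // requeue = false, so a rejected message is not put back on the queue
  const reject = () => channel.nack(msg, false, false);

  let result: unknown;
  try {
    result = run();
  } catch (_err) {
    reject(); // the handler threw synchronously
    return;
  }

  if (isSubscribable(result)) {
    result.subscribe({ error: reject, complete: ack });
  } else if (isThenable(result)) {
    result.then(ack, reject);
  } else {
    ack(); // a plain value counts as success
  }
}
```

With a wrapper like this, the handler itself never touches the channel: it only returns or throws.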

Problem 2


If the result has to be sent to a separate queue, every controller that uses Rabbit needs to know that queue's name. In other words, there is no way to configure it in one place during module initialization and then use it implicitly.

Solution


At the module configuration stage, specify the queues for successful results and for errors. We decided to send the results of successful operations to one queue and the results of failed ones to another. Building on the solution to problem 1: if the returned value, Promise or Observable succeeds, the result is sent to the success queue; if not, the message is rejected and lands in the error queue. RabbitMQ makes this easy through the x-dead-letter-exchange and x-dead-letter-routing-key arguments set when the queue is created.
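As a rough sketch of the dead-letter part, the two arguments can be assembled once and passed when the queue is declared. `withDeadLetter`, the interface name and the queue, exchange and routing-key names in the usage comment are made up for illustration; the `x-dead-letter-exchange` and `x-dead-letter-routing-key` argument names are the ones RabbitMQ defines.

```typescript
// A subset of the options object that amqplib's channel.assertQueue() accepts.
interface AssertQueueOptions {
  durable?: boolean;
  arguments?: Record<string, unknown>;
}

// Hypothetical helper: builds queue options so that rejected messages
// are re-published to `exchange` with `routingKey`.
function withDeadLetter(exchange: string, routingKey: string): AssertQueueOptions {
  return {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': exchange,
      'x-dead-letter-routing-key': routingKey,
    },
  };
}

// With amqplib this would be used roughly as follows (names are examples):
//   await channel.assertQueue('tasks', withDeadLetter('commands', 'tasks.failed'));
```

Because the arguments are fixed at queue creation, controllers do not need to know the name of the error queue at all.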

Problem 3


To use the library, a developer has to know details of the AMQP protocol: how to get the id of the incoming message, what ack is and when to call it, and so on.

Solution


Add a decorator that injects the message id. Instead of calling ack, return the result of execution from the handler function.

Problem 4


Perhaps the most important problem is delivery of a message to the handler more than once. For financial transactions this matters a great deal: the money may already have been sent when the operation fails at the last step, while writing to the database or sending the acknowledgment to the message broker. One obvious solution is for the consumer to record the producer-generated message ID in the database when the message arrives, before processing it: if the ID is not there yet, store it and continue; if it is already there, reject the message. However, the AMQP protocol also provides a redelivered flag, which indicates that the message has already been delivered before, to this or another client. We can use it to detect redelivered messages and send them to the error queue. The current Nest implementation offers no way to keep such messages away from the handler.
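The "obvious" variant with stored message IDs could look roughly like this. It is a sketch under assumptions: `ProcessedStore`, `InMemoryStore` and `checkDuplicate` are hypothetical names, and the in-memory `Set` stands in for a database table with a unique constraint on the message ID.

```typescript
// Hypothetical storage contract for IDs of messages already accepted.
interface ProcessedStore {
  // Returns true if the ID was newly recorded, false if it was already known.
  markIfNew(messageId: string): boolean;
}

// In-memory stand-in for a database, for illustration only.
class InMemoryStore implements ProcessedStore {
  private readonly seen = new Set<string>();

  markIfNew(messageId: string): boolean {
    if (this.seen.has(messageId)) {
      return false;
    }
    this.seen.add(messageId);
    return true;
  }
}

type Verdict = 'process' | 'reject';

// The ID is recorded before processing starts, so a second delivery
// of the same message is rejected even if the first attempt crashed.
function checkDuplicate(store: ProcessedStore, messageId: string): Verdict {
  return store.markIfNew(messageId) ? 'process' : 'reject';
}
```

The cost of this approach is an extra write per message, which is one reason to consider the redelivered flag instead.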

Solution


Do not deliver such a message to the handler; instead, log the error at the point where the message is received from the driver. This behavior can of course be made configurable when decorating the method, to state explicitly that redelivered messages should still be received for this type of action.
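A sketch of such a gate is shown below. The `fields.redelivered` property is where amqplib exposes the flag on a consumed message; `HandlerOptions`, `allowRedelivered` and `gate` are hypothetical names, not the options of the library described here.

```typescript
// Only the part of a consumed message that the gate needs.
interface DeliveredMessage {
  fields: { redelivered: boolean };
}

// Hypothetical per-handler setting chosen at decoration time.
interface HandlerOptions {
  allowRedelivered?: boolean;
}

type GateResult = 'dispatch' | 'dead-letter';

// Decides whether a message may reach the handler. Redelivered
// messages are logged and diverted unless the handler opted in.
function gate(
  msg: DeliveredMessage,
  options: HandlerOptions,
  logError: (text: string) => void,
): GateResult {
  if (!msg.fields.redelivered || options.allowRedelivered === true) {
    return 'dispatch';
  }
  logError('Redelivered message was not passed to the handler');
  return 'dead-letter';
}
```

A `'dead-letter'` result would then translate into a rejection without requeue, so the broker routes the message to the error queue.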

To solve all of the above problems, we wrote our own transport implementation. This is what initialization looks like:

// Create a separate Nest application that hosts the RabbitMQ module
const amqp = await NestFactory.create(
  RabbitModule.forRoot({
    host: process.env.AMQP_QUEUE_HOST,
    port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
    login: process.env.AMQP_QUEUE_LOGIN,
    password: process.env.AMQP_QUEUE_PASSWORD,
    tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
    tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
    deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
    deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
    exchange: process.env.AMQP_EXCHANGE_COMMAND,
    prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
  }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
// `app` is the main Nest application instance, created elsewhere
app.connectMicroservice({
  strategy: transport,
  options: {},
});

await app.startAllMicroservices();

Here we specify the names of the queues for incoming messages, results and errors, as well as separate queues for messages that are not sensitive to redelivery.

At the controller level, the code looks as close as possible to an HTTP handler:

@AMQP('say_hey')
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
  return this.heyService.greet(requestId, q);
}

The result of the task lands in the result queue as soon as this Observable completes. The parameter decorated with @AMQPRequest corresponds to the correlationId field of the protocol, which serves as a unique identifier of the delivered message.

The @AMQPParam parameter corresponds to the message body itself. If the body is JSON, it arrives in the function already converted to an object. If it is a simple type, it is passed as is.
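How the two decorated arguments might be derived from a raw message can be sketched as follows. This is an assumption about the behavior described above, not the library's code: `RawMessage`, `parseBody` and `handlerArgs` are hypothetical, and "passed as is" is interpreted here as falling back to the raw text when the body is not valid JSON.

```typescript
// Only the parts of a consumed message used here. amqplib delivers
// `content` as a Buffer, whose toString() decodes it as UTF-8.
interface RawMessage {
  content: { toString(): string };
  properties: { correlationId?: string };
}

// JSON bodies become parsed values; anything else stays a string.
function parseBody(text: string): unknown {
  try {
    return JSON.parse(text);
  } catch (_err) {
    return text;
  }
}

// Produces the values for the @AMQPRequest and @AMQPParam parameters.
function handlerArgs(msg: RawMessage): [string | undefined, unknown] {
  return [msg.properties.correlationId, parseBody(msg.content.toString())];
}
```

Note that under this interpretation a body such as `42` is valid JSON and would arrive as a number rather than a string.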

A message handled by the following method ends up in the dead-letter queue:

@AMQP('say_hey')
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
  return throwError('Buy');
}

What's next


Add type reflection to AMQPParam so that the message body is converted to an instance of the class that is passed. At the moment it is just a cast to the type.
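The difference between a cast and a real conversion can be shown with a small sketch. The `name` field and `greeting` method on `HeyMessage` and the `toInstance` helper are invented for the example; the planned feature may work differently.

```typescript
// Example message class; its members are assumptions for illustration.
class HeyMessage {
  name = '';

  greeting(): string {
    return `Hey, ${this.name}`;
  }
}

// Hypothetical converter: creates a real instance and copies the
// parsed fields onto it, so methods and instanceof checks work.
function toInstance<T extends object>(cls: new () => T, plain: object): T {
  return Object.assign(new cls(), plain);
}

// A cast changes only the compile-time type:
//   const cast = JSON.parse(body) as HeyMessage;  // cast.greeting is undefined at runtime
// A conversion produces an actual HeyMessage:
//   const real = toInstance(HeyMessage, JSON.parse(body));
```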

All code and installation instructions are available on GitHub.

Any edits and comments are welcome.
