Trabajo inteligente con RabbitMQ en NestJS

Cuando se desarrollan sistemas financieros, los mecanismos estándar para procesar tareas completadas en la mayoría de las implementaciones del protocolo AMQP no siempre son adecuados. En algún momento, encontramos ese problema, pero lo primero es lo primero.

La implementación estándar de RabbitMQ en NestJS facilita la recepción de mensajes en funciones decoradas:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

Aquí se describen más detalles sobre cómo funciona esto .
También el trabajo con colas en Nest estaba bien cubierto en este artículo sobre Habré .

Parece que se podría necesitar algo más. Sin embargo, hay una serie de desventajas en la implementación actual.

Problema 1


Para enviar el resultado ack a un canal, debe extraer el canal manualmente del contexto RmqContext y enviarle una señal.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

Decisión


Utilice el mismo patrón que se utilizó al trabajar con controladores HTTP al devolver el resultado de la ejecución directamente desde la función como un objeto normal, Promesa u Observable.

Problema 2


Si necesita enviar el resultado en una cola separada, entonces en cada controlador que use Rabbit necesita saber su nombre. En otras palabras, no hay forma de configurar esto en 1 lugar durante la inicialización del módulo y usarlo implícitamente.

Decisión


En la etapa de configuración del módulo, especifique las colas para resultados exitosos de operaciones y para errores. Decidimos enviar los resultados de operaciones exitosas en una cola y los resultados de operaciones erróneas en otra. Usando la solución al problema 1, si el valor de retorno, Promesa u Observable es exitoso, entonces el resultado se envía a la cola de operación exitosa, si no, el mensaje se rechaza y cae en la cola de error, RabbitMQ hace que sea fácil hacerlo usando las opciones x-dead -letter-exchange y x-dead-letter-routing-key al crear la cola.

Problema 3


Como usuario de la biblioteca, el desarrollador necesita conocer los detalles del protocolo AMQP para obtener la identificación del siguiente mensaje, comprender qué es el reconocimiento y cuándo llamarlo, etc.

Decisión


Agregue un decorador para obtener la identificación del mensaje. En lugar de ack, devuelve el resultado de la ejecución desde la función del controlador.

Problema 4


Quizás el problema más importante: entregar un mensaje al controlador más de una vez. Cuando se trata de transacciones financieras, este es un punto muy importante, porque puede surgir una situación en la que el dinero ya ha sido enviado, y la operación cayó en el último paso, al escribir en la base de datos o al enviar un mensaje de confirmación al agente de mensajes. Una de las decisiones obvias es cuando el consumidor recibe el mensaje, antes de procesar el mensaje, escriba el ID del mensaje generado por el productor en la base de datos, si no hay ninguno, si lo hay, luego rechace el mensaje. Pero el protocolo AMQP proporciona un indicador de reenvío que identifica si este mensaje se entregó alguna vez a otros clientes, que podemos usar para detectar mensajes reenviados y enviarlos a la cola con errores.En la implementación actual en Nest, no hay forma de no entregar dichos mensajes.

Decisión


No entregue este mensaje al controlador, pero registre el error en la etapa de recepción del mensaje del controlador. Por supuesto, este comportamiento se puede configurar en la etapa de decorar el método para indicar explícitamente que todavía queremos recibir mensajes para este tipo de acción.

Para resolver todos los problemas anteriores, se escribió su propia implementación del protocolo. Cómo se ve la inicialización:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

Aquí indicamos los nombres de las colas para entregar mensajes, resultados y errores, así como las colas individuales que no son sensibles a la nueva entrega.

En el nivel del controlador, el trabajo es tan similar como trabajar con http

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

El resultado de la tarea caerá en la cola de resultados tan pronto como se ejecute este Observable. El parámetro decorado con @AMQPRequest corresponde al campo correlationId del protocolo. Este es un identificador único para el mensaje entregado.

El parámetro @AMQPParam coincide con el cuerpo del mensaje. Si es JSON, el mensaje llegará a la función ya convertida en el objeto. Si es un tipo simple, el mensaje se envía tal cual.

El siguiente mensaje estará en la letra muerta:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

Qué pasa


Agregue Type Reflection a AMQPParam para que el cuerpo del mensaje se convierta a la clase que se pasa. Ahora es solo una casta para escribir.

Todo el código y las instrucciones de instalación están disponibles en GitHub .

Cualquier edición y comentario son bienvenidos.

All Articles