Trabalho inteligente com RabbitMQ no NestJS

Ao desenvolver sistemas financeiros, nem sempre são adequados mecanismos padrão para o processamento de tarefas concluídas na maioria das implementações do protocolo AMQP. Em algum momento, encontramos esse problema, mas primeiro as primeiras coisas.

A implementação padrão do RabbitMQ no NestJS facilita o recebimento de mensagens em funções decoradas:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

Mais detalhes sobre como isso funciona são descritos aqui .
Também o trabalho com filas no Nest foi bem abordado neste artigo sobre Habré .

Parece que algo mais pode ser necessário. No entanto, existem várias desvantagens na implementação atual.

Problema 1


Para enviar o resultado ack para um canal, você precisa puxar manualmente o canal para fora do contexto RmqContext e enviar um sinal para ele.

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

Decisão


Use o mesmo padrão usado ao trabalhar com manipuladores http, retornando o resultado da execução diretamente da função como um objeto regular, Promise ou Observable.

Problema 2


Se você precisar enviar o resultado em uma fila separada, em cada controlador que usa o Rabbit, será necessário saber seu nome. Em outras palavras, não há como configurar isso em um local durante a inicialização do módulo e usá-lo implicitamente.

Decisão


No estágio de configuração do módulo, especifique as filas para obter resultados bem-sucedidos das operações e erros. Decidimos enviar os resultados de operações bem-sucedidas em uma fila e os resultados de operações errôneas em outra. Usando a solução para o problema 1, se o valor de retorno, Promise ou Observable for bem-sucedido, o resultado será enviado para a fila de operações bem-sucedidas; caso contrário, a mensagem será rejeitada e cairá na fila de erros. O RabbitMQ facilita isso usando as opções x-dead -letter-exchange e x-dead-letter-routing-key ao criar a fila.

Problema 3


Como usuário da biblioteca, o desenvolvedor precisa conhecer os detalhes do protocolo AMQP para obter o ID da próxima mensagem, entender o que é aceitar e quando chamá-lo, etc.

Decisão


Adicione um decorador para obter o ID da mensagem. Em vez de ack, retorne o resultado da execução da função manipulador.

Problema 4


Talvez o problema mais importante: entregar uma mensagem ao manipulador mais de uma vez. Quando se trata de transações financeiras, esse é um ponto muito importante, pois pode surgir uma situação em que o dinheiro já foi enviado e a operação caiu na última etapa - ao gravar no banco de dados ou ao enviar uma mensagem de reconhecimento ao corretor. Uma das decisões óbvias é quando o consumidor recebe a mensagem, antes de processá-la, grava o ID da mensagem gerado pelo produtor no banco de dados, se já não houver, se houver, e rejeite a mensagem. Mas o protocolo AMQP fornece um sinalizador entregue novamente, identificando se essa mensagem já foi entregue a outros clientes, que podemos usar para detectar mensagens entregues novamente e enviá-las para a fila com erros.Na implementação atual no Nest, não há como não entregar essas mensagens.

Decisão


Não entregue essa mensagem ao manipulador, mas registre o erro no estágio de recebimento da mensagem do driver. Obviamente, esse comportamento pode ser configurado na fase de decoração do método para indicar explicitamente que ainda queremos receber mensagens para esse tipo de ação.

Para resolver todos os problemas acima, sua própria implementação do protocolo foi escrita. Como é a inicialização:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

Aqui, indicamos os nomes das filas para entrega de mensagens, resultados e erros, bem como filas individuais que não são sensíveis à entrega novamente.

No nível do controlador, o trabalho é tão semelhante ao trabalho com http

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

O resultado da tarefa cairá na fila de resultados assim que este Observável for executado. O parâmetro decorado com @AMQPRequest corresponde ao campo correlacionId do protocolo. Este é um identificador exclusivo para a mensagem entregue.

O parâmetro @AMQPParam corresponde ao próprio corpo da mensagem. Se for JSON, a mensagem chegará na função já convertida no objeto. Se for um tipo simples, a mensagem será enviada como está.

A seguinte mensagem estará na letra morta:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

E aí


Adicione Reflexão de tipo a AMQPParam para que o corpo da mensagem seja convertido na classe que está sendo passada. Agora é apenas uma casta para digitar.

Todo o código e instruções de instalação estão disponíveis no GitHub .

Quaisquer edições e comentários são bem-vindos.

All Articles