在NestJS中巧妙地使用RabbitMQ

在开发金融系统时,在大多数AMQP协议实现中用于处理已完成任务的标准机制并不总是适用。在某些时候,我们遇到了这样的问题,但首先要解决。

NestJS中RabbitMQ的标准实现使您可以轻松接收带有修饰功能的消息:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}

这里描述了如何工作的更多细节
在Habré上的这篇文章中很好地介绍了在Nest中使用队列的工作

似乎还需要其他一些东西。但是,在当前的实现中存在许多缺点。

问题1


为了将确认结果发送到通道,您需要手动将通道从RmqContext上下文中拉出并向其发送信号。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  channel.ack(originalMsg);
}

决断


通过直接从函数作为常规对象Promise或Observable返回执行结果,使用与HTTP处理程序一起使用的模式。

问题2


如果需要将结果发送到单独的队列中,则在每个使用Rabbit的控制器中,您需要知道其名称。换句话说,在模块初始化期间无法将其配置在1个位置并隐式使用它。

决断


在模块配置阶段,指定队列以获取成功的操作结果和错误。我们决定将成功操作的结果发送到一个队列中,将错误操作的结果发送到另一个队列中。使用问题1的解决方案,如果返回值Promise或Observable成功,则将结果发送到成功的操作队列,如果不成功,则消息被拒绝并落入错误队列,RabbitMQ使用x-dead选项使此操作变得容易创建队列时,请输入-letter-exchange和x-dead-letter-routing-key。

问题3


作为库用户,开发人员需要了解AMQP协议的详细信息,以获取下一条消息的ID,了解什么是ack以及何时调用它,等等。

决断


添加装饰器以获取消息的ID。代替ack,从处理程序函数返回执行结果。

问题4


也许最重要的问题是:多次向处理程序传递消息。对于金融交易,这是非常重要的一点,因为可能会发生以下情况:已汇出钱款,而操作落到了最后一步-写数据库或向消息代理发送确认消息。一种显而易见的解决方案是,当消费者接收到消息时,在处理消息之前(如果消息尚不存在的话),将生产者生成的消息ID写入数据库中,如果不存在,则拒绝该消息。但是AMQP协议提供了一个重新传递的标志,用于标识该消息是否曾经传递给其他客户端,我们可以使用它来检测重新传递的消息并将它们发送到错误队列中。在Nest的当前实现中,没有办法不传递此类消息。

决断


不要将此消息传递给处理程序,而应在从驱动程序接收消息的阶段记录错误。当然,可以在装饰该方法的阶段配置此行为,以明确指示我们仍然希望接收此类操作的消息。

为了解决上述所有问题,编写了该协议自己的实现。初始化如下所示:

const amqp = await NestFactory.create(
 RabbitModule.forRoot({
   host: process.env.AMQP_QUEUE_HOST,
   port: parseInt(process.env.AMQP_QUEUE_PORT, 10),
   login: process.env.AMQP_QUEUE_LOGIN,
   password: process.env.AMQP_QUEUE_PASSWORD,
   tasksQueueNormal: process.env.AMQP_QUEUE_COMMAND_REQUEST,
   tasksQueueRedelivery: process.env.AMQP_QUEUE_REQUEST_ONCE_DELIVERY,
   deadLetterRoutingKey: process.env.AMQP_QUEUE_COMMAND_REQUEST_DEAD_LETTER,
   deadLetterRoutingKeyRedelivery: process.env.AMQP_QUEUE_COMMAND_REQUEST_ONCE_DELEVERY_DEAD_LETTER,
   exchange: process.env.AMQP_EXCHANGE_COMMAND,
   prefetch: parseInt(process.env.AMQP_QUEUE_PREFETCH, 10),
 }),
);
const transport = amqp.get<RabbitTransport>(RabbitTransport);
app.connectMicroservice({
 strategy: transport,
 options: {},
});

app.startAllMicroservices();

在这里,我们指示用于传递消息,结果和错误的队列的名称,以及对重新交付不敏感的单个队列。

在控制器级别,工作与使用http相似

@AMQP(‘say_hey’)
sayHay(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return this.heyService.greet(requestId, q);
}

该Observable一旦执行,任务的结果就会落入结果队列。用@AMQPRequest装饰的参数对应于协议的correlationId字段。这是传递的消息的唯一标识符。

@AMQPParam参数与消息正文本身匹配。如果是JSON,则消息将到达已经转换为对象的函数中。如果是简单类型,则按原样发送消息。

以下消息将以空字母出现:

@AMQP(‘say_hey’)
sayHayErr(@AMQPRequest requestId: string, @AMQPParam q: HeyMessage): Observable<Result> {
 return throwError(‘Buy’);
}

这是怎么回事


将类型反射添加到AMQPParam,以便将邮件正文转换为要传递的类。现在,这只是种姓。

所有代码和安装说明均可在GitHub上获得

欢迎任何编辑和评论。

All Articles