RabbitMQ和Spring Boot的简单集成

在“ Spring框架开发人员 ”课程开始之前,准备了本文的翻译




大家好!

我想与您分享一个开放源代码库,该有助于RabbitMQ与Spring Boot上的应用程序集成。此外,该库提供了一种经过改进的重试新概念(与标准Spring AMQP方法相比)。
简化吗?但是如何?


自动配置


假设我们想使用Spring AMQP与RabbitMQ进行交互(例如,使用SSL)。我们需要创建一些豆类,如ConnectionFactoryRabbitAdminRabbitTemplateAbstractRabbitListenerContainerFactory

但是,想象一下RabbitMQ中有几个虚拟主机。默认情况下,在Spring AMQP,你需要用的手动配置每个虚拟主机创建这样一个生态系统RabbitTemplateRabbitListener



如您所见,所有这些配置都需要时间并且可能令人头疼。

意识到了这个问题,建议的库会自动为您配置所有这些bean。您唯一需要做的就是确定中的配置application.properties,然后魔术就会发生!
哇,但是其中的配置看起来像application.properties什么?

一切都非常简单。首先,属性有一个前缀-this spring.rabbitmq.custom

之后,您必须指定事件的名称。这是非常重要的部分,因为库进行的所有设置都基于该名称。

在事件名称之后,您可以指定属性和值。文档中介绍了所有属性

因此,模板如下: 以下是两个不同连接的示例配置。

spring.rabbitmq.custom.<YOUR_EVENT>.<PROPERTY>=<VALUE>




spring.rabbitmq.custom.some-event.host=localhost
spring.rabbitmq.custom.some-event.port=5672
spring.rabbitmq.custom.some-event.ttlRetryMessage=5000
spring.rabbitmq.custom.some-event.maxRetriesAttempts=5
spring.rabbitmq.custom.some-event.ttlMultiply=2
spring.rabbitmq.custom.some-event.queueRoutingKey=ROUTING.KEY.TEST
spring.rabbitmq.custom.some-event.exchange=ex.custom.direct
spring.rabbitmq.custom.some-event.exchangeType=direct
spring.rabbitmq.custom.some-event.queue=queue.custom.test
spring.rabbitmq.custom.some-event.autoCreate=true
spring.rabbitmq.custom.some-event.concurrentConsumers=1
spring.rabbitmq.custom.some-event.maxConcurrentConsumers=1
spring.rabbitmq.custom.some-event.virtualHost=tradeshift
spring.rabbitmq.custom.some-event.primary=true
spring.rabbitmq.custom.some-event.sslConnection=true
spring.rabbitmq.custom.some-event.tlsKeystoreLocation=file:///etc/tradeshift/your-service/tls-keystore.pkcs12
spring.rabbitmq.custom.some-event.tlsKeystorePassword=${RABBITMQ_PASS_CERT}

spring.rabbitmq.custom.another-event.host=localhost
spring.rabbitmq.custom.another-event.port=5672
spring.rabbitmq.custom.another-event.ttlRetryMessage=5000
spring.rabbitmq.custom.another-event.maxRetriesAttempts=5
spring.rabbitmq.custom.another-event.queueRoutingKey=TEST.QUEUE
spring.rabbitmq.custom.another-event.exchange=ex_test_1
spring.rabbitmq.custom.another-event.exchangeType=direct
spring.rabbitmq.custom.another-event.queue=queue_test_1
spring.rabbitmq.custom.another-event.autoCreate=true
spring.rabbitmq.custom.another-event.concurrentConsumers=1
spring.rabbitmq.custom.another-event.maxConcurrentConsumers=1
spring.rabbitmq.custom.another-event.username=guest
spring.rabbitmq.custom.another-event.password=${RABBITMQ_PASS}

如您所见,我们还没有编写任何代码!
很好,但是您是在谈论新的重播策略,对吗?

是的,我的朋友说!

但是在解释这种新策略之前,让我们看一下RabbitMQ和Spring AMQP的默认行为。

默认情况下,RabbitMQ不提供允许您控制整个消息生命周期的重试处理。

例如,在RabbitMQ 3.8之前,消息头没有用于控制重试次数的属性。

RabbitMQ的默认行为:

  • 如果您尚未定义生存时间(TTL,生存期),那么RabbitMQ将不断尝试将您的消息排队。
  • 如果您定义了TTL但未定义dlx,则在TTL之后,该消息将从队列中删除,并且您将丢失该消息。
  • 如果已定义TTL和dlx,则在TTL之后,消息将发送到dlx中定义交换器


RabbitMQ默认行为

但是,如果我们想要增加TTL(例如,在操作不稳定的情况下)并控制重试次数怎么办?

好问题!

现在该解释一下Spring AQMP和Spring Rabbit Tuning库是如何工作的!

重试策略


Spring AMQP默认


默认情况下 使用Spring AMQP时,您可以使用以下属性定义重试选项。

但是这种方法有问题。默认情况下,当您尝试再次传递消息时,Spring AMQP将阻止您的队列。

可以通过一种绕行方式解决此问题:使用并行性。但是通过这种方式,我们加载了JVM,这不是最好的方法。如果您有五个有问题的消息(如示例中所示),则会再次出现瓶颈,并且我们仍然需要手动确定-bin中@Configuration每个连接和容器的bean

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=2000
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.concurrency=5


Spring RabbitMQ调整


该库采用不同的方法,并具有单独的重试队列。这样,我们可以使用到期消息参数和x-death参数控制TTL,以控制重试次数。
但是如何?

我们在重试队列中使用dlx概念将消息重新发送到主队列。因此,我们可以访问x-death参数,并且可以以编程方式确定消息的生存期。

注意。由于此库是Spring AMQP的扩展,因此您可以使用默认的重试策略,并且仅将此库用于自动配置Bean。


Spring RabbitMQ优化重试
我可以在没有自动配置的情况下使用重试策略吗?我已经有很多bean,没有时间重写它们。

我们知道世界并不完美,在这种情况下,我们有一个标志允许您禁用自动配置,仅使用我们的方法进行重复处理和其他优势(例如箱管理)。

您可以使用以下属性禁用自动配置:

spring.rabbitmq.enable.custom.autoconfiguration=false

但是我有两个不同的连接,在这种情况下该怎么办?

如果您有多个连接,并且要禁用自动配置,则需要RabbitTemplate为每个连接指定Bean名称在这里描述 您仍然可以使用我们的bean 来简化Spring AMQP的工作。

spring.rabbitmq.custom.<YOUR_EVENT>.rabbitTemplateBeanName= <RABBITTEMPLATE_BEAN_NAME>

RabbitTemplateHandler
很好!所以,我想用这个。我怎样才能做到这一点?您有示例或文档吗?
是!

我们在该存储库中有一个示例项目,您可以在其中查看如何对发布者和订阅者使用该库。

但这很简单,我将在这里举例说明!

发行人


库中有一个类,RabbitTemplateHandler非常易于使用。您需要调用该方法getRabbitTemplate并将虚拟主机作为参数传递,以获取Bean RabbitTemplate之后,您可以调用convertAndSend方法并将其传递给交换和路由关键参数。

注意:交换和路由密钥可以使用注释获得

一个例子

@Value("${spring.rabbitmq.custom.some-event.exchange}")
private String exchangeSomeEvent;

@Value("${spring.rabbitmq.custom.some-event.queueRoutingKey}")
private String routingKeySomeEvent;

@Autowired
private final RabbitTemplateHandler rabbitTemplateHandler;

public void sendMessage(final String message) {
   rabbitTemplateHandler.getRabbitTemplate("some-event").convertAndSend(exchangeSomeEvent, routingKeySomeEvent, message);
}

订户


订户也很简单。您唯一需要做的就是在方法中添加一个注释RabbitListener(默认为Spring AMQP注释)并传递名称containerFactory
如何为虚拟主机找到正确的containerFactory名称?

您不需要知道它,只需传递事件的名称,库就会为您做所有的魔术!该库还具有推荐重试和dlq的功能。

打开它们非常简单。您需要在方法中添加注释EnableRabbitRetryAndDlq,并将属性名称作为参数传递。

注意:您还可以指定重试和dlq处理哪些异常。默认情况下会处理它Exception.class,这意味着处理所有异常。

例:

@RabbitListener(containerFactory = "some-event", queues = "${spring.rabbitmq.custom.some-event.queue}")
@EnableRabbitRetryAndDlq(event = "some-event", exceptions = { IllegalArgumentException.class, RuntimeException.class })
public void onMessage(Message message) {
   ...
}

这就是全部!希望您像我们一样喜欢这个图书馆!

而且,最重要的是:随时贡献或发送反馈!

特别感谢


安德烈·路易斯·戈麦斯
Rogerio Bueno
莱昂纳多·费雷拉



注册免费课程。


All Articles