使用Kafka Streams确保应用程序的高可用性

Kafka Streams是一个Java库,用于分析和处理存储在Apache Kafka中的数据。与任何其他流处理平台一样,它能够实时执行带有和/或不带有状态保存的数据处理。在这篇文章中,我将尝试描述为何在Kafka Streams中实现高可用性(99.99%)是有问题的,以及我们可以做些什么。

我们需要知道什么


在描述问题和可能的解决方案之前,让我们看一下Kafka Streams的基本概念。如果您使用过面向消费者/生产者的Kafka API,那么您对这些范例中的大多数都是熟悉的。在以下各节中,我将尝试用几个词来描述分区中的数据存储,消费者组的重新平衡以及Kafka客户端的基本概念如何适合Kafka Streams库。

Kafka:分区数据


在Kafka世界中,生产者应用程序将数据作为键值对发送到特定主题。该主题本身在Kafka经纪人中分为一个或多个分区。Kafka使用消息键指示应将数据写入哪个分区。因此,具有相同密钥的消息始终以同一分区结尾。

消费者应用程序分为多个消费者组,每个组可以有一个或多个消费者实例。
消费者组中消费者的每个实例负责处理来自输入主题的唯一分区集合的数据。

使用者实例本质上是在一组使用者中扩大处理量的一种方法。

卡夫卡:重新平衡消费群体


如前所述,使用者组的每个实例都接收一组唯一的分区,并从中消费数据。每当新的消费者加入一个小组时,都必须进行重新平衡,以便获得一个分区。当使用者死亡时,会发生同样的事情,其余的使用者应使用其分区以确保所有分区都得到处理。

Kafka Streams:流


在本文的开头,我们熟悉了一个事实,即Kafka Streams库是基于生产者和消费者的API构建的,并且数据处理的组织方式与Kafka上的标准解决方案相同。在Kafka Streams配置中,application.id字段等效于group.id在使用者API中。 Kafka Streams会预先创建一定数量的线程,并且每个线程都会从一个或多个输入主题分区中执行数据处理。用消费者API的术语来说,流实质上与来自同一组的消费者实例重合。线程是扩展Kafka Streams中数据处理的主要方法,可以通过增加一台计算机上每个Kafka Streams应用程序的线程数来垂直完成此操作,或者通过添加具有相同application.id的另一台计算机来水平执行此操作。

图片

资料来源:kafka.apache.org/21/documentation/streams/architecture

Kafka Streams中还有许多其他元素,例如任务,处理拓扑,线程模型等,我们将不在本文中讨论。可以在此处找到更多信息

Kafka Streams:状态存储


在流处理中,存在带有或不带有状态保存的操作。状态使应用程序能够记住超出当前正在处理的记录范围之外的必要信息。

状态操作(例如计数,任何类型的聚合,联接等)要复杂得多。这是由于只有一条记录,您无法确定给定键的最后状态(例如,计数),因此您需要将流的状态存储在应用程序中。如前所述,每个线程处理一组唯一的分区;因此,一个线程仅处理整个数据集的一个子集。这意味着每个具有相同application.id的Kafka Streams应用程序线程都保持其自己的隔离状态。我们不会详细介绍状态在Kafka Streams中的形成方式,但重要的是要了解使用状态更改主题(更改日志主题)来还原状态,不仅将状态保存在本地磁盘上,而且还保存在Kafka Broker中。将状态更改日志保存在Kafka Broker中作为一个单独的主题不仅是为了容错,而且还可以使您轻松地使用相同的application.id部署Kafka Streams的新实例。由于状态在代理方面存储为更改日志主题,因此新实例可以从该主题加载其自己的状态。

有关状态存储的更多信息,请参见此处

为什么高可用性对Kafka Streams有问题?


我们回顾了使用Kafka Streams进行数据处理的基本概念和原理。现在,让我们尝试将所有部分组合在一起,并分析为什么实现高可用性可能会带来问题。在前面的部分中,我们必须记住:

  1. Kafka主题中的数据分为多个分区,这些分区分布在Kafka Streams流之间。
  2. 实际上,具有相同application.id的Kafka Streams应用程序是一组使用者,并且其每个线程都是使用者的独立隔离实例。
  3. 对于状态操作,线程维护其自己的状态,该状态由Kafka主题以更改日志的形式“保留”。
  4. , Kafka , .

TransferWise SPaaS (Stream Processing as a Service)


在强调这篇文章的本质之前,让我首先告诉您我们在TransferWise中创建的内容以及为什么高可用性对我们非常重要。

在TransferWise中,我们有多个用于流处理的节点,每个节点包含每个产品团队的多个Kafka Streams实例。专为特定开发团队设计的Kafka Streams实例具有特殊的application.id,通常具有5个以上的线程。通常,团队在整个集群中通常具有10-20个线程(相当于使用者的实例数)。部署在节点上的应用程序侦听输入主题,并在输入数据有状态和无状态的情况下执行几种类型的操作,并为后续的下游微服务提供实时数据更新。

产品团队需要实时更新汇总数据。为了使我们的客户能够立即转账,这是必要的。我们通常的SLA:
在任何给定的一天中,应在不到10秒的时间内获得99.99%的汇总数据。

为了给您一个想法,在压力测试期间,Kafka Streams每秒能够处理和聚合20,085条输入消息。因此,在正常负载下10秒的SLA听起来是可以实现的。不幸的是,在滚动部署应用程序的节点的更新过程中未达到我们的SLA,下面我将描述发生这种情况的原因。

滑动节点更新


在TransferWise,我们坚信软件的持续交付,并且通常每天发布两次新版本的服务。让我们看一个简单的连续服务更新的示例,看看发布过程中发生了什么。同样,我们必须记住:

  1. Kafka主题中的数据分为多个分区,这些分区分布在Kafka Streams流之间。
  2. 实际上,具有相同application.id的Kafka Streams应用程序是一组使用者,并且其每个线程都是使用者的独立隔离实例。
  3. 对于状态操作,线程维护其自己的状态,该状态由Kafka主题以更改日志的形式“保留”。
  4. , Kafka , .

在单个节点上的释放过程通常需要八到九秒钟。在发行期间,节点上的Kafka Streams实例会“缓慢重启”。因此,对于单个节点,正确重新启动服务所需的时间大约为8到9秒。显然,关闭节点上的Kafka Streams实例会导致使用者组重新平衡。由于数据已分区,因此属于可启动实例的所有分区必须在具有相同application.id的活动Kafka Streams应用程序之间分配。这也适用于已保存到磁盘的聚合数据。在此过程完成之前,将不会处理数据。

备用副本


为了减少Kafka Streams应用程序的重新平衡时间,有一个备份副本的概念,该副本在配置中定义为num.standby.replicas。备份副本是本地状态存储的副本。这种机制可以将状态存储从一个Kafka Streams实例复制到另一个实例。当Kafka Streams线程由于任何原因终止时,状态恢复过程的持续时间可以最小化。不幸的是,由于我将在下面解释的原因,即使备份副本也无法帮助滚动服务更新。

假设我们在两台不同的机器上有两个Kafka Streams实例:node-a和node-b。对于每个Kafka Streams实例,在这2个节点上指示num.standby.replicas = 1.通过这种配置,每个Kafka Streams实例在另一个节点上维护自己的存储库副本。在滚动更新中,我们遇到以下情况:

  1. 新版本的服务已部署到节点a。
  2. 节点a上的Kafka Streams实例已禁用。
  3. 重新平衡已经开始。
  4. 由于我们指定配置num.standby.replicas = 1,因此已将节点a的存储库复制到节点b。
  5. 节点b已经具有节点a的卷影副本,因此重新平衡过程几乎立即发生。
  6. 节点-a重新启动。
  7. 节点-a加入了一组消费者。
  8. Kafka经纪人看到Kafka Streams的新实例并开始重新平衡。

如我们所见,num.standby.replicas仅在完全关闭节点的情况下才有用。这意味着,如果节点a崩溃,那么节点b几乎可以立即继续正常工作。但是在滚动更新的情况下,断开连接后,节点a将再次加入该组,而这最后一步将导致重新平衡。当节点a重新引导后加入使用者组时,它将被视为使用者的新实例。同样,我们必须记住,实时数据处理将停止,直到新实例从更改日志主题恢复其状态为止。
请注意,将新实例加入组时重新平衡分区不适用于Kafka Streams API,因为这是Apache Kafka使用者组协议的工作方式。

成就:Kafka Streams的高可用性


尽管Kafka客户端库没有为上述问题提供内置功能,但是在滚动更新过程中,可以使用一些技巧来实现高群集可用性。备份副本背后的想法仍然有效,并且在适当的时候拥有备份计算机是一个很好的解决方案,我们可以用来确保在实例发生故障时实现高可用性。

初始设置的问题是我们在所有节点上的所有团队都有一组消费者。现在,我们不再有一组消费者,而是两个,而第二个充当“热”集群。在prod中,节点具有特殊的变量CLUSTER_ID,该变量已添加到Kafka Streams实例的application.id中。这是一个示例Spring Boot application.yml配置:
application.yml
spring.profiles: production
streaming-pipelines:
team-a-stream-app-id: "${CLUSTER_ID}-team-a-stream-app"
team-b-stream-app-id: "${CLUSTER_ID}-team-b-stream-app"


在某个时间点,分别只有一个集群处于活动模式,备用集群不会实时向下游微服务发送消息。在发行版的发行期间,备份群集将变为活动状态,从而可以在第一个群集上进行滚动更新。由于这是一个完全不同的消费者组,因此我们的客户甚至没有注意到处理中的任何违规行为,并且后续服务继续从最近活动的群集中接收消息。使用后备使用者组的明显缺点之一是额外的开销和资源消耗,但是,尽管如此,此体系结构仍为我们的流处理系统提供了额外的保证,控制和容错能力。

除了添加其他群集外,还有一些技巧可以缓解频繁重新平衡的问题。

增加group.initial.rebalance.delay.ms


从Kafka 0.11.0.0开始,添加了配置组.initial.rebalance.delay.ms。根据文档,此设置负责:
GroupCoordinator将延迟该组消费者的初始重新平衡的时间(以毫秒为单位)。

例如,如果我们在此设置中设置了60,000毫秒,则通过滚动更新,我们可能会有一个分钟的发布窗口。如果Kafka Streams实例在此时间窗口内成功重新启动,则不会调用任何重新平衡。请注意,重启的Kafka Streams实例负责的数据将继续不可用,直到该节点返回联机模式为止。例如,如果实例重启大约需要八秒钟,那么该实例所负责的数据将有八秒钟的停机时间。

应该注意的是,此概念的主要缺点是,在节点发生故障的情况下,考虑到当前配置,还原期间还会收到一分钟的额外延迟。

变更日志主题中的段大小缩小


重新平衡Kafka Stream的最大延迟是由于从更改日志主题恢复了状态存储。更改日志主题是压缩的主题,它使您可以存储主题中特定键的最新记录。我将在下面简要描述这个概念。

Kafka Broker中的主题按段进行组织。当段达到配置的阈值大小时,将创建一个新段并压缩前一个段。默认情况下,此阈值设置为1 GB。您可能知道,Kafka主题及其分区基础的主要数据结构是具有前向写入功能的日志结构,即,将消息发送到该主题时,它们总是添加到最后一个“活动”段中,而压缩不继续。
因此,变更日志中大多数已存储的存储状态始终位于“活动段”文件中,并且从未压缩,从而导致数百万条未压缩的变更日志消息。对于Kafka Streams,这意味着在重新平衡期间,当Kafka Streams实例从changelog主题恢复其状态时,它需要从changelog主题读取很多冗余条目。假定状态存储仅关心最后一个状态,而不关心历史,则浪费了该处理时间。减小段的大小将导致更积极的数据压缩,因此Kafka Streams应用程序的新实例可以更快地恢复。

结论


即使Kafka Streams没有提供在滚动服务更新过程中提供高可用性的内置功能,仍然可以在基础结构级别上完成。我们必须记住,与Apache Flink或Apache Spark不同,Kafka Streams不是一个“集群框架”。它是一个轻量级的Java库,允许开发人员创建可伸缩的应用程序以流式传输数据。尽管如此,它还是提供了实现“ 99.99%”可用性这样宏伟的流媒体目标所必需的构建块。

All Articles