Determining the right size for a Kafka cluster in Kubernetes

Translator's note: In this article, Banzai Cloud shares an example of how its purpose-built tools make it easier to run Kafka within Kubernetes. The instructions below illustrate how you can determine the optimal infrastructure size and tune Kafka itself to achieve the required throughput.



Apache Kafka is a distributed streaming platform for building reliable, scalable, high-performance real-time streaming systems. Its impressive capabilities can be extended with Kubernetes. For this we developed an open-source Kafka operator and a tool called Supertubes. They let you run Kafka on Kubernetes and use its various features, such as fine-tuning the broker configuration, metrics-based scaling with rebalancing, rack awareness (distributing replicas across racks or availability zones), “graceful” rolling updates, and so on.

Try Supertubes in your cluster:

curl https://getsupertubes.sh | sh
supertubes install -a --no-democluster --kubeconfig <path-to-eks-cluster-kubeconfig-file>

Or refer to the documentation. You can also read about some of the Kafka features that are automated with Supertubes and the Kafka operator; we have already covered them in the blog:


When you decide to deploy a Kafka cluster in Kubernetes, you will probably face the problem of sizing the underlying infrastructure and fine-tuning the Kafka configuration to meet your throughput requirements. The maximum performance of each broker is determined by the performance of the infrastructure components it runs on: memory, CPU, disk speed, network bandwidth, and so on.

Ideally, the broker configuration should use every infrastructure component to its full capacity. In real life, though, such a setup is very hard to achieve; it is more likely that users will configure brokers so that one or two components (disk, memory, or CPU) are used to the maximum. Generally speaking, a broker shows its maximum performance when its configuration lets the slowest component be used “in full”. This gives us an approximate idea of the load that a single broker can handle.

Theoretically, we can also estimate the number of brokers needed to handle a given load. However, in practice there are so many configuration options at different levels that it is very difficult (if not impossible) to assess the potential performance of a particular configuration. In other words, it is very hard to plan a configuration by working backwards from a target throughput.

With Supertubes users we usually take the following approach: start with some configuration (infrastructure + settings), measure its performance, adjust the broker settings, and repeat. This continues until the potential of the slowest infrastructure component is fully utilized.

This way we get a clearer idea of how many brokers a cluster needs to cope with a certain load (the number of brokers also depends on other factors, such as the minimum number of message replicas required for resilience, the number of partition leaders, etc.). In addition, we learn which infrastructure component would benefit from vertical scaling.

This article describes the steps we take to “squeeze everything” out of the slowest components of the initial configurations and measure the throughput of a Kafka cluster. A highly resilient configuration requires at least three working brokers (min.insync.replicas=3) spread across three different availability zones. To provision, scale and monitor the Kubernetes infrastructure, we use our own hybrid-cloud container management platform, Pipeline. It supports on-premise environments (bare metal, VMware) and five types of clouds (Alibaba, AWS, Azure, Google, Oracle), as well as any combination of them.

Thoughts on the infrastructure and configuration of the Kafka cluster


For the examples below, we chose AWS as the cloud provider and EKS as the Kubernetes distribution. A similar configuration can be implemented with PKE, the CNCF-certified Kubernetes distribution from Banzai Cloud.

Disk


Amazon offers various types of EBS volumes. Both gp2 and io1 are SSD-based; however, gp2 delivers high throughput by spending accumulated I/O credits (burst credits), so we preferred the io1 type, which offers consistently high throughput.

Instance Types


Kafka's performance depends heavily on the operating system's page cache, so we need instances with enough memory for the brokers (JVMs) and for the page cache. A c5.2xlarge instance is a good starting point, since it has 16 GB of memory and is EBS-optimized. Its drawback is that it can deliver maximum EBS performance for no more than 30 minutes in every 24-hour period. If your workload requires maximum performance over longer periods, consider other instance types. That is what we did, settling on c5.4xlarge, which provides a maximum throughput of 593.75 MB/s. The maximum throughput of an EBS io1 volume is higher than that of the c5.4xlarge instance, so the slowest infrastructure element is likely to be the EBS I/O throughput of this instance type (our load test results should confirm this as well).
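For reference, the 593.75 MB/s figure is simply the dedicated EBS bandwidth of the c5.4xlarge instance (4,750 Mbps according to the AWS documentation) converted from bits to bytes:

4750 Mbit/s ÷ 8 bits per byte = 593.75 MB/s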

Network


The network bandwidth should be comfortably larger than the throughput of the VM instance and its disk, otherwise the network becomes the bottleneck. In our case, the c5.4xlarge network interface supports speeds of up to 10 Gb/s, which is significantly higher than the EBS I/O bandwidth of the VM instance.
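In byte terms the margin is comfortable (a rough back-of-the-envelope comparison):

10 Gbit/s ÷ 8 bits per byte = 1250 MB/s of network bandwidth, versus ≈594 MB/s of EBS I/O per instance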

Broker Deployment


Brokers should be deployed (scheduled by Kubernetes) onto dedicated nodes to avoid competing with other processes for CPU, memory, network and disk resources.
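As an illustration (not the operator's actual output), dedicating nodes to brokers usually combines node affinity on the broker pods with a taint on the node pool. The nodepool.banzaicloud.io/name label below mirrors the one used for the load-generator nodes later in this article; the kafka pool name and the dedicated taint are hypothetical examples:

# Illustrative pod-spec fragment for broker pods; values are examples only
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: nodepool.banzaicloud.io/name
          operator: In
          values:
          - kafka              # hypothetical name of the dedicated broker node pool
tolerations:
- key: dedicated               # hypothetical taint placed on the broker node pool
  operator: Equal
  value: kafka
  effect: NoSchedule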

Java version


The logical choice is Java 11, since it is container-aware: the JVM correctly detects the CPUs and memory available to the container in which the broker runs. Knowing the CPU limits, the JVM internally and transparently sizes the number of GC threads and JIT compiler threads. We used the Kafka image banzaicloud/kafka:2.13-2.4.0, which includes Kafka 2.4.0 (Scala 2.13) on Java 11.

If you would like to learn more about Java / JVM on Kubernetes, check out our following publications:


Broker Memory Settings


There are two key aspects to configuring broker memory: the settings for the JVM and those for the Kubernetes pod. The memory limit set for the pod must be greater than the maximum heap size so that the JVM has room for the Java metaspace, its own native memory, and the operating system page cache that Kafka uses heavily. In our tests, we ran Kafka brokers with the parameters -Xmx4G -Xms2G, while the memory limit for the pod was 10 Gi. Note that the JVM memory settings can also be derived automatically from the pod memory limit using -XX:MaxRAMPercentage and -XX:MinRAMPercentage.
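A minimal sketch of how these two settings relate (the exact way to pass JVM options through the operator may differ; KAFKA_HEAP_OPTS is the standard variable read by Kafka's start scripts):

# Illustrative broker container fragment: the 10Gi pod limit leaves ~6Gi of headroom
# above the 4G maximum heap for metaspace, JVM native memory and the OS page cache.
resources:
  limits:
    memory: 10Gi
  requests:
    memory: 10Gi
env:
- name: KAFKA_HEAP_OPTS
  value: "-Xmx4G -Xms2G"
# Or derive the maximum heap from the pod limit instead of hard-coding it,
# e.g. -XX:MaxRAMPercentage=40.0 (40% of 10Gi ≈ 4G).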

Broker processor settings


Generally speaking, you can raise throughput by increasing parallelism, i.e. the number of threads used by Kafka; the more CPUs available to Kafka, the better. In our test, we started with a limit of 6 CPUs and gradually (over several iterations) raised it to 15. In addition, we set num.network.threads=12 in the broker settings to increase the number of threads that receive data from the network and send it back out. Having quickly discovered that the follower brokers could not fetch replicas fast enough, we raised num.replica.fetchers to 4 to increase the rate at which followers replicate messages from the leaders.
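Put together, the tuning described above amounts to roughly the following broker overrides (a sketch only; the field names are illustrative and may not match the actual KafkaCluster CRD schema):

# Illustrative broker overrides reflecting the CPU and thread tuning above
brokerConfig:                  # hypothetical field name
  resources:
    limits:
      cpu: "15"                # raised iteratively from an initial limit of 6
  config: |                    # Kafka broker properties
    num.network.threads=12
    num.replica.fetchers=4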

Load generation tool


Make sure the chosen load generator does not run out of capacity before the Kafka cluster being benchmarked reaches its maximum load. In other words, you need to make a preliminary assessment of the load generation tool's capabilities and pick instance types for it with enough CPUs and memory, so that the tool can produce more load than the Kafka cluster can digest. After many experiments, we settled on three c5.4xlarge instances, each running one generator.

Benchmarking


Performance measurement is an iterative process that includes the following steps:

  • infrastructure setup (EKS cluster, Kafka cluster, load generation tool, as well as Prometheus and Grafana);
  • load generation over a long enough period to smooth out random deviations in the collected performance indicators;
  • tuning the infrastructure and the broker configuration based on the observed performance indicators;
  • repeating the process until the required Kafka cluster throughput is reached; it should also be stably reproducible and show minimal variation.

The next section describes the steps we performed while benchmarking the test cluster.

Tools


The following tools were used to quickly deploy the base configuration, load generation, and performance measurement:

  • Banzai Cloud Pipeline for provisioning the Amazon EKS cluster together with Prometheus (to collect Kafka and infrastructure metrics) and Grafana (to visualize them).
  • Sangrenel, a load generation tool for Kafka.
  • Grafana dashboards for visualizing Kafka and infrastructure metrics: Kubernetes Kafka and Node Exporter.
  • Supertubes CLI, the easiest way to set up a Kafka cluster in Kubernetes. It installs and properly configures Zookeeper, the Kafka operator, Envoy and many other components needed to run a production-ready Kafka cluster in Kubernetes.
    • To install the supertubes CLI, follow the instructions given here.



EKS Cluster


Prepare an EKS cluster with dedicated c5.4xlarge worker nodes in different availability zones for the Kafka broker pods, as well as dedicated nodes for the load generator and the monitoring infrastructure.

banzai cluster create -f https://raw.githubusercontent.com/banzaicloud/kafka-operator/master/docs/benchmarks/infrastructure/cluster_eks_202001.json

When the EKS cluster is operational, enable its integrated monitoring service - it will deploy Prometheus and Grafana to the cluster.

Kafka system components


Install the Kafka system components (Zookeeper, kafka-operator) into EKS using the supertubes CLI:

supertubes install -a --no-democluster --kubeconfig <path-to-eks-cluster-kubeconfig-file>

Kafka Cluster


By default, EKS uses gp2 EBS volumes, so you need to create a separate storage class based on io1 volumes for the Kafka cluster:

kubectl create -f - <<EOF
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: fast-ssd
provisioner: kubernetes.io/aws-ebs
parameters:
  type: io1
  iopsPerGB: "50"
  fsType: ext4
volumeBindingMode: WaitForFirstConsumer
EOF
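Note that because of volumeBindingMode: WaitForFirstConsumer, a volume from this class is provisioned only when a pod actually uses the claim. Purely as an illustration (not part of the benchmark setup), a hypothetical claim referencing the class would look like this:

kubectl create -f - <<EOF
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fast-ssd-check        # hypothetical claim, shown only to illustrate usage
  namespace: kafka
spec:
  accessModes:
  - ReadWriteOnce
  storageClassName: fast-ssd
  resources:
    requests:
      storage: 100Gi          # 100 GiB x 50 IOPS/GB = 5000 provisioned IOPS on io1
EOF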

Set min.insync.replicas=3 for the brokers and deploy the broker pods onto nodes in three different availability zones:

supertubes cluster create -n kafka --kubeconfig <path-to-eks-cluster-kubeconfig-file> -f https://raw.githubusercontent.com/banzaicloud/kafka-operator/master/docs/benchmarks/infrastructure/kafka_202001_3brokers.yaml --wait --timeout 600

Topics


We simultaneously launched three instances of the load generator. Each of them writes to its own topic, so three topics are all we need:

supertubes cluster topic create -n kafka --kubeconfig <path-to-eks-cluster-kubeconfig-file> -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: perftest1
spec:
  name: perftest1
  partitions: 12
  replicationFactor: 3
  retention.ms: '28800000'
  cleanup.policy: delete
EOF

supertubes cluster topic create -n kafka --kubeconfig <path-to-eks-cluster-kubeconfig-file> -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: perftest2
spec:
  name: perftest2
  partitions: 12
  replicationFactor: 3
  retention.ms: '28800000'
  cleanup.policy: delete
EOF

supertubes cluster topic create -n kafka --kubeconfig <path-to-eks-cluster-kubeconfig-file> -f -<<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: perftest3
spec:
  name: perftest3
  partitions: 12
  replicationFactor: 3
  retention.ms: '28800000'
  cleanup.policy: delete
EOF

For each topic, the replication factor is 3 - the minimum recommended value for highly available production systems.
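With these settings, the partition layout per broker is easy to work out (assuming an even spread across the three brokers):

3 topics x 12 partitions = 36 partitions, i.e. 12 partition leaders per broker
36 partitions x replication factor 3 = 108 replicas, i.e. 36 replicas per broker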

Load generation tool


We launched three instances of the load generator, each writing to a separate topic. For the load generator pods, you need to set node affinity so that they are scheduled only onto the nodes allocated to them:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    app: loadtest
  name: perf-load1
  namespace: kafka
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: loadtest
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: loadtest
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: nodepool.banzaicloud.io/name
                operator: In
                values:
                - loadgen
      containers:
      - args:
        - -brokers=kafka-0:29092,kafka-1:29092,kafka-2:29092,kafka-3:29092
        - -topic=perftest1
        - -required-acks=all
        - -message-size=512
        - -workers=20
        image: banzaicloud/perfload:0.1.0-blog
        imagePullPolicy: Always
        name: sangrenel
        resources:
          limits:
            cpu: 2
            memory: 1Gi
          requests:
            cpu: 2
            memory: 1Gi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 30

A few points to pay attention to:

  • The load generator generates 512-byte messages and publishes them to Kafka in batches of 500 messages.
  • -required-acks=all means that a publish is considered successful only after all in-sync replicas have confirmed the message to the Kafka brokers. So the benchmark measures not only the rate at which leaders accept messages, but also the rate at which their followers replicate them. Measuring how fast consumers read recently produced messages is outside the scope of this test.
  • Each generator runs 20 workers (-workers=20), and each worker spawns 5 producers that share the worker's connection to the Kafka cluster. As a result, 100 producers per generator instance send messages to Kafka (see the totals below).
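Assuming the worker and producer counts above, the total producer fan-out against the cluster is easy to tally:

20 workers x 5 producers = 100 producers per generator instance
3 generator instances x 100 producers = 300 producers in total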

Cluster Monitoring


While stress testing the Kafka cluster, we also monitored its health to make sure that there were no pod restarts or out-of-sync replicas, and that throughput stayed at its maximum with minimal fluctuations:

  • The load generator reports standard statistics on the number of published messages and the error rate. The error rate should stay at 0.00%.
  • Cruise Control, deployed by the kafka-operator, provides a dashboard where we can also monitor the state of the cluster. To open it, run:

    supertubes cluster cruisecontrol show -n kafka --kubeconfig <path-to-eks-cluster-kubeconfig-file>
  • The rate of ISR (in-sync replica) shrinks and expansions should be 0.

Measurement results


3 brokers, message size - 512 bytes


With partitions evenly distributed across the three brokers, we managed to reach a throughput of ~500 MB/s (approximately 990 thousand messages per second):
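As a sanity check, the message rate and the byte rate are consistent with each other:

990,000 messages/s x 512 bytes per message ≈ 507 MB/s ≈ ~500 MB/s

With a replication factor of 3 and three brokers, each broker holds a replica of every partition, so each broker writes roughly the full ~500 MB/s to its own disk, which is what pushes per-node disk throughput toward the 593.75 MB/s ceiling discussed earlier.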







JVM memory consumption did not exceed 2 GB:







Disk throughput reached the maximum node I/O bandwidth on all three instances the brokers ran on:







The node memory usage data shows that system buffering and caching took ~10-15 GB:







3 brokers, message size - 100 bytes


With smaller messages, throughput drops by about 15-20%: the per-message processing overhead starts to matter. In addition, CPU load almost doubled.







Since the broker nodes still have unused cores, performance can be improved by tweaking the Kafka configuration. This is not an easy task, so to increase throughput it is better to work with larger messages.

4 brokers, message size - 512 bytes


You can easily increase the performance of a Kafka cluster simply by adding new brokers and keeping the partitions balanced (this ensures an even load distribution between brokers). In our case, after adding a broker, the cluster throughput increased to ~580 MB/s (~1.1 million messages per second). The increase was smaller than expected, which is mainly due to partition imbalance (not all brokers were working at peak capacity).









JVM memory consumption remained below 2 GB:









The partition imbalance was also reflected in the brokers' disk usage:









Findings


The iterative approach presented above can be extended to cover more complex scenarios, including hundreds of consumers, repartitioning, rolling updates, pod restarts, and so on. All of this lets us assess the limits of a Kafka cluster's capabilities under various conditions, identify the bottlenecks in its operation, and find ways to deal with them.

We developed Supertubes to quickly and easily deploy a cluster, configure it, add or remove brokers and topics, respond to alerts, and generally make sure Kafka runs properly in Kubernetes. Our goal is to let you focus on the main task (producing and consuming Kafka messages) and leave all the hard work to Supertubes and the Kafka operator.

If you are interested in Banzai Cloud technologies and open-source projects, follow the company on GitHub, LinkedIn or Twitter.

PS from the translator


Read also in our blog:

