三节点ZooKeeper集群配置和Apache Kafka代理

美好的一天!

在本文中,我们将考虑建立一个由三个ZooKeeper节点(分布式系统协调服务)组成的集群,其中两个是Kafka消息代理,第三个是管理器。

结果,将实施以下组件方案:

图片

为简单起见,我们将一台机器上的所有组件都使用内置的控制台发布者(生产者)和订阅者(​​消费者)消息。

ZooKeeper模块内置在Kafka软件包中,我们使用它。

安装与设定


安装Kafka软件包

在这种情况下,操作系统是Ubuntu 16.04 LTS。

当前版本和详细说明在官方网站上

wget http://mirror.linux-ia64.org/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
tar -xvzf kafka_2.12-2.4.1.tgz 

为三个节点server_1,server_2和server_3创建工作目录。

mv kafka_2.12-2.4.1 kafka_server_1
cp -r kafka_server_1/ kafka_server_2/
cp -r kafka_server_1/ kafka_server_3/

配置节点

配置Kafka代理
(分别来自目录kafka_server_1,kafka_server_2和kafka_server_3)

vim config/server.properties

  • 经纪人编号(分别为0、1、2)
  • 客户端端口(9092、9093、9094)
  • ZooKeeper端口(2181、2182、2183)
  • 日志目录(/ tmp / kafka-logs-1,/ tmp / kafka-logs-2,/ tmp / kafka-logs-3)

broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181   
log.dirs=/tmp/kafka-logs-1

Znode配置:

vim config/zookeeper.properties

  • 数据目录(/ tmp / zookeeper_1,/ tmp / zookeeper_2,/ tmp / zookeeper_3)
  • 客户端端口(2181、2182、2183)
  • 客户端连接的最大数量和连接限制
  • 用于在节点之间交换数据的端口(即,我们通知每个节点其他节点的存在)

dataDir=/tmp/zookeeper_1
clientPort=2181
maxClientCnxns=60
initLimit=10
syncLimit=5
tickTime=2000
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890

为ZooKeeper节点创建目录,将节点ID写入服务文件:

mkdir -p /tmp/zookeeper_[1..3]
echo "1" >> /tmp/zookeeper_1/myid
echo "2" >> /tmp/zookeeper_2/myid
echo "3" >> /tmp/zookeeper_3/myid

如果访问权限有问题,请在目录和文件中进行更改:

sudo chmod 777 /tmp/zookeeper_1

运行z节点和代理


对于前两个节点(kafka_server_1 /,kafka_server_2 /),运行ZooKeeper和Kafka-servers脚本,参数是相应的配置文件:

sudo bin/zookeeper-server-start.sh config/zookeeper.properties
sudo bin/kafka-server-start.sh config/server.properties

对于第三个节点(kafka_server_3 /)-仅ZooKeeper。

用于停止服务器的脚本:

sudo bin/kafka-server-stop.sh
sudo bin/zookeeper-server-stop.sh

创建主题,控制台生产者和使用者


主题(topic)-某种类型的消息流,分为多个分区(数量由--partition键指定)。每个分区在两个服务器上重复(--replication-factor)。在--bootstrap-server密钥后,指定Kafka代理的端口,以逗号分隔。--topic开关指定主题的名称。

sudo bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093 --replication-factor 2 --partitions 2 --topic TestTopic

要查找端口上的主题列表,可以使用以下命令完成有关每个主题的信息:

sudo bin/kafka-topics.sh --list --zookeeper localhost:2181           
sudo bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TestTopic

领导者-具有分区主要实例的服务器,副本服务器-在其上复制信息的服务器,ISR-在领导者失败的情况下担当领导者角色的服务器。

我们使用适当的脚本创建控制台生产者,控制台使用者:

sudo bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic TestTopic
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093 --from-beginning --topic TestTopic

在一个消息代理处理第二个消息代理失败的情况下,这确保了将消息连续传输到收件人。

systemctl中针对Zookeeper和kafka的服务


为了方便启动群集,可以在systemctl中创建服务:zookeeper_1.service,zookeeper_2.service,zookeeper_3.service,kafka_1.service,kafka_2.service。

我们编辑文件/etc/systemd/system/zookeeper_1.service(将目录/ home / user和user user更改为必要的目录)。

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=user
ExecStart=home/user/kafka_server_1/bin/zookeeper-server-start.sh /home/user/kafka_server_1/config/zookeeper.properties
ExecStop=/home/user/kafka_server_1/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

对于服务zookeeper_2.service和zookeeper_3.service来说,它是相似的。

文件/etc/systemd/system/kafka_1.service:
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=user
ExecStart=/bin/sh -c '/home/user/kafka_server_1/bin/kafka-server-start.sh /home/user/kafka_server_1/config/server.properties > /tmp/kafka-logs-1 2>&1'
ExecStop=/home/user/kafka_server_1/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

kafka_2.service与之类似。

激活和验证服务

systemctl daemon-reload
systemctl enable zookeeper_1.service #  
systemctl start zookeeper_1.service
sudo journalctl -u zookeeper_1.service #   
systemctl stop zookeeper_1.service

zookeeper_2.service,zookeeper_3.service,kafka_1.service,kafka_1.service也是如此。

感谢您的关注!

All Articles