美好的一天!在本文中,我们将考虑建立一个由三个ZooKeeper节点(分布式系统协调服务)组成的集群,其中两个是Kafka消息代理,第三个是管理器。结果,将实施以下组件方案:
为简单起见,我们将一台机器上的所有组件都使用内置的控制台发布者(生产者)和订阅者(消费者)消息。ZooKeeper模块内置在Kafka软件包中,我们使用它。安装与设定
安装Kafka软件包在这种情况下,操作系统是Ubuntu 16.04 LTS。当前版本和详细说明在官方网站上。wget http://mirror.linux-ia64.org/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
tar -xvzf kafka_2.12-2.4.1.tgz
为三个节点server_1,server_2和server_3创建工作目录。mv kafka_2.12-2.4.1 kafka_server_1
cp -r kafka_server_1/ kafka_server_2/
cp -r kafka_server_1/ kafka_server_3/
配置节点配置Kafka代理(分别来自目录kafka_server_1,kafka_server_2和kafka_server_3)vim config/server.properties
- 经纪人编号(分别为0、1、2)
- 客户端端口(9092、9093、9094)
- ZooKeeper端口(2181、2182、2183)
- 日志目录(/ tmp / kafka-logs-1,/ tmp / kafka-logs-2,/ tmp / kafka-logs-3)
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181
log.dirs=/tmp/kafka-logs-1
Znode配置:vim config/zookeeper.properties
- 数据目录(/ tmp / zookeeper_1,/ tmp / zookeeper_2,/ tmp / zookeeper_3)
- 客户端端口(2181、2182、2183)
- 客户端连接的最大数量和连接限制
- 用于在节点之间交换数据的端口(即,我们通知每个节点其他节点的存在)
dataDir=/tmp/zookeeper_1
clientPort=2181
maxClientCnxns=60
initLimit=10
syncLimit=5
tickTime=2000
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
为ZooKeeper节点创建目录,将节点ID写入服务文件:mkdir -p /tmp/zookeeper_[1..3]
echo "1" >> /tmp/zookeeper_1/myid
echo "2" >> /tmp/zookeeper_2/myid
echo "3" >> /tmp/zookeeper_3/myid
如果访问权限有问题,请在目录和文件中进行更改:sudo chmod 777 /tmp/zookeeper_1
运行z节点和代理
对于前两个节点(kafka_server_1 /,kafka_server_2 /),运行ZooKeeper和Kafka-servers脚本,参数是相应的配置文件:sudo bin/zookeeper-server-start.sh config/zookeeper.properties
sudo bin/kafka-server-start.sh config/server.properties
对于第三个节点(kafka_server_3 /)-仅ZooKeeper。用于停止服务器的脚本:sudo bin/kafka-server-stop.sh
sudo bin/zookeeper-server-stop.sh
创建主题,控制台生产者和使用者
主题(topic)-某种类型的消息流,分为多个分区(数量由--partition键指定)。每个分区在两个服务器上重复(--replication-factor)。在--bootstrap-server密钥后,指定Kafka代理的端口,以逗号分隔。--topic开关指定主题的名称。sudo bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093 --replication-factor 2 --partitions 2 --topic TestTopic
要查找端口上的主题列表,可以使用以下命令完成有关每个主题的信息:sudo bin/kafka-topics.sh --list --zookeeper localhost:2181
sudo bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic TestTopic
领导者-具有分区主要实例的服务器,副本服务器-在其上复制信息的服务器,ISR-在领导者失败的情况下担当领导者角色的服务器。我们使用适当的脚本创建控制台生产者,控制台使用者:sudo bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic TestTopic
sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093 --from-beginning --topic TestTopic
在一个消息代理处理第二个消息代理失败的情况下,这确保了将消息连续传输到收件人。systemctl中针对Zookeeper和kafka的服务
为了方便启动群集,可以在systemctl中创建服务:zookeeper_1.service,zookeeper_2.service,zookeeper_3.service,kafka_1.service,kafka_2.service。我们编辑文件/etc/systemd/system/zookeeper_1.service(将目录/ home / user和user user更改为必要的目录)。[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=user
ExecStart=home/user/kafka_server_1/bin/zookeeper-server-start.sh /home/user/kafka_server_1/config/zookeeper.properties
ExecStop=/home/user/kafka_server_1/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
对于服务zookeeper_2.service和zookeeper_3.service来说,它是相似的。文件/etc/systemd/system/kafka_1.service:[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=user
ExecStart=/bin/sh -c '/home/user/kafka_server_1/bin/kafka-server-start.sh /home/user/kafka_server_1/config/server.properties > /tmp/kafka-logs-1 2>&1'
ExecStop=/home/user/kafka_server_1/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
kafka_2.service与之类似。激活和验证服务systemctl daemon-reload
systemctl enable zookeeper_1.service
systemctl start zookeeper_1.service
sudo journalctl -u zookeeper_1.service
systemctl stop zookeeper_1.service
zookeeper_2.service,zookeeper_3.service,kafka_1.service,kafka_1.service也是如此。感谢您的关注!