Kafka Cluster Deployment Guide (ZooKeeper-based)


I. Introduction

1. What is Kafka

Kafka is an open-source distributed message engine / message middleware, and also a stream-processing platform. Kafka supports publish/subscribe message passing between applications, and builds on that messaging core with Kafka Connect and Kafka Streams to exchange data with other systems (Elasticsearch, Hadoop, etc.).

Kafka's most mature core capability is still its message engine, so its most common role remains that of a message queue used to absorb traffic peaks and smooth out load.

2. Kafka Architecture



In a Kafka cluster, each Kafka node is a Broker. Messages are carried by Topics and can be stored in one or more Partitions. An application that publishes messages is a Producer; one that consumes them is a Consumer. Multiple Consumers can form a Consumer Group to jointly consume the messages of a Topic.

Concepts:

Broker: a Kafka node
Topic: the named channel that carries messages
Partition: a shard of a topic, used to split its storage
Producer: an application that publishes messages to a topic
Consumer: an application that subscribes to messages from a topic
Consumer Group: a group of consumers that share consumption of a topic

II. Environment Preparation

Hosts:

IP            Hostname      Roles
172.20.1.210  kafka-node01  kafka, zookeeper
172.20.1.211  kafka-node02  kafka, zookeeper
172.20.1.212  kafka-node03  kafka, zookeeper

Packages:

JDK 1.8: https://pan.baidu.com/s/1vc-ZEKJ8KIQ_4U99uVAW2A?pwd=3nai
kafka 2.13-3.2.1: https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz
apache-zookeeper 3.8.0: https://archive.apache.org/dist/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz

Deployment steps:

Step 1: Set up the ZooKeeper ensemble

kafka-node01:

# Create /app/tools if it does not exist
install -d /app/tools

# Extract the JDK into /app/tools/
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /app/tools/

# Extract Kafka into /app/tools/
tar -xf /root/kafka_2.13-3.2.1.tgz -C /app/tools/

# Extract ZooKeeper into /app/tools/, create its data directory,
# and write this node's id to myid
tar -xf /root/apache-zookeeper-3.8.0-bin.tar.gz -C /app/tools/
mkdir /app/tools/apache-zookeeper-3.8.0-bin/data
echo "210" > /app/tools/apache-zookeeper-3.8.0-bin/data/myid

# Copy the sample ZooKeeper config and edit it
cd /app/tools/apache-zookeeper-3.8.0-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# modify zoo.cfg with the cluster settings
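The only changes zoo.cfg needs here are the data directory and the ensemble membership. A plausible version, identical on all three nodes, might look like the following (this assumes the default client port 2181 and the conventional 2888/3888 quorum/election ports; the `server.N` numbers must match the myid values written on each node):

```properties
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/app/tools/apache-zookeeper-3.8.0-bin/data
clientPort=2181
server.210=172.20.1.210:2888:3888
server.211=172.20.1.211:2888:3888
server.212=172.20.1.212:2888:3888
```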

# Set environment variables for Java, ZooKeeper and Kafka
cat > /etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export JAVA_HOME=/app/tools/jdk1.8.0_291
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/app/tools/apache-zookeeper-3.8.0-bin
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/app/tools/kafka_2.13-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile

# Point ZooKeeper at this JDK and cap its heap
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/java.env << 'EOF'
export JAVA_HOME=/app/tools/jdk1.8.0_291
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"
EOF

# Create a systemd unit for ZooKeeper
cat > /usr/lib/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
ExecStart=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh start
ExecStop=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh stop
ExecReload=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target
EOF

# Reload systemd, then enable and start ZooKeeper
systemctl daemon-reload && systemctl enable --now zookeeper.service
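The id 210 written to myid above is not arbitrary: this guide derives each node's id from the last octet of its IP address, and reuses the same numbers as broker.id values for Kafka later. The convention, spelled out in plain shell:

```shell
# Map each node's IP to its id by taking the last octet
for ip in 172.20.1.210 172.20.1.211 172.20.1.212; do
  echo "$ip -> id ${ip##*.}"
done
```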

kafka-node02:

# Create /app/tools if it does not exist
install -d /app/tools

# Extract the JDK into /app/tools/
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /app/tools/

# Extract Kafka into /app/tools/
tar -xf /root/kafka_2.13-3.2.1.tgz -C /app/tools/

# Extract ZooKeeper into /app/tools/, create its data directory,
# and write this node's id to myid
tar -xf /root/apache-zookeeper-3.8.0-bin.tar.gz -C /app/tools/
mkdir /app/tools/apache-zookeeper-3.8.0-bin/data
echo "211" > /app/tools/apache-zookeeper-3.8.0-bin/data/myid

# Copy the sample ZooKeeper config and edit it
cd /app/tools/apache-zookeeper-3.8.0-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# modify zoo.cfg with the same cluster settings as on kafka-node01

# Set environment variables for Java, ZooKeeper and Kafka
cat > /etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export JAVA_HOME=/app/tools/jdk1.8.0_291
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/app/tools/apache-zookeeper-3.8.0-bin
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/app/tools/kafka_2.13-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile

# Point ZooKeeper at this JDK and cap its heap
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/java.env << 'EOF'
export JAVA_HOME=/app/tools/jdk1.8.0_291
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"
EOF

# Create a systemd unit for ZooKeeper
cat > /usr/lib/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
ExecStart=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh start
ExecStop=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh stop
ExecReload=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target
EOF

# Reload systemd, then enable and start ZooKeeper
systemctl daemon-reload && systemctl enable --now zookeeper.service

kafka-node03:

# Create /app/tools if it does not exist
install -d /app/tools

# Extract the JDK into /app/tools/
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /app/tools/

# Extract Kafka into /app/tools/
tar -xf /root/kafka_2.13-3.2.1.tgz -C /app/tools/

# Extract ZooKeeper into /app/tools/, create its data directory,
# and write this node's id to myid
tar -xf /root/apache-zookeeper-3.8.0-bin.tar.gz -C /app/tools/
mkdir /app/tools/apache-zookeeper-3.8.0-bin/data
echo "212" > /app/tools/apache-zookeeper-3.8.0-bin/data/myid

# Copy the sample ZooKeeper config and edit it
cd /app/tools/apache-zookeeper-3.8.0-bin/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# modify zoo.cfg with the same cluster settings as on kafka-node01

# Set environment variables for Java, ZooKeeper and Kafka
cat > /etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export JAVA_HOME=/app/tools/jdk1.8.0_291
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/app/tools/apache-zookeeper-3.8.0-bin
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/app/tools/kafka_2.13-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile

# Point ZooKeeper at this JDK and cap its heap
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/java.env << 'EOF'
export JAVA_HOME=/app/tools/jdk1.8.0_291
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"
EOF

# Create a systemd unit for ZooKeeper
cat > /usr/lib/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
ExecStart=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh start
ExecStop=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh stop
ExecReload=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target
EOF

# Reload systemd, then enable and start ZooKeeper
systemctl daemon-reload && systemctl enable --now zookeeper.service
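Once the service is enabled and started on all three nodes, the ensemble can be sanity-checked from each node; one node should report Mode: leader and the other two Mode: follower. This has to run on the nodes themselves with the PATH set up above:

```shell
# Show this node's role in the ensemble (leader or follower)
zkServer.sh status
```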

Step 2: Set up the Kafka cluster

kafka-node01:

# Create and populate Kafka's server.properties
# (zookeeper.connect below uses the chroot path /caijintian, so all three
# brokers must share it; also note log.dirs points at /tmp, which many
# distros clean on reboot, so use a persistent path in production)
cat > /app/tools/kafka_2.13-3.2.1/config/server.properties <<'EOF'
broker.id=210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.210:2181,172.20.1.211:2181,172.20.1.212:2181/caijintian
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
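One thing the file above leaves at its default is the listener configuration: without listeners/advertised.listeners, the broker advertises its hostname, which every client must then be able to resolve. Since this guide addresses the brokers by IP, it can help to pin the listener explicitly; a possible addition for node01 (use each node's own IP on the other brokers):

```properties
listeners=PLAINTEXT://172.20.1.210:9092
advertised.listeners=PLAINTEXT://172.20.1.210:9092
```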

# Create a systemd unit for Kafka
# (kafka-server-stop.sh takes no arguments, so none is passed to ExecStop)
cat > /usr/lib/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
User=root
Group=root
ExecStart=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-start.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
ExecStop=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

# Reload systemd, then enable and start Kafka
systemctl daemon-reload && systemctl enable --now kafka.service

kafka-node02:

# Create and populate Kafka's server.properties
# (zookeeper.connect must use the same chroot path /caijintian on all brokers)
cat > /app/tools/kafka_2.13-3.2.1/config/server.properties <<'EOF'
broker.id=211
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.210:2181,172.20.1.211:2181,172.20.1.212:2181/caijintian
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

# Create a systemd unit for Kafka
# (kafka-server-stop.sh takes no arguments, so none is passed to ExecStop)
cat > /usr/lib/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
User=root
Group=root
ExecStart=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-start.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
ExecStop=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

# Reload systemd, then enable and start Kafka
systemctl daemon-reload && systemctl enable --now kafka.service

kafka-node03:

# Create and populate Kafka's server.properties
# (zookeeper.connect must use the same chroot path /caijintian on all brokers)
cat > /app/tools/kafka_2.13-3.2.1/config/server.properties <<'EOF'
broker.id=212
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.210:2181,172.20.1.211:2181,172.20.1.212:2181/caijintian
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

# Create a systemd unit for Kafka
# (kafka-server-stop.sh takes no arguments, so none is passed to ExecStop)
cat > /usr/lib/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service

[Service]
Type=simple
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
User=root
Group=root
ExecStart=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-start.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
ExecStop=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

# Reload systemd, then enable and start Kafka
systemctl daemon-reload && systemctl enable --now kafka.service
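After all three brokers are up, their registrations can be confirmed through ZooKeeper: listing the ids under the /caijintian chroot should show the three broker ids (210, 211, 212). This runs on any node with the PATH set up earlier:

```shell
# List broker ids registered under the chroot used in zookeeper.connect
zookeeper-shell.sh 172.20.1.210:2181 ls /caijintian/brokers/ids
```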

III. Testing Kafka

1. Create a topic

kafka-topics.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 --create --topic test


2. List topics

kafka-topics.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 --list

3. Produce messages

[root@kafka-node01 ~]# kafka-console-producer.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 --topic test
# type the message body at the prompt
>hello,I am a man !
>

4. Consume messages

[root@kafka-node03 ~]# kafka-console-consumer.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 --topic test --from-beginning
# the message published above is received
hello,I am a man !
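The Consumer Group concept from the introduction can be tried with the same console tools: consumers started with the same --group value share the topic's partitions. With only the single default partition created above, just one member of the group will receive messages at a time (the group name demo-group here is arbitrary):

```shell
# Join a named consumer group; run this in two terminals to see
# how partitions are assigned within the group
kafka-console-consumer.sh --bootstrap-server 172.20.1.210:9092 --topic test --group demo-group
```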
