一、前言
1、Kafka简介
Kafka是一个开源的分布式消息引擎/消息中间件,同时也是一个流处理平台。Kafka支持以发布/订阅的方式在应用间传递消息,并在消息功能的基础上增加了Kafka Connect和Kafka Streams,用于对接其他系统的数据(如Elasticsearch、Hadoop等)。
Kafka最核心、最成熟的仍然是其消息引擎,所以Kafka的大部分应用场景还是作为消息队列,用于削峰填谷。
2、Kafka架构
在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker;消息由Topic承载,可以存储在1个或多个Partition中。发布消息的应用为Producer,消费消息的应用为Consumer,多个Consumer可以组成一个Consumer Group,共同消费一个Topic中的消息。
二、环境准备
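主机:kafka-node01(172.20.1.210)、kafka-node02(172.20.1.211)、kafka-node03(172.20.1.212)
软件包(均已上传至各节点的/root目录):jdk-8u291-linux-x64.tar.gz、apache-zookeeper-3.8.0-bin.tar.gz、kafka_2.13-3.2.1.tgz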
部署过程:
首先搭建ZooKeeper集群(以下步骤在三个节点上分别执行,注意各节点的myid不同):
kafka-node01:
# Create the install directory /app/tools if it does not already exist
install -d /app/tools
# Unpack the JDK into /app/tools/
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /app/tools/
# Unpack Kafka into /app/tools/
tar -xf /root/kafka_2.13-3.2.1.tgz -C /app/tools/
# Unpack ZooKeeper into /app/tools/ and create its data directory
# (install -d, unlike plain mkdir, does not fail if this step is re-run)
tar -xf /root/apache-zookeeper-3.8.0-bin.tar.gz -C /app/tools/
install -d /app/tools/apache-zookeeper-3.8.0-bin/data
# This node's ensemble id; it must match a server.N line in zoo.cfg (node01 -> 210)
echo "210" > /app/tools/apache-zookeeper-3.8.0-bin/data/myid
# Write zoo.cfg directly instead of copying zoo_sample.cfg and editing it in vim.
# dataDir must be the directory that holds myid, and every ensemble member needs
# a server.N=host:peerPort:electionPort line; without them ZooKeeper runs standalone.
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/zoo.cfg <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/app/tools/apache-zookeeper-3.8.0-bin/data
clientPort=2181
server.210=172.20.1.210:2888:3888
server.211=172.20.1.211:2888:3888
server.212=172.20.1.212:2888:3888
EOF
# Environment variables for Java, ZooKeeper and Kafka (sourced via /etc/profile)
cat > /etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export JAVA_HOME=/app/tools/jdk1.8.0_291
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/app/tools/apache-zookeeper-3.8.0-bin
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/app/tools/kafka_2.13-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
# Java settings read by the ZooKeeper start scripts (fixed 256 MB heap)
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/java.env << 'EOF'
export JAVA_HOME=/app/tools/jdk1.8.0_291
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"
EOF
# systemd unit for ZooKeeper; "zkServer.sh start" daemonizes, hence Type=forking
cat > /usr/lib/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
ExecStart=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh start
ExecStop=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh stop
ExecReload=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable ZooKeeper at boot and start it now
systemctl daemon-reload && systemctl enable --now zookeeper.service
kafka-node02:
# Create the install directory /app/tools if it does not already exist
install -d /app/tools
# Unpack the JDK into /app/tools/
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /app/tools/
# Unpack Kafka into /app/tools/
tar -xf /root/kafka_2.13-3.2.1.tgz -C /app/tools/
# Unpack ZooKeeper into /app/tools/ and create its data directory
# (install -d, unlike plain mkdir, does not fail if this step is re-run)
tar -xf /root/apache-zookeeper-3.8.0-bin.tar.gz -C /app/tools/
install -d /app/tools/apache-zookeeper-3.8.0-bin/data
# This node's ensemble id; it must match a server.N line in zoo.cfg (node02 -> 211)
echo "211" > /app/tools/apache-zookeeper-3.8.0-bin/data/myid
# Write zoo.cfg directly instead of copying zoo_sample.cfg and editing it in vim.
# dataDir must be the directory that holds myid, and every ensemble member needs
# a server.N=host:peerPort:electionPort line; without them ZooKeeper runs standalone.
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/zoo.cfg <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/app/tools/apache-zookeeper-3.8.0-bin/data
clientPort=2181
server.210=172.20.1.210:2888:3888
server.211=172.20.1.211:2888:3888
server.212=172.20.1.212:2888:3888
EOF
# Environment variables for Java, ZooKeeper and Kafka (sourced via /etc/profile)
cat > /etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export JAVA_HOME=/app/tools/jdk1.8.0_291
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/app/tools/apache-zookeeper-3.8.0-bin
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/app/tools/kafka_2.13-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
# Java settings read by the ZooKeeper start scripts (fixed 256 MB heap)
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/java.env << 'EOF'
export JAVA_HOME=/app/tools/jdk1.8.0_291
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"
EOF
# systemd unit for ZooKeeper; "zkServer.sh start" daemonizes, hence Type=forking
cat > /usr/lib/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
ExecStart=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh start
ExecStop=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh stop
ExecReload=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable ZooKeeper at boot and start it now
systemctl daemon-reload && systemctl enable --now zookeeper.service
kafka-node03:
# Create the install directory /app/tools if it does not already exist
install -d /app/tools
# Unpack the JDK into /app/tools/
tar -xf /root/jdk-8u291-linux-x64.tar.gz -C /app/tools/
# Unpack Kafka into /app/tools/
tar -xf /root/kafka_2.13-3.2.1.tgz -C /app/tools/
# Unpack ZooKeeper into /app/tools/ and create its data directory
# (install -d, unlike plain mkdir, does not fail if this step is re-run)
tar -xf /root/apache-zookeeper-3.8.0-bin.tar.gz -C /app/tools/
install -d /app/tools/apache-zookeeper-3.8.0-bin/data
# This node's ensemble id; it must match a server.N line in zoo.cfg (node03 -> 212)
echo "212" > /app/tools/apache-zookeeper-3.8.0-bin/data/myid
# Write zoo.cfg directly instead of copying zoo_sample.cfg and editing it in vim.
# dataDir must be the directory that holds myid, and every ensemble member needs
# a server.N=host:peerPort:electionPort line; without them ZooKeeper runs standalone.
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/zoo.cfg <<'EOF'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/app/tools/apache-zookeeper-3.8.0-bin/data
clientPort=2181
server.210=172.20.1.210:2888:3888
server.211=172.20.1.211:2888:3888
server.212=172.20.1.212:2888:3888
EOF
# Environment variables for Java, ZooKeeper and Kafka (sourced via /etc/profile)
cat > /etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export JAVA_HOME=/app/tools/jdk1.8.0_291
export PATH=$PATH:$JAVA_HOME/bin
export ZK_HOME=/app/tools/apache-zookeeper-3.8.0-bin
export PATH=$PATH:$ZK_HOME/bin
export KAFKA_HOME=/app/tools/kafka_2.13-3.2.1
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile
# Java settings read by the ZooKeeper start scripts (fixed 256 MB heap)
cat > /app/tools/apache-zookeeper-3.8.0-bin/conf/java.env << 'EOF'
export JAVA_HOME=/app/tools/jdk1.8.0_291
export JVMFLAGS="-Xms256m -Xmx256m $JVMFLAGS"
EOF
# systemd unit for ZooKeeper; "zkServer.sh start" daemonizes, hence Type=forking
cat > /usr/lib/systemd/system/zookeeper.service << 'EOF'
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
ExecStart=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh start
ExecStop=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh stop
ExecReload=/app/tools/apache-zookeeper-3.8.0-bin/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable ZooKeeper at boot and start it now
systemctl daemon-reload && systemctl enable --now zookeeper.service
然后搭建Kafka集群(以下步骤在三个节点上分别执行,注意各节点的broker.id不同):
kafka-node01:
# Data directory for Kafka's message logs. It is kept out of /tmp, which a reboot
# or a tmp cleaner (e.g. systemd-tmpfiles) may purge, destroying partition data.
install -d /app/data/kafka-logs
# Write this broker's server.properties (broker.id is unique per node)
cat > /app/tools/kafka_2.13-3.2.1/config/server.properties <<'EOF'
broker.id=210
# Listen on all interfaces but advertise the node IP, so clients that connect
# by IP are not handed back a hostname they may be unable to resolve
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://172.20.1.210:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
# Replicate internal topics across all 3 brokers; the single-node value 1 would
# make consumer offsets / transaction state unavailable when one broker fails
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.210:2181,172.20.1.211:2181,172.20.1.212:2181/caijintian
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
# systemd unit for Kafka; kafka-server-start.sh (without -daemon) stays in the
# foreground, hence Type=simple
cat > /usr/lib/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
User=root
Group=root
ExecStart=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-start.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
ExecStop=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-stop.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable Kafka at boot and start it now
systemctl daemon-reload && systemctl enable kafka.service --now
kafka-node02:
# Data directory for Kafka's message logs. It is kept out of /tmp, which a reboot
# or a tmp cleaner (e.g. systemd-tmpfiles) may purge, destroying partition data.
install -d /app/data/kafka-logs
# Write this broker's server.properties (broker.id is unique per node)
cat > /app/tools/kafka_2.13-3.2.1/config/server.properties <<'EOF'
broker.id=211
# Listen on all interfaces but advertise the node IP, so clients that connect
# by IP are not handed back a hostname they may be unable to resolve
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://172.20.1.211:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
# Replicate internal topics across all 3 brokers; the single-node value 1 would
# make consumer offsets / transaction state unavailable when one broker fails
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.210:2181,172.20.1.211:2181,172.20.1.212:2181/caijintian
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
# systemd unit for Kafka; kafka-server-start.sh (without -daemon) stays in the
# foreground, hence Type=simple
cat > /usr/lib/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
User=root
Group=root
ExecStart=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-start.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
ExecStop=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-stop.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable Kafka at boot and start it now
systemctl daemon-reload && systemctl enable kafka.service --now
kafka-node03:
# Data directory for Kafka's message logs. It is kept out of /tmp, which a reboot
# or a tmp cleaner (e.g. systemd-tmpfiles) may purge, destroying partition data.
install -d /app/data/kafka-logs
# Write this broker's server.properties (broker.id is unique per node)
cat > /app/tools/kafka_2.13-3.2.1/config/server.properties <<'EOF'
broker.id=212
# Listen on all interfaces but advertise the node IP, so clients that connect
# by IP are not handed back a hostname they may be unable to resolve
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://172.20.1.212:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
# Replicate internal topics across all 3 brokers; the single-node value 1 would
# make consumer offsets / transaction state unavailable when one broker fails
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.20.1.210:2181,172.20.1.211:2181,172.20.1.212:2181/caijintian
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
# systemd unit for Kafka; kafka-server-start.sh (without -daemon) stays in the
# foreground, hence Type=simple
cat > /usr/lib/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka server (broker)
After=network.target zookeeper.service
[Service]
Type=simple
Environment=JAVA_HOME=/app/tools/jdk1.8.0_291
User=root
Group=root
ExecStart=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-start.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
ExecStop=/app/tools/kafka_2.13-3.2.1/bin/kafka-server-stop.sh /app/tools/kafka_2.13-3.2.1/config/server.properties
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# Reload systemd, then enable Kafka at boot and start it now
systemctl daemon-reload && systemctl enable kafka.service --now
三、Kafka测试
1、创建Topic
# Create the test topic with one partition per broker and every partition
# replicated to all 3 brokers, so the test actually exercises the cluster
# (the broker default replication factor is 1, i.e. a single copy of the data)
kafka-topics.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 \
  --create --topic test --partitions 3 --replication-factor 3
2、查看Topic
# List every topic in the cluster; any broker in the bootstrap list can answer
kafka-topics.sh --list \
  --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092
3、生产者发送消息
# Run on kafka-node01. --bootstrap-server replaces the deprecated --broker-list.
# Each line typed at the ">" prompt is sent as one message to topic "test".
kafka-console-producer.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 --topic test
#消息内容
>hello,I am a man !
>
4、消费者接收消息
# Run on kafka-node03. --from-beginning also prints messages that were produced
# before this consumer started.
kafka-console-consumer.sh --bootstrap-server 172.20.1.210:9092,172.20.1.211:9092,172.20.1.212:9092 --topic test --from-beginning
# 此时消费者即可收到生产者发送的消息(在任一节点上启动消费者均可收到):
hello,I am a man !