Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。
关于kafka的信息可以参考官方文档: https://kafka.apache.org/documentation/
虚拟机环境,3台服务器,对应ip:
192.168.142.128
192.168.142.129
192.168.142.130
先把主机名改掉, 分别到3台机器上改自己的:
##192.168.142.128 [root@localhost ~]# hostnamectl set-hostname kafka1 ##192.168.142.129 [root@localhost ~]# hostnamectl set-hostname kafka2 ##192.168.142.130 [root@localhost ~]# hostnamectl set-hostname kafka3
关掉防火墙
[root@kafka1 ~]# systemctl stop firewalld.service [root@kafka1 ~]# systemctl disable firewalld.service [root@kafka2 ~]# systemctl stop firewalld.service [root@kafka2 ~]# systemctl disable firewalld.service [root@kafka3 ~]# systemctl stop firewalld.service [root@kafka3 ~]# systemctl disable firewalld.service
下载kafka,并修改配置文件
##3台服务器都需处理 [root@kafka1 ~]# wget -O kafka_2.13-3.1.0.tgz https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz [root@kafka1 ~]# tar -zxvf kafka_2.13-3.1.0.tgz [root@kafka1 ~]# mv kafka_2.13-3.1.0/ /usr/local/kafka [root@kafka1 ~]# cd /usr/local/kafka/ [root@kafka1 kafka]# [root@kafka1 kafka]# ls bin config libs LICENSE licenses NOTICE site-docs
配置zookeeper集群,修改配置文件zookeeper.properties
#每个节点都要执行 [root@kafka1 kafka]# vim config/zookeeper.properties dataDir=/data/zookeeper clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.142.128:2182:2183 server.2=192.168.142.129:2182:2183 server.3=192.168.142.130:2182:2183 [root@kafka2 kafka]# vim config/zookeeper.properties dataDir=/data/zookeeper clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.142.128:2182:2183 server.2=192.168.142.129:2182:2183 server.3=192.168.142.130:2182:2183 [root@kafka3 kafka]# vim config/zookeeper.properties dataDir=/data/zookeeper clientPort=2181 tickTime=2000 initLimit=20 syncLimit=10 server.1=192.168.142.128:2182:2183 server.2=192.168.142.129:2182:2183 server.3=192.168.142.130:2182:2183
tickTime=2000
为zk的基本时间单元,毫秒 initLimit=10
Leader-Follower初始通信时限(tickTime*10
) syncLimit=5
Leader-Follower同步通信时限(tickTime*5
) server.实例集群标识=实例地址:数据通信端口:选举通信端口
tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
syncLimit:这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里
192.168.142.128为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2182,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是2183)
创建zookeeper所需的目录
分别在3个机器上执行,为实例添加集群标识:
第一台:192.168.142.128
[root@kafka1 ~]# mkdir -p /data/zookeeper [root@kafka1 ~]# echo "1" > /data/zookeeper/myid
第二台:192.168.142.129
[root@kafka2 ~]# mkdir -p /data/zookeeper [root@kafka2 ~]# echo "2" > /data/zookeeper/myid
第三台:192.168.142.130
[root@kafka3 ~]# mkdir -p /data/zookeeper [root@kafka3 ~]# echo "3" > /data/zookeeper/myid
启动集群服务Zookeeper
##192.168.142.128 kafka1 [root@kafka1 kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties ##192.168.142.129 kafka2 [root@kafka2 kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties ##192.168.142.130 kafka3 [root@kafka3 kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties
效果图
配置Kafka集群环境
Kafka集群节点>=2时便可对外提供高可用服务
1)修改Kafka配置文件config/server.properties
注意:注释掉所有节点的broker.id
root@kafka1 kafka]# vim config/server.properties log.dirs=/data/kafka-logs #broker.id=0 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 listeners=PLAINTEXT://192.168.142.128:9092 advertised.listeners=PLAINTEXT://192.168.142.129:9092 [root@kafka2 kafka]# vim config/server.properties log.dirs=/data/kafka-logs #broker.id=0 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 listeners=PLAINTEXT://192.168.142.129:9092 advertised.listeners=PLAINTEXT://192.168.142.129:9092 [root@kafka3 kafka]# vim config/server.properties log.dirs=/data/kafka-logs #broker.id=0 zookeeper.connect=kafka1:2181,kafka2:2181,kafka3:2181 listeners=PLAINTEXT://192.168.142.130:9092 advertised.listeners=PLAINTEXT://192.168.142.130:9092
创建日志目录:
[root@kafka1 kafka]# mkdir -p /data/kafka-logs [root@kafka2 kafka]# mkdir -p /data/kafka-logs [root@kafka3 kafka]# mkdir -p /data/kafka-logs
分别在3台机器上启动kafka
[root@kafka1 kafka]# bin/kafka-server-start.sh config/server.properties [root@kafka2 kafka]# bin/kafka-server-start.sh config/server.properties [root@kafka3 kafka]# bin/kafka-server-start.sh config/server.properties
创建topic测试:
[root@kafka1 kafka]# bin/kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 3 --partitions 1 --topic my-topic Created topic my-topic.
--replication-factor 2:副本集数量
--partition 4:分区数
查看所有的topic:
[root@kafka1 kafka]# bin/kafka-topics.sh --list --bootstrap-server kafka1:9092 tp1 my-topic
模拟一个数据:
[root@kafka3 kafka]# bin/kafka-console-producer.sh --broker-list kafka1:9092 --topic my-topic >message two >1111111 >222 >333 >444 >
起一个消费者:
[root@kafka1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic my-topic --from-beginning 1111 222 222 444 测试数据 哈哈
查看哪个borke【kafka服务器】在工作
可以描述Topic分区数/副本数/副本Leader/副本ISR等信息:
[root@kafka2 kafka]# bin/kafka-topics.sh --describe --bootstrap-server kafka1:9092 --topic my-topic Topic: my-topic TopicId: FthKwxf8TjKb-U8VZ7cztQ PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: my-topic Partition: 0 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003
leader:是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。(哪个broker在读写“leader”)
replicas:是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着(当前可以正常工作的kafka集群。当leader挂掉时会自动替补)
isr:是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader(同步消息的列表集合)
删除Topic
注意,只是删除Topic在zk的元数据,日志数据仍需手动删除。
bin/kafka-topics.sh --bootstrap-server kafka1:9092 --delete --topic my-topic
消费者
新模式,offset
存储在borker
--new-consumer Use new consumer. This is the default.
--bootstrap-server <server to connectto> REQUIRED (unless old consumer is used): The server to connect to.
老消费模式,offset
存储在zk
--zookeeper <urls> REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
【待测】
到这里kafka集群已经ok
把zookeeper和kafka做成系统服务并开机自动启动(可以把之前在终端执行的启动命令关了,不然起不来)
在3台机器上分别运行
[root@kafka1 ~]# mkdir /etc/cluster/
创建zookeeper的service文件
[root@kafka1 cluster]# cat /etc/cluster/zookeeper.service [Unit] Description=zookeeper [Service] ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties SyslogIdentifier=zookeeper [Install] WantedBy=multi-user.target
加为系统服务并开机启动
[root@kafka1 cluster]# ln -s /etc/cluster/zookeeper.service /lib/systemd/system [root@kafka1 cluster]# systemctl start zookeeper ### 开机启动: [root@kafka1 cluster]# systemctl enable zookeeper
设置kafka系统服务并开机启动
[root@kafka1 cluster]# cat /etc/cluster/kafka.service [Unit] Description=kafka [Service] ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties SyslogIdentifier=kafka [Install] WantedBy=multi-user.target
设置开机启动
[root@kafka1 cluster]# ln -s /etc/cluster/kafka.service /lib/systemd/system/ [root@kafka1 cluster]# systemctl start kafka [root@kafka1 cluster]# systemctl enable kafka
查看服务状态:
[root@kafka1 cluster]# systemctl status kafka ● kafka.service - kafka Loaded: loaded (/etc/cluster/kafka.service; enabled; vendor preset: disabled) Active: active (running) since Sun 2021-12-05 20:49:52 EST; 45min ago Main PID: 138011 (java) Tasks: 74 (limit: 48706) Memory: 383.6M
查看服务日志:
[root@kafka1 cluster]# journalctl -u kafka -f
参考:
https://blog.csdn.net/weixin_30878501/article/details/99027529
本文为崔凯原创文章,转载无需和我联系,但请注明来自冷暖自知一抹茶ckhttp://www.cksite.cn