1 Zookeeper 概述
Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。
Zookeeper 工作机制
Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。也就是说 Zookeeper = 文件系统 + 通知机制。
Zookeeper 特点
(1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
(2)Zookeepe集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
(3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
(4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行,即先进先出。
(5)数据更新原子性,一次数据更新要么成功,要么失败。
(6)实时性,在一定时间范围内,Client能读到最新数据。
Zookeeper 数据结构
ZooKeeper数据模型的结构与Linux文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。
Zookeeper 应用场景
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
●统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。
●统一配置管理
(1)分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群。对配置文件修改后,希望能够快速同步到各个节点上。
(2)配置管理可交由ZooKeeper实现。可将配置信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦 Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。
●统一集群管理
(1)分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态做出一些调整。
(2)ZooKeeper可以实现实时监控节点状态变化。可将节点信息写入ZooKeeper上的一个ZNode。监听这个ZNode可获取它的实时状态变化。
●服务器动态上下线
客户端能实时洞察到服务器上下线的变化。
●软负载均衡
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
Zookeeper 选举机制
●第一次启动选举机制
(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
(5)服务器5启动,同4一样当小弟。
●非第一次启动选举机制
(1)当ZooKeeper 集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
1)服务器初始化启动。
2)服务器运行期间无法和Leader保持连接。
(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
1)集群中本来就已经存在一个Leader。
对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和 Leader机器建立连接,并进行状态同步即可。
2)集群中确实不存在Leader。
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。
选举Leader规则:
1.EPOCH大的直接胜出
2.EPOCH相同,事务id大的胜出
3.事务id相同,服务器id大的胜出
SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑速度有关。
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加
部署 Zookeeper 集群
环境配置
准备 3 台服务器做 Zookeeper 集群
主机名 | IP |
---|---|
zk-kafka-01 | 10.0.0.201 |
zk-kafka-02 | 10.0.0.202 |
zk-kafka-03 | 10.0.0.203 |
搭建Zookeeper 三台集群
ZK 3.5.7 下载地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/
# 三台虚拟机分别修改主机名
[root@localhost ~]# hostnamectl set-hostname zk-kafka-01
[root@localhost ~]# hostnamectl set-hostname zk-kafka-02
[root@localhost ~]# hostnamectl set-hostname zk-kafka-03//关闭三台的防火墙
[root@zk-kafka-01 ~]# systemctl stop firewalld.service && setenforce 0
[root@zk-kafka-02 ~]# systemctl stop firewalld.service && setenforce 0
[root@zk-kafka-03 ~]# systemctl stop firewalld.service && setenforce 0
# 安装 JDK (三台节点)
[root@zk-kafka-01 ~]# apt install openjdk-8-jdk -y
[root@zk-kafka-01 ~]# java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-8u402-ga-2ubuntu1~22.04-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)
# 上传apche-zookeeper-3.5.7-bin.tat.gz的压缩包到/opt下,并解压 (在01节点)
[root@zk-kafka-01 opt]# tar zvxf apache-zookeeper-3.5.7-bin.tar.gz
[root@zk-kafka-01 opt]# mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
[root@zk-kafka-01 opt]# cd /usr/local/zookeeper-3.5.7/
[root@zk-kafka-01 zookeeper-3.5.7]# ls
bin docs LICENSE.txt README.md
conf lib NOTICE.txt README_packaging.txt
# 修改配置文件(在01节点)
[root@zk-kafka-01 zookeeper-3.5.7]# cd conf/
[root@zk-kafka-01 conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@zk-kafka-01 conf]# cp zoo_sample.cfg zoo.cfg
[root@zk-kafka-01 conf]# vim zoo.cfg
dataDir=/usr/local/zookeeper-3.5.7/data
dataLogDir=/usr/local/zookeeper-3.5.7/logs
#添加集群信息
server.1=20.0.0.230:3188:3288
server.2=20.0.0.240:3188:3288
server.3=20.0.0.250:3188:3288
配置解释:
server.A=B:C:D
●A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
●B是这个服务器的地址。
●C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
●D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
当中的配置文件信息的意思:
tickTime=2000 #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5 #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/usr/local/zookeeper-3.5.7/logs ●添加,指定存放日志的目录,目录需要单独创建
clientPort=2181 #客户端连接端口
# 拷贝配置好的 Zookeeper 配置文件目录到02节点上
[14:57:49 root@zk-kafka-01 conf]#scp -r /usr/local/zookeeper-3.5.7/ 10.0.0.202:/usr/local/
# 拷贝配置好的 Zookeeper 配置文件到03节点上
[14:57:49 root@zk-kafka-01 conf]#scp -r /usr/local/zookeeper-3.5.7/ 10.0.0.203:/usr/local/
# 在01节点上创建数据目录和日志目录
[root@zk-kafka-01 conf]# mkdir /usr/local/zookeeper-3.5.7/data
[root@zk-kafka-01 conf]# mkdir /usr/local/zookeeper-3.5.7/logs
# 在02节点上创建数据目录和日志目录
[root@zk-kafka-02 ~]# mkdir /usr/local/zookeeper-3.5.7/data
[root@zk-kafka-02 ~]# mkdir /usr/local/zookeeper-3.5.7/logs
# 在03节点上创建数据目录和日志目录
[root@zk-kafka-03 ~]# mkdir /usr/local/zookeeper-3.5.7/data
[root@zk-kafka-03 ~]# mkdir /usr/local/zookeeper-3.5.7/logs
zk-kafka-01
# 在01节点的dataDir指定的目录下创建一个 myid 的文件
[root@zk-kafka-01 conf]# echo 1 > /usr/local/zookeeper-3.5.7/data/myid
# 启动 zk
[15:20:27 root@zk-kafka-01 zookeeper-3.5.7]#bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
zk-kafka-02
# 在02节点的dataDir指定的目录下创建一个 myid 的文件
[root@zk-kafka-02 ~]# echo 2 > /usr/local/zookeeper-3.5.7/data/myid
# 启动 zk
[15:20:27 root@zk-kafka-02 zookeeper-3.5.7]#bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
zk-kafka-03
# 在03节点的dataDir指定的目录下创建一个 myid 的文件
[root@zk-kafka-03 ~]# echo 3 > /usr/local/zookeeper-3.5.7/data/myid
# 启动 zk
[15:20:27 root@zk-kafka-03 zookeeper-3.5.7]#bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
查看服务已启动
# 查看 zk 服务端口已启动
[15:25:57 root@zk-kafka-01 zookeeper-3.5.7]#ss -ntl| grep 2181
LISTEN 0 50 *:2181 *:*
# 查看 zk 服务端口已启动
[15:21:14 root@zk-kafka-02 zookeeper-3.5.7]#ss -ntl| grep 2181
LISTEN 0 50 *:2181 *:*
# 查看 zk 服务端口已启动
[15:21:14 root@zk-kafka-03 zookeeper-3.5.7]#ss -ntl| grep 2181
LISTEN 0 50 *:2181 *:*
链接验证 zk
[15:53:09 root@zk-kafka-01 zookeeper-3.5.7]#/usr/local/zookeeper-3.5.7/bin/zkCli.sh -server 127.0.0.1:2181
WatchedEvent state:SyncConnected type:None path:null
# 回车
[zk: 127.0.0.1:2181(CONNECTED) 0]
# 如果显示 “CONNECTED” 则说明链接成功
# 执行 help 查询命令说明
[zk: 127.0.0.1:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path
delquota [-n|-b] path
get [-s] [-w] path
getAcl [-s] path
history
listquota path
ls [-s] [-w] [-R] path
ls2 path [watch]
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
rmr path
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b val path
stat [-w] path
sync path
Command not found: Command not found help
# 退出 zk
quit
然后在不同节点查看 zk 状态可以看到 02 节点为 leader
,其他两个节点为 follower
[16:12:51 root@zk-kafka-02 zookeeper-3.5.7]#./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
[16:13:40 root@zk-kafka-01 zookeeper-3.5.7]#./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[16:13:40 root@zk-kafka-03 zookeeper-3.5.7]#./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
以上就是 ZK 集群部署,下面我们需要将 kafka 对接至 zk
2 kafka
前言:
在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。
为什么选择Kafka
消息中间件有很多。比如ActiveMQ,RabbitMQ,RocketMQ,Kafka。那你在选型的时候一般考虑哪些因素呢?我们来比较下这几个中间件的特点。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 | 万级,吞吐量同ActiveMQ | 10万级,可以支撑高吞吐量 | 10万级别,高吞吐量。适合日志采集,实时计算等场景 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic | topic从几十个到几百个的时候,吞吐量会「大幅度下降」所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源 | ||
时效性 | ms级 | 微秒级,这是rabbitmq的一大特点,延迟是最低的 | ms级 | 延迟在ms级以内 |
可用性 | 高,基于主从架构实现高可用性 | 同ActiveMQ | 非常高,分布式架构 | 非常高,同样也是分布式式 |
消息可靠性 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 同RocketMQ一样也可以做到消息零丢失 | |
功能支持 | MQ领域的功能极其完备 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低 | MQ功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
优劣总结
「ActiveMQ」
- 优点:非常成熟,功能强大,在业内大量的公司以及项目中都有应用
- 缺点:偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对 ActiveMQ 5.x维护越来越少,几个月才发布一个版本。较少在大规模吞吐的场景中使用。
「RabbitMQ」
- 优点:erlang语言开发,性能极其好,延时很低。吞吐量到万级,MQ功能比较完备。而且开源提供的管理界面非常棒,用起来很好用。社区相对比较活跃,几乎每个月都发布几个版本分。在国内一些互联网公司近几年用rabbitmq也比较多一些。
- 缺点:RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。
「RocketMQ」
- 优点:接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都不错的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。
- 缺点:社区活跃度相对较为一般,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险。
「Kafka」
- 优点:就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。
- 缺点:有可能消息重复消费。对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。
从上面的总结我们知道,Kafka可以用于较简单的消息队列(如果对你来说足够使用)。并且较要求较高的吞吐,那么Kafka是你最合适的选择。
什么是Kafka
Kafka本质还是一个存储容器,最初由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
Kafka组成架构
从上面的架构图我们获得几个词:
- Producer :消息生产者,就是向kafka broker发消息的客户端;
- Consumer :消息消费者,向kafka broker取消息的客户端;
- Topic :可以理解为一个队列;
- Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
- Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
- Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
Kafka 部署
下载
下载地址:http://kafka.apache.org/downloads
我们选取这个下载
安装
zk-kafka-01
把下载的压缩包拷贝到Linux上,解压:
[09:55:39 root@zk-kafka-01 ~]#mkdir kafka
[09:55:41 root@zk-kafka-01 ~]#cd kafka/
[09:56:30 root@zk-kafka-01 kafka]#tar xf kafka_2.13-3.1.0.tgz
[10:04:32 root@zk-kafka-01 kafka]#mv kafka_2.13-3.1.0 /usr/local/kafka
[10:05:36 root@zk-kafka-01 kafka]#cd /usr/local/kafka/config/
[10:05:56 root@zk-kafka-01 config]#ls
connect-console-sink.properties connect-mirror-maker.properties server.properties
connect-console-source.properties connect-standalone.properties tools-log4j.properties
connect-distributed.properties consumer.properties trogdor.conf
connect-file-sink.properties kraft zookeeper.properties
connect-file-source.properties log4j.properties
connect-log4j.properties producer.properties
修改 config
# 修改以下配置,listeners=PLAINTEXT://:9092 表示*:9092 以方便 kafka_exporter 链接
[10:05:59 root@zk-kafka-01 config]#vim server.properties
listeners=PLAINTEXT://:9092
log.dirs=/usr/local/kafka/logs
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181
拷贝 kafka 到 zk-kafka-02,zk-kafka-03 节点
# 推到 local 目录下
[10:10:30 root@zk-kafka-01 config]#cd ../..
[10:10:45 root@zk-kafka-01 local]#scp -r kafka/ 10.0.0.202:/usr/local
[10:10:45 root@zk-kafka-01 local]#scp -r kafka/ 10.0.0.203:/usr/local
修改 zk-kafka-02,zk-kafka-03 节点上 kafka 配置
# 02 节点
[09:52:28 root@zk-kafka-02 ~]#vim /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
# 03 节点
[09:52:28 root@zk-kafka-02 ~]#vim /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9092
添加kafka的环境变量(三台节点都添加)
# 01 节点操作
[10:29:48 root@zk-kafka-01 local]# vim /etc/profile
## 到最后一行添加配置信息
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[10:29:48 root@zk-kafka-01 local]#source /etc/profile
# 02 节点操作
[10:29:48 root@zk-kafka-02 local]# vim /etc/profile
## 到最后一行添加配置信息
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[10:29:48 root@zk-kafka-02 local]#source /etc/profile
# 03 节点操作
[10:29:48 root@zk-kafka-03 local]# vim /etc/profile
## 到最后一行添加配置信息
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[10:29:48 root@zk-kafka-03 local]#source /etc/profile
给 01 02 03 节点设置 kafka 启动项
[10:35:03 root@zk-kafka-01 bin]# cd /etc/systemd/system/
[10:35:04 root@zk-kafka-01 system]#vim kafka.service
[Unit]
# Kafka服务的描述
Description=Kafka Service
# 服务依赖—在什么服务之后启动,一般为在网络服务启动后启动
After=network.target zookeeper.service
[Service]
Type=forking
# 启动环境参数
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8/bin"
User=root
Group=root
# 启动命令ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 停止命令
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
启动 01 节点
[10:36:55 root@zk-kafka-01 system]# chmod 755 kafka.service
[10:37:01 root@zk-kafka-01 system]#systemctl daemon-reload
[10:37:18 root@zk-kafka-01 system]#systemctl enable --now kafka.service
[10:37:36 root@zk-kafka-01 system]#systemctl status kafka.service
● kafka.service - Kafka Service
Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2024-04-11 10:37:36 CST; 23s ago
Process: 74900 ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/confi>
Main PID: 75223 (java)
Tasks: 36 (limit: 2178)
Memory: 276.6M
CPU: 26.417s
CGroup: /system.slice/kafka.service
└─75223 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHea>
Apr 11 10:37:30 zk-kafka-01 systemd[1]: Starting Kafka Service...
Apr 11 10:37:36 zk-kafka-01 systemd[1]: Started Kafka Service.
将启动文件拷贝至 02 03 节点
[10:41:56 root@zk-kafka-01 system]#scp kafka.service 10.0.0.202:/etc/systemd/system/
[10:42:13 root@zk-kafka-01 system]#scp kafka.service 10.0.0.203:/etc/systemd/system/
# 02 节点
[10:36:33 root@zk-kafka-02 ~]#systemctl daemon-reload
[10:42:51 root@zk-kafka-02 ~]#systemctl enable --now kafka.service
# 03 节点
[10:32:13 root@zk-kafka-03 ~]#systemctl daemon-reload
[10:42:58 root@zk-kafka-03 ~]#systemctl enable --now kafka.service
验证 kafka
# 01 节点上操作
[10:44:09 root@zk-kafka-01 bin]#pwd
/usr/local/kafka/bin
[10:44:38 root@zk-kafka-01 bin]#kafka-topics.sh --create --bootstrap-server 10.0.0.201:9092 --replication-factor 1 --partitions 3 --topic test
Created topic test.
# 查看当前服务器中所有的topic
[10:45:28 root@zk-kafka-01 bin]#kafka-topics.sh --list --bootstrap-server 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 hd-topic
test
# 查看某个topic的详情
[10:46:29 root@zk-kafka-01 bin]#kafka-topics.sh --describe --bootstrap-server 10.0.0.201:9092 hd-topic
Topic: test TopicId: cWmpGxsMQu2vMU6qbLQBmw PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 1 Replicas: 1 Isr: 1
02 节点上消费消息
# from-beginning:会把主题中以往所有的数据都读取出来
[10:49:38 root@zk-kafka-02 ~]#cd /usr/local/kafka/bin/
[10:50:59 root@zk-kafka-02 bin]#kafka-console-consumer.sh --bootstrap-server 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 --topic test
01 节点发布消息:
[10:51:59 root@zk-kafka-01 bin]#kafka-console-producer.sh --broker-list 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 --topic test
>1
>2
>3
>111
>222
>333
可以看到 02 节点已经消费了该 topic
3 对接 Prometheus
使用的监控方式为:kafka_exporter+prometheus
3.1 部署 exporter
注:1个kafka集群只需要1个exporter,在集群上的任意1台服务器部署。
这里我部署在 01 节点上
[14:19:14 root@zk-kafka-01 ~]#mkdir kafka_exporter
[14:19:26 root@zk-kafka-01 ~]#cd kafka_exporter/
[14:19:29 root@zk-kafka-01 kafka_exporter]#wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.7.0/kafka_exporter-1.7.0.linux-amd64.tar.gz
[14:34:53 root@zk-kafka-01 kafka_exporter]#tar xf kafka_exporter-1.7.0.linux-amd64.tar.gz
[14:34:56 root@zk-kafka-01 kafka_exporter]#mv kafka_exporter-1.7.0.linux-amd64 /usr/local/kafka_exporter
# 添加开机自启动
[14:37:41 root@zk-kafka-01 kafka_exporter]#cat > /etc/systemd/system/kafka_exporter.service << "EOF"
[Unit]
Description=kafka_exporter
After=local-fs.target network-online.target network.target
Wants=local-fs.target network-online.target network.target
# kafka.server 填写 kafka 集群 IP+port即可
[Service]
ExecStart=/usr/local/kafka_exporter/kafka_exporter --kafka.server=10.0.0.201:9092
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# 加载并设置为开机启动
[14:37:41 root@zk-kafka-01 kafka_exporter]#systemctl daemon-reload
[14:38:46 root@zk-kafka-01 kafka_exporter]#systemctl enable --now kafka_exporter.service
浏览器访问 exporter
3.2 对接 Prometheus
这里我将对接 K8S 中的 Prometheus-operator
# 创建一个单独得 kafka 监控 NS
[15:04:02 root@master ~]#kubectl create ns kafka-mon
[15:05:31 root@master ~]#mkdir kafka_mon
[15:05:34 root@master ~]#cd kafka_mon/
编写 yaml
# 编写 endpoint
[15:47:50 root@master kafka_mon]#vim kafka_end.yaml
apiVersion: v1
kind: Endpoints
metadata:
name: kafka-metrics # 需要和 svc 中 name 字段保持一致
namespace: kafka-mon
labels:
k8s-app: kafka-metrics
subsets:
# 下面分别是 kafka-exporter 的地址
- addresses:
- ip: "10.0.0.201"
ports:
- name: metrics
port: 9308 # 后端 kafka-exporter 端口
protocol: TCP
# 编写 svc
[15:57:15 root@master kafka_mon]#cat kafka_svc.yaml
apiVersion: v1
kind: Service
metadata:
name: kafka-metrics # 所以这里的名字与上面的 Endpoints 中名字相同
namespace: kafka-mon
labels:
k8s-app: kafka-metrics
spec:
ports:
- name: metrics
port: 9308
protocol: TCP
targetPort: 9308
# 编写 sm 对接 Prometheus POD
[15:57:27 root@master kafka_mon]#vim kafka_sm.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
labels:
k8s-app: kafka-metrics
name: kafka-metrics
namespace: monitoring
spec:
namespaceSelector:
matchNames:
- kafka-mon
# selector 字段中匹配 endpoint 的标签
selector:
matchLabels:
k8s-app: kafka-metrics
endpoints:
- interval: 15s
port: metrics # 匹配 svc port 名称
path: /metrics # 匹配监控指标路径
jobLabel: instance
[15:53:36 root@master kafka_mon]#kubectl apply -f kafka_end.yaml
[16:00:10 root@master kafka_mon]#kubectl apply -f kafka_svc.yaml
[16:00:10 root@master kafka_mon]#kubectl apply -f kafka_sm.yaml
Prometheus 验证
# 查看 Prometheus svc
[16:00:56 root@master kafka_mon]#kubectl get svc -n monitoring
prometheus-k8s NodePort 172.30.0.101 <none> 9090:31005/TCP,8080:30344/TCP 21d
获取成功