Zookeeper集群与kafka集群部署搭建及Prometheus 监控

1 Zookeeper 概述

Zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。

Zookeeper 工作机制

Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。也就是说 Zookeeper = 文件系统 + 通知机制。

Zookeeper 特点

(1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
(2)Zookeepe集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器。
(3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。
(4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行,即先进先出。
(5)数据更新原子性,一次数据更新要么成功,要么失败。
(6)实时性,在一定时间范围内,Client能读到最新数据。

Zookeeper 数据结构

ZooKeeper数据模型的结构与Linux文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

Zookeeper 应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

●统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。

●统一配置管理
(1)分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群。对配置文件修改后,希望能够快速同步到各个节点上。
(2)配置管理可交由ZooKeeper实现。可将配置信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦 Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

●统一集群管理
(1)分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态做出一些调整。
(2)ZooKeeper可以实现实时监控节点状态变化。可将节点信息写入ZooKeeper上的一个ZNode。监听这个ZNode可获取它的实时状态变化。

●服务器动态上下线
客户端能实时洞察到服务器上下线的变化。

●软负载均衡
在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

Zookeeper 选举机制

●第一次启动选举机制
(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING;
(5)服务器5启动,同4一样当小弟。

●非第一次启动选举机制
(1)当ZooKeeper 集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:
1)服务器初始化启动。
2)服务器运行期间无法和Leader保持连接。

(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:
1)集群中本来就已经存在一个Leader。
对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和 Leader机器建立连接,并进行状态同步即可。

2)集群中确实不存在Leader。
假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻,3和5服务器出现故障,因此开始进行Leader选举。

选举Leader规则:

1.EPOCH大的直接胜出
2.EPOCH相同,事务id大的胜出
3.事务id相同,服务器id大的胜出

SID:服务器ID。用来唯一标识一台ZooKeeper集群中的机器,每台机器不能重复,和myid一致。
ZXID:事务ID。ZXID是一个事务ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的ZXID值不一定完全一致,这和ZooKeeper服务器对于客户端“更新请求”的处理逻辑速度有关。
Epoch:每个Leader任期的代号。没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加

部署 Zookeeper 集群

环境配置

准备 3 台服务器做 Zookeeper 集群

主机名 IP
zk-kafka-01 10.0.0.201
zk-kafka-02 10.0.0.202
zk-kafka-03 10.0.0.203

搭建Zookeeper 三台集群

ZK 3.5.7 下载地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/

# 三台虚拟机分别修改主机名
[root@localhost ~]# hostnamectl set-hostname zk-kafka-01
[root@localhost ~]# hostnamectl set-hostname zk-kafka-02
[root@localhost ~]# hostnamectl set-hostname zk-kafka-03//关闭三台的防火墙
[root@zk-kafka-01 ~]# systemctl stop firewalld.service && setenforce 0
[root@zk-kafka-02 ~]# systemctl stop firewalld.service && setenforce 0
[root@zk-kafka-03 ~]# systemctl stop firewalld.service && setenforce 0
# 安装 JDK (三台节点)
[root@zk-kafka-01 ~]# apt install openjdk-8-jdk -y
[root@zk-kafka-01 ~]# java -version
openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-8u402-ga-2ubuntu1~22.04-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)

# 上传apche-zookeeper-3.5.7-bin.tat.gz的压缩包到/opt下,并解压 (在01节点)
[root@zk-kafka-01 opt]# tar zvxf apache-zookeeper-3.5.7-bin.tar.gz 
[root@zk-kafka-01 opt]# mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
[root@zk-kafka-01 opt]# cd /usr/local/zookeeper-3.5.7/
[root@zk-kafka-01 zookeeper-3.5.7]# ls
bin   docs  LICENSE.txt  README.md
conf  lib   NOTICE.txt   README_packaging.txt

# 修改配置文件(在01节点)
[root@zk-kafka-01 zookeeper-3.5.7]# cd conf/
[root@zk-kafka-01 conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@zk-kafka-01 conf]# cp zoo_sample.cfg zoo.cfg
[root@zk-kafka-01 conf]# vim zoo.cfg
dataDir=/usr/local/zookeeper-3.5.7/data
dataLogDir=/usr/local/zookeeper-3.5.7/logs

#添加集群信息
server.1=20.0.0.230:3188:3288
server.2=20.0.0.240:3188:3288
server.3=20.0.0.250:3188:3288

配置解释:

server.A=B:C:D
●A是一个数字,表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
●B是这个服务器的地址。
●C是这个服务器Follower与集群中的Leader服务器交换信息的端口。

●D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

当中的配置文件信息的意思:
tickTime=2000   #通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=10    #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
syncLimit=5     #Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data      ●修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/usr/local/zookeeper-3.5.7/logs   ●添加,指定存放日志的目录,目录需要单独创建
clientPort=2181   #客户端连接端口
# 拷贝配置好的 Zookeeper 配置文件目录到02节点上
[14:57:49 root@zk-kafka-01 conf]#scp -r /usr/local/zookeeper-3.5.7/ 10.0.0.202:/usr/local/

# 拷贝配置好的 Zookeeper 配置文件到03节点上
[14:57:49 root@zk-kafka-01 conf]#scp -r /usr/local/zookeeper-3.5.7/ 10.0.0.203:/usr/local/
# 在01节点上创建数据目录和日志目录
[root@zk-kafka-01 conf]# mkdir /usr/local/zookeeper-3.5.7/data
[root@zk-kafka-01 conf]# mkdir /usr/local/zookeeper-3.5.7/logs

# 在02节点上创建数据目录和日志目录
[root@zk-kafka-02 ~]# mkdir /usr/local/zookeeper-3.5.7/data
[root@zk-kafka-02 ~]# mkdir /usr/local/zookeeper-3.5.7/logs

# 在03节点上创建数据目录和日志目录
[root@zk-kafka-03 ~]# mkdir /usr/local/zookeeper-3.5.7/data
[root@zk-kafka-03 ~]# mkdir /usr/local/zookeeper-3.5.7/logs

zk-kafka-01

# 在01节点的dataDir指定的目录下创建一个 myid 的文件
[root@zk-kafka-01 conf]# echo 1 > /usr/local/zookeeper-3.5.7/data/myid

# 启动 zk
[15:20:27 root@zk-kafka-01 zookeeper-3.5.7]#bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

zk-kafka-02

# 在02节点的dataDir指定的目录下创建一个 myid 的文件
[root@zk-kafka-02 ~]# echo 2 > /usr/local/zookeeper-3.5.7/data/myid

# 启动 zk
[15:20:27 root@zk-kafka-02 zookeeper-3.5.7]#bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

zk-kafka-03

# 在03节点的dataDir指定的目录下创建一个 myid 的文件
[root@zk-kafka-03 ~]# echo 3 > /usr/local/zookeeper-3.5.7/data/myid

# 启动 zk
[15:20:27 root@zk-kafka-03 zookeeper-3.5.7]#bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

查看服务已启动

# 查看 zk 服务端口已启动
[15:25:57 root@zk-kafka-01 zookeeper-3.5.7]#ss -ntl| grep  2181
LISTEN 0      50                       *:2181             *:*          

# 查看 zk 服务端口已启动
[15:21:14 root@zk-kafka-02 zookeeper-3.5.7]#ss -ntl| grep  2181
LISTEN 0      50                       *:2181             *:*

# 查看 zk 服务端口已启动
[15:21:14 root@zk-kafka-03 zookeeper-3.5.7]#ss -ntl| grep  2181
LISTEN 0      50                       *:2181             *:*

链接验证 zk

[15:53:09 root@zk-kafka-01 zookeeper-3.5.7]#/usr/local/zookeeper-3.5.7/bin/zkCli.sh -server 127.0.0.1:2181

WatchedEvent state:SyncConnected type:None path:null
# 回车

[zk: 127.0.0.1:2181(CONNECTED) 0] 
# 如果显示 “CONNECTED” 则说明链接成功

# 执行 help 查询命令说明
[zk: 127.0.0.1:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
    addauth scheme auth
    close 
    config [-c] [-w] [-s]
    connect host:port
    create [-s] [-e] [-c] [-t ttl] path [data] [acl]
    delete [-v version] path
    deleteall path
    delquota [-n|-b] path
    get [-s] [-w] path
    getAcl [-s] path
    history 
    listquota path
    ls [-s] [-w] [-R] path
    ls2 path [watch]
    printwatches on|off
    quit 
    reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
    redo cmdno
    removewatches path [-c|-d|-a] [-l]
    rmr path
    set [-s] [-v version] path data
    setAcl [-s] [-v version] [-R] path acl
    setquota -n|-b val path
    stat [-w] path
    sync path
Command not found: Command not found help

# 退出 zk
quit

然后在不同节点查看 zk 状态可以看到 02 节点为 leader ,其他两个节点为 follower

[16:12:51 root@zk-kafka-02 zookeeper-3.5.7]#./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

[16:13:40 root@zk-kafka-01 zookeeper-3.5.7]#./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

[16:13:40 root@zk-kafka-03 zookeeper-3.5.7]#./bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

以上就是 ZK 集群部署,下面我们需要将 kafka 对接至 zk

2 kafka

前言:

在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。

为什么选择Kafka

消息中间件有很多。比如ActiveMQ,RabbitMQ,RocketMQ,Kafka。那你在选型的时候一般考虑哪些因素呢?我们来比较下这几个中间件的特点。

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低了一个数量级 万级,吞吐量同ActiveMQ 10万级,可以支撑高吞吐量 10万级别,高吞吐量。适合日志采集,实时计算等场景
topic数量对吞吐量的影响 topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十个到几百个的时候,吞吐量会「大幅度下降」所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源
时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
可用性 高,基于主从架构实现高可用性 同ActiveMQ 非常高,分布式架构 非常高,同样也是分布式式
消息可靠性 有较低的概率丢失数据 经过参数优化配置,可以做到0丢失 同RocketMQ一样也可以做到消息零丢失
功能支持 MQ领域的功能极其完备 基于erlang开发,所以并发能力很强,性能极其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

优劣总结

「ActiveMQ」

  • 优点:非常成熟,功能强大,在业内大量的公司以及项目中都有应用
  • 缺点:偶尔会有较低概率丢失消息。而且现在社区以及国内应用都越来越少,官方社区现在对 ActiveMQ 5.x维护越来越少,几个月才发布一个版本。较少在大规模吞吐的场景中使用。

「RabbitMQ」

  • 优点:erlang语言开发,性能极其好,延时很低。吞吐量到万级,MQ功能比较完备。而且开源提供的管理界面非常棒,用起来很好用。社区相对比较活跃,几乎每个月都发布几个版本分。在国内一些互联网公司近几年用rabbitmq也比较多一些。
  • 缺点:RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。而且rabbitmq集群动态扩展会很麻烦,不过这个我觉得还好。其实主要是erlang语言本身带来的问题。很难读源码,很难定制和掌控。

「RocketMQ」

  • 优点:接口简单易用,而且毕竟在阿里大规模应用过,有阿里品牌保障。日处理消息上百亿之多,可以做到大规模吞吐,性能也非常好,分布式扩展也很方便,社区维护还可以,可靠性和可用性都不错的,还可以支撑大规模的topic数量,支持复杂MQ业务场景。
  • 缺点:社区活跃度相对较为一般,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险。

「Kafka」

  • 优点:就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高吞吐量。
  • 缺点:有可能消息重复消费。对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。

从上面的总结我们知道,Kafka可以用于较简单的消息队列(如果对你来说足够使用)。并且较要求较高的吞吐,那么Kafka是你最合适的选择。

什么是Kafka

Kafka本质还是一个存储容器,最初由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

Kafka组成架构

从上面的架构图我们获得几个词:

  • Producer :消息生产者,就是向kafka broker发消息的客户端;
  • Consumer :消息消费者,向kafka broker取消息的客户端;
  • Topic :可以理解为一个队列;
  • Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
  • Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;
  • Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。

Kafka 部署

下载

下载地址:http://kafka.apache.org/downloads

我们选取这个下载

安装

zk-kafka-01

把下载的压缩包拷贝到Linux上,解压:

[09:55:39 root@zk-kafka-01 ~]#mkdir kafka
[09:55:41 root@zk-kafka-01 ~]#cd kafka/
[09:56:30 root@zk-kafka-01 kafka]#tar xf kafka_2.13-3.1.0.tgz
[10:04:32 root@zk-kafka-01 kafka]#mv kafka_2.13-3.1.0 /usr/local/kafka
[10:05:36 root@zk-kafka-01 kafka]#cd /usr/local/kafka/config/
[10:05:56 root@zk-kafka-01 config]#ls
connect-console-sink.properties    connect-mirror-maker.properties  server.properties
connect-console-source.properties  connect-standalone.properties    tools-log4j.properties
connect-distributed.properties     consumer.properties              trogdor.conf
connect-file-sink.properties       kraft                            zookeeper.properties
connect-file-source.properties     log4j.properties
connect-log4j.properties           producer.properties

修改 config

# 修改以下配置,listeners=PLAINTEXT://:9092 表示*:9092 以方便 kafka_exporter 链接
[10:05:59 root@zk-kafka-01 config]#vim server.properties 
listeners=PLAINTEXT://:9092
log.dirs=/usr/local/kafka/logs
zookeeper.connect=10.0.0.201:2181,10.0.0.202:2181,10.0.0.203:2181

拷贝 kafka 到 zk-kafka-02,zk-kafka-03 节点

# 推到 local 目录下
[10:10:30 root@zk-kafka-01 config]#cd ../..
[10:10:45 root@zk-kafka-01 local]#scp -r kafka/ 10.0.0.202:/usr/local
[10:10:45 root@zk-kafka-01 local]#scp -r kafka/ 10.0.0.203:/usr/local

修改 zk-kafka-02,zk-kafka-03 节点上 kafka 配置

# 02 节点
[09:52:28 root@zk-kafka-02 ~]#vim /usr/local/kafka/config/server.properties 
broker.id=1
listeners=PLAINTEXT://:9092

# 03 节点
[09:52:28 root@zk-kafka-02 ~]#vim /usr/local/kafka/config/server.properties 
broker.id=2
listeners=PLAINTEXT://:9092

添加kafka的环境变量(三台节点都添加)

# 01 节点操作
[10:29:48 root@zk-kafka-01 local]# vim /etc/profile
## 到最后一行添加配置信息
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[10:29:48 root@zk-kafka-01 local]#source /etc/profile

# 02 节点操作
[10:29:48 root@zk-kafka-02 local]# vim /etc/profile
## 到最后一行添加配置信息
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[10:29:48 root@zk-kafka-02 local]#source /etc/profile

# 03 节点操作
[10:29:48 root@zk-kafka-03 local]# vim /etc/profile
## 到最后一行添加配置信息
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[10:29:48 root@zk-kafka-03 local]#source /etc/profile

给 01 02 03 节点设置 kafka 启动项

[10:35:03 root@zk-kafka-01 bin]# cd /etc/systemd/system/

[10:35:04 root@zk-kafka-01 system]#vim kafka.service

[Unit]
# Kafka服务的描述
Description=Kafka Service
# 服务依赖—在什么服务之后启动,一般为在网络服务启动后启动
After=network.target zookeeper.service

[Service]
Type=forking
# 启动环境参数
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/jdk1.8/bin"
User=root
Group=root

# 启动命令ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 停止命令
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

启动 01 节点

[10:36:55 root@zk-kafka-01 system]# chmod 755 kafka.service

[10:37:01 root@zk-kafka-01 system]#systemctl daemon-reload 
[10:37:18 root@zk-kafka-01 system]#systemctl enable --now kafka.service

[10:37:36 root@zk-kafka-01 system]#systemctl status kafka.service 
● kafka.service - Kafka Service
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Thu 2024-04-11 10:37:36 CST; 23s ago
    Process: 74900 ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/confi>
   Main PID: 75223 (java)
      Tasks: 36 (limit: 2178)
     Memory: 276.6M
        CPU: 26.417s
     CGroup: /system.slice/kafka.service
             └─75223 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHea>

Apr 11 10:37:30 zk-kafka-01 systemd[1]: Starting Kafka Service...
Apr 11 10:37:36 zk-kafka-01 systemd[1]: Started Kafka Service.

将启动文件拷贝至 02 03 节点

[10:41:56 root@zk-kafka-01 system]#scp kafka.service 10.0.0.202:/etc/systemd/system/ 
[10:42:13 root@zk-kafka-01 system]#scp kafka.service 10.0.0.203:/etc/systemd/system/

# 02 节点
[10:36:33 root@zk-kafka-02 ~]#systemctl daemon-reload 
[10:42:51 root@zk-kafka-02 ~]#systemctl enable --now kafka.service

# 03 节点
[10:32:13 root@zk-kafka-03 ~]#systemctl daemon-reload 
[10:42:58 root@zk-kafka-03 ~]#systemctl enable --now kafka.service

验证 kafka

# 01 节点上操作
[10:44:09 root@zk-kafka-01 bin]#pwd 
/usr/local/kafka/bin

[10:44:38 root@zk-kafka-01 bin]#kafka-topics.sh --create --bootstrap-server 10.0.0.201:9092 --replication-factor 1 --partitions 3 --topic test

Created topic test.

# 查看当前服务器中所有的topic
[10:45:28 root@zk-kafka-01 bin]#kafka-topics.sh --list --bootstrap-server 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 hd-topic

test

# 查看某个topic的详情
[10:46:29 root@zk-kafka-01 bin]#kafka-topics.sh --describe --bootstrap-server 10.0.0.201:9092 hd-topic
Topic: test TopicId: cWmpGxsMQu2vMU6qbLQBmw PartitionCount: 3   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: test Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: test Partition: 2    Leader: 1   Replicas: 1 Isr: 1

02 节点上消费消息

# from-beginning:会把主题中以往所有的数据都读取出来
[10:49:38 root@zk-kafka-02 ~]#cd /usr/local/kafka/bin/
[10:50:59 root@zk-kafka-02 bin]#kafka-console-consumer.sh --bootstrap-server 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092 --topic test 

01 节点发布消息:

[10:51:59 root@zk-kafka-01 bin]#kafka-console-producer.sh --broker-list 10.0.0.201:9092,10.0.0.202:9092,10.0.0.203:9092  --topic test
>1
>2
>3
>111
>222
>333

可以看到 02 节点已经消费了该 topic

3 对接 Prometheus

使用的监控方式为:kafka_exporter+prometheus

3.1 部署 exporter

注:1个kafka集群只需要1个exporter,在集群上的任意1台服务器部署。

这里我部署在 01 节点上

[14:19:14 root@zk-kafka-01 ~]#mkdir kafka_exporter

[14:19:26 root@zk-kafka-01 ~]#cd kafka_exporter/

[14:19:29 root@zk-kafka-01 kafka_exporter]#wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.7.0/kafka_exporter-1.7.0.linux-amd64.tar.gz

[14:34:53 root@zk-kafka-01 kafka_exporter]#tar xf kafka_exporter-1.7.0.linux-amd64.tar.gz 

[14:34:56 root@zk-kafka-01 kafka_exporter]#mv kafka_exporter-1.7.0.linux-amd64 /usr/local/kafka_exporter

# 添加开机自启动
[14:37:41 root@zk-kafka-01 kafka_exporter]#cat > /etc/systemd/system/kafka_exporter.service << "EOF"
[Unit]
Description=kafka_exporter
After=local-fs.target network-online.target network.target
Wants=local-fs.target network-online.target network.target

# kafka.server 填写 kafka 集群 IP+port即可
[Service]
ExecStart=/usr/local/kafka_exporter/kafka_exporter --kafka.server=10.0.0.201:9092
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF

# 加载并设置为开机启动 
[14:37:41 root@zk-kafka-01 kafka_exporter]#systemctl daemon-reload
[14:38:46 root@zk-kafka-01 kafka_exporter]#systemctl enable --now kafka_exporter.service 

浏览器访问 exporter

Kafka Exporter

3.2 对接 Prometheus

这里我将对接 K8S 中的 Prometheus-operator

# 创建一个单独得 kafka 监控 NS
[15:04:02 root@master ~]#kubectl create ns kafka-mon

[15:05:31 root@master ~]#mkdir kafka_mon
[15:05:34 root@master ~]#cd kafka_mon/

编写 yaml

# 编写 endpoint
[15:47:50 root@master kafka_mon]#vim kafka_end.yaml
apiVersion: v1
kind: Endpoints
metadata:
  name: kafka-metrics                           # 需要和 svc 中 name 字段保持一致
  namespace: kafka-mon
  labels:
    k8s-app: kafka-metrics  
subsets:
# 下面分别是 kafka-exporter 的地址
  - addresses:
      - ip: "10.0.0.201"
    ports:
      - name: metrics
        port: 9308      # 后端 kafka-exporter 端口
        protocol: TCP

# 编写 svc
[15:57:15 root@master kafka_mon]#cat kafka_svc.yaml 
apiVersion: v1
kind: Service
metadata:
  name: kafka-metrics   # 所以这里的名字与上面的 Endpoints 中名字相同
  namespace: kafka-mon
  labels:
    k8s-app: kafka-metrics  
spec:
  ports:
  - name: metrics
    port: 9308
    protocol: TCP
    targetPort: 9308

# 编写 sm 对接 Prometheus POD
[15:57:27 root@master kafka_mon]#vim kafka_sm.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    k8s-app: kafka-metrics
  name: kafka-metrics
  namespace: monitoring
spec:
  namespaceSelector:
    matchNames:
    - kafka-mon
  # selector 字段中匹配 endpoint 的标签
  selector:
    matchLabels:
      k8s-app: kafka-metrics
  endpoints:
  - interval: 15s
    port: metrics       # 匹配 svc port 名称
    path: /metrics      # 匹配监控指标路径
  jobLabel: instance
[15:53:36 root@master kafka_mon]#kubectl apply -f kafka_end.yaml 
[16:00:10 root@master kafka_mon]#kubectl apply -f kafka_svc.yaml 
[16:00:10 root@master kafka_mon]#kubectl apply -f kafka_sm.yaml 

Prometheus 验证

# 查看 Prometheus svc
[16:00:56 root@master kafka_mon]#kubectl get svc -n monitoring
prometheus-k8s          NodePort    172.30.0.101   <none>        9090:31005/TCP,8080:30344/TCP   21d

获取成功

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇