作者:管理员  历史版本:1  更新时间:2024-06-27 10:21

了解Kafka

Kafka是什么

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

Kafka的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。

消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。

消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。

重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka的消息队列

Kafka 的消息队列一般分为两种模式:点对点模式和发布订阅模式

Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列

Kafka系统架构

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

Kafka安装和配置(物理部署)

安装Java环境

在安装Kafka之前,确保系统上已经有java环境

安装Zookeeper环境

Kafka 的底层使用 Zookeeper 储存元数据,确保一致性,所以安装 Kafka 前需要先安装 Zookeeper,Kafka 的发行版自带了 Zookeeper ,也可以直接使用脚本来启动。

安装zookeeper时注意要和kafka中的版本一致

Zookeeper单机搭建

  1. 下载解压
    直接从 https://www.apache.org/dyn/closer.cgi/zookeeper/ 官网下载一个稳定版本的 Zookeeper ,解压即可

  2. 修改配置解压完成后,cd 到 zookeeper的根目录,创建一个 data 文件夹,然后进入到 conf 文件夹下,使用 mv zoo_sample.cfg zoo.cfg 进行重命名操作

    然后使用 vi 打开 zoo.cfg ,更改一下dataDir = zookeeper根目录/data ,保存。

  3. 启动服务
    进入bin目录,启动服务输入命令./zkServer.sh start

  4. 关闭服务
    关闭服务输入命令,./zkServer.sh stop

  5. 查看状态信息
    使用 ./zkServer.sh status 可以查看状态信息。

Zookeeper集群搭建

  1. 安装
    安装步骤同单节点一样,版本一致即可

  2. 配置
    Zookeeper根目录下新建 datalog 文件夹,编辑 conf/zoo.cfg 文件,内容如下:

    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=zookeeper根目录/data
    dataLogDir=zookeeper根目录/log
    clientPort=12181
    server.1=192.168.64.132:2888:3888
    server.2=192.168.64.133:2888:3888
    server.3=192.168.64.134:2888:3888

    tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

    initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

    syncLimit: 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒

    dataDir: 快照日志的存储路径

    dataLogDir: 事务日志的存储路径,如果不配置这个那么事务日志会默认存储到dataDir指定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事务日志、快照日志太多

    clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

    server.1 中的这个 1 表示的是服务器的标识也可以是其他数字,表示这是第几号服务器,这个标识要和下面我们配置的 myid 的标识一致可以。

    IP地址:13888 为集群中的 ip 地址,第一个端口表示的是 master 与 slave 之间的通信接口,默认是 2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口,默认是 3888

  3. 创建 myid 文件
    创建每个集群节点的 myidmyid 就是 server.1 的这个1,类似的,需要为集群中的每个服务都指定标识,使用 echo 命令进行创建

    # server.1
    echo "1" > /zookeeper根目录/data/myid
    # server.2
    echo "2" > /zookeeper根目录/data/myid
    # server.3
    echo "3" > /zookeeper根目录/data/myid
  4. 启动服务并测试
    在zookeeper的根目录下执行以下命令:

    ./bin/zkServer.sh start

    检查服务状态:

    ./bin/zkServer.sh status

zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

Kafka集群搭建

  1. 准备条件
    安装步骤和zookeeper集群一样;安装完成后进入kafka的根目录,打开config文件夹。可以看到很多properties配置文件,这里主要关注server.properties文件即可。

  2. 修改配置项
    需要为 每个服务 都修改一下配置项,也就是server.properties,需要更新和添加的内容如下:

    broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3
    log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log
    
    #在log.retention.hours=168 下面新增下面三项
    message.max.byte=5242880
    default.replication.factor=2
    replica.fetch.max.bytes=5242880
    
    #分节点配置
    listeners=PLAINTEXT://192.168.64.132:9092 //对应kafka所在服务器的ip地址
    
    #设置zookeeper的连接端口
    zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

    配置项的含义:

    broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
    port=9092 #当前kafka对外提供服务的端口默认是9092
    host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
    num.network.threads=3 #这个是borker进行网络处理的线程数
    num.io.threads=8 #这个是borker进行I/O处理的线程数
    log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
    socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
    socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
    socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
    num.partitions=1 #默认的分区数,一个topic默认1个分区数
    log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
    message.max.byte=5242880  #消息保存的最大值5M
    default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
    replica.fetch.max.bytes=5242880  #取消息的最大直接数
    log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
    log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
    log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
    zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口
  3. 启动Kafka集群并测试

    • 进入到kafka的根目录下:

      # 启动后台进程
      ./bin/kafka-server-start.sh -daemon ../config/server.properties
    • 检查服务是否启动

      # 执行命令 jps
      6201 QuorumPeerMain
      7035 Jps
      6972 Kafka
    • kafka已经启动,在kafka根目录下执行以下命令,创建topic验证是否创建成功

      bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic test
      

      参数说明:

      –replication-factor 2 复制两份

      –partitions 1 创建1个分区

      –topic 创建主题

      查看创建的主题:

      bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
      

      启动一个服务就能把集群启动起来

    • 创建发布者

      # 创建一个broker,发布者
      ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
    • 创建订阅者

      # 创建一个consumer, 消费者
      bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
    • 其他命令
      显示topic

      bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

      查看topic状态

      bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic

      Leader 负责给定分区的所有读取和写入的节点,每个节点都会通过随机选择成为 leader。

      Replicas 是为该分区复制日志的节点列表,无论它们是 Leader 还是当前处于活动状态。

      Isr 是同步副本的集合。它是副本列表的子集,当前仍处于活动状态并追随Leader。

      至此,kafka 集群搭建完毕。

  4. kafka集群特性说明

Kafka集群(Docker部署)

拉取镜像

docker pull wurstmeister/kafka

docker pull wurstmeister/zookeeper

启动zookeeper容器

docker run --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest

启动kafka容器

docker run -p 19092:9092 --name kafka1 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

docker run -p 19093:9093 --name kafka2 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

docker run -p 19094:9094 --name kafka3 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

进入容器进行测试

#在守护式容器中启动一个交互式进程
docker exec -i -t kafka3 /bin/bash
#创建主题test3
./kafka-topics.sh --zookeeper 192.168.64.132:12181 --create --topic test3 --replication-factor 1 --partitions 3 Created topic "test3"
#查看主题test3
./kafka-topics.sh --zookeeper 192.168.64.132:12181 --describe --topic test3
#向test1发消息
./kafka-console-producer.sh --broker-list 192.168.64.132:19092,192.168.64.132:19093,192.168.64.132:19094 --topic test1
#接收消息
./kafka-console-consumer.sh --bootstrap-server 192.168.64.132:19092,192.168.64.132:19093,192.168.64.132:19094 --topic test1 --from-beginning

认证机制

Kafka 为我们提供了这么多种认证机制,在实际使用过程中,我们应该如何选择合适的认证框架呢?下面我们就来比较一下。

目前来看,使用 SSL 做信道加密的情况更多一些,但使用 SSL 实现认证不如使用 SASL。毕竟,SASL 能够支持你选择不同的实现机制,如 GSSAPI、SCRAM、PLAIN 等。因此,我的建议是你可以使用 SSL 来做通信加密,使用 SASL 来做 Kafka 的认证实现。

SASL 下又细分了很多种认证机制,我们应该如何选择呢?

SASL/GSSAPI 主要是给 Kerberos 使用的。如果你的公司已经做了 Kerberos 认证(比如使用 Active Directory),那么使用 GSSAPI 是最方便的了。因为你不需要额外地搭建 Kerberos,只要让你们的 Kerberos 管理员给每个 Broker 和要访问 Kafka 集群的操作系统用户申请 principal 就好了。总之,GSSAPI 适用于本身已经做了 Kerberos 认证的场景,这样的话,SASL/GSSAPI 可以实现无缝集成。

而 SASL/PLAIN,就像前面说到的,它是一个简单的用户名 / 密码认证机制,通常与 SSL 加密搭配使用。注意,这里的 PLAIN 和 PLAINTEXT 是两回事。PLAIN 在这里是一种认证机制,而 PLAINTEXT 说的是未使用 SSL 时的明文传输。对于一些小公司而言,搭建公司级的 Kerberos 可能并没有什么必要,他们的用户系统也不复杂,特别是访问 Kafka 集群的用户可能不是很多。对于 SASL/PLAIN 而言,这就是一个非常合适的应用场景。总体来说,SASL/PLAIN 的配置和运维成本相对较小,适合于小型公司中的 Kafka 集群。

但是,SASL/PLAIN 有这样一个弊端:它不能动态地增减认证用户,你必须重启 Kafka 集群才能令变更生效。为什么呢?这是因为所有认证用户信息全部保存在静态文件中,所以只能重启 Broker,才能重新加载变更后的静态文件。

我们知道,重启集群在很多场景下都是令人不爽的,即使是轮替式升级(Rolling Upgrade)。SASL/SCRAM 就解决了这样的问题。它通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,你可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果你打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。不过要注意的是,后者是 0.10.2 版本引入的。你至少要升级到这个版本后才能使用。

img

SASL/SCRAM-SHA-256 配置实例

  1. 创建用户
    配置 SASL/SCRAM 的第一步,是创建能否连接 Kafka 集群的用户。在本次测试中,我会创建 3 个用户,分别是 admin 用户、writer 用户和 reader 用户。admin 用户用于实现 Broker 间通信,writer 用户用于生产消息,reader 用户用于消费消息。

    `shell
    $ cd kafka_2.12-2.3.0/
    $ bin/kafka-configs.sh –zookeeper localhost:2181 –alter –add-config ‘SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]’ –entity-type users –entity-name admin

    Completed Updating config for entity: user-principal ‘admin’.

$ bin/kafka-configs.sh –zookeeper localhost:2181 –alter –add-config ‘SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]’ –entity-type users –entity-name writer

Completed Updating config for entity: user-principal ‘writer’.

$ bin/kafka-configs.sh –zookeeper localhost:2181 –alter –add-config ‘SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]’ –entity-type users –entity-name reader

Completed Updating config for entity: user-principal ‘reader’.

查看创建的用户数据

$ bin/kafka-configs.sh –zookeeper localhost:2181 –describe –entity-type users –entity-name writer

Configs for user-principal ‘writer’ are SCRAM-SHA-512=salt=MWt6OGplZHF6YnF5bmEyam9jamRwdWlqZWQ=,stored_key=hR7+vgeCEz61OmnMezsqKQkJwMCAoTTxw2jftYiXCHxDfaaQU7+9/dYBq8bFuTio832mTHk89B4Yh9frj/ampw==,server_key=C0k6J+9/InYRohogXb3HOlG7s84EXAs/iw0jGOnnQAt4jxQODRzeGxNm+18HZFyPn7qF9JmAqgtcU7hgA74zfA==,iterations=4096,SCRAM-SHA-256=salt=MWV0cDFtbXY5Nm5icWloajdnbjljZ3JqeGs=,stored_key=sKjmeZe4sXTAnUTL1CQC7DkMtC+mqKtRY0heEHvRyPk=,server_key=kW7CC3PBj+JRGtCOtIbAMefL8aiL8ZrUgF5tfomsWVA=,iterations=4096


2. 创建JAAS文件

   单独为每个Broker机器创建一份JAAS 文件

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username=”admin”
password=”admin”;
};


   > 关于这个文件内容,你需要注意以下两点:不要忘记最后一行和倒数第二行结尾处的分号;JAAS 文件中不需要任何空格键。

   这里,我们使用 admin 用户实现 Broker 之间的通信。接下来,我们来配置 Broker 的 server.properties 文件,下面这些内容,是需要单独配置的:

   ```shell
   # 表明开启 SCRAM 认证机制,并启用 SHA-256 算法
   sasl.enabled.mechanisms=SCRAM-SHA-256
   # 为 Broker 间通信也开启 SCRAM 认证,同样使用 SHA-256 算法
   sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
   # 表示 Broker 间通信不配置 SSL
   security.inter.broker.protocol=SASL_PLAINTEXT
   # 设置 listeners 使用 SASL_PLAINTEXT,依然是不使用 SSL
   listeners=SASL_PLAINTEXT://localhost:9092
  1. 启动broker
    指定JAAS文件的位置

    $KAFKA_OPTS=-Djava.security.auth.login.config=<your_path>/kafka-broker.jaas bin/kafka-server-start.sh config/server1.properties
  2. 发送消息
    创建一个名为producer.conf的配置文件

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";

    之后运行

    bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic test  --producer.config <your_path>/producer.conf
  3. 消费消息
    创建consumer.conf文件

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";

    正常消费消息

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093 --topic test --from-beginning --consumer.config <your_path>/consumer.conf
  4. 动态增减用户

    $ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
    Completed Updating config for entity: user-principal 'writer'.
    
    $ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
    Completed Updating config for entity: user-principal 'writer'.
    
    $ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=new_writer]' --entity-type users --entity-name new_writer
    Completed Updating config for entity: user-principal 'new_writer'.
  5. springboot集成认证配置
    在springboot的配置文件中,添加kafka认证相关配置。需要注意的是,sasl.jaas.config 配置中的 ; 是必须的,否则会报错,usernamepassword则为上述操作中自己设置的即可

    spring:
      kafka:
        producer:
          properties:
            sasl.mechanism: SCRAM-SHA-256
            security.protocol: SASL_PLAINTEXT
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";
    

Kafka Manager

kafka-manager是目前最受欢迎的kafka集群管理工具,最早由雅虎开源,用户可以在Web界面执行一些简单的集群管理操作。具体支持以下内容:

  • 管理多个集群
  • 轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)
  • 运行首选副本选举
  • 使用选项生成分区分配以选择要使用的代理
  • 运行分区重新分配(基于生成的分配)
  • 使用可选主题配置创建主题(0.8.1.1具有与0.8.2+不同的配置)
  • 删除主题(仅支持0.8.2+并记住在代理配置中设置delete.topic.enable = true)
  • 主题列表现在指示标记为删除的主题(仅支持0.8.2+)
  • 批量生成多个主题的分区分配,并可选择要使用的代理
  • 批量运行重新分配多个主题的分区
  • 将分区添加到现有主题
  • 更新现有主题的配置

kafka-manager 项目地址:https://github.com/yahoo/CMAK

  1. 下载并解压安装包(GitHub下载即可)

  2. sbt编译(需要安装sbt)

    在kafka-manager根目录下执行如下命令

    ./sbt clean dist

    编译完成后,对应的包会在kafka-manage的根目录下的 target/universal/kafka-manager-xxxx.zip

  3. 安装
    解压编译好后的 kafka-manager-xxxx.zip
    修改配置文件 conf/application.conf

    # 修改kafka-manager.zkhosts列表为自己的zk节点
    kafka-manager.zkhosts="192.168.64.132:2181,192.168.64.133:2181,192.168.64.134:2181"
    # 启动登录验证
    basicAuthentication.enabled=true
    # 设置用户名
    basicAuthentication.username="admin"
    # 设置密码
    basicAuthentication.password="admin"
    # 删除相关值可关闭对应功能
    application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
  4. 启动服务
    启动zk集群,kafka集群,再启动kafka-manager服务。
    bin/kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:

    nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &
  5. 参数说明

    • Enable JMX Polling
      是否开启 JMX 轮训,该部分直接影响部分 kafka broker 和 topic 监控指标指标的获取(生效的前提是 kafka 启动时开启了 JMX_PORT。主要影响如下之指标的查看:

    • Poll consumer information
      是否开启获取消费信息,直接影响能够在消费者页面和 topic 页面查看消费信息。

    • Enable Active OffsetCache
      是否开启 offset 缓存,决定 kafka-manager 是否缓存住 topic 的相关偏移量。

    其余参数说明

    | 参数名 | 参数说明 | 默认值 | 备注 |
    | :———————————–: | :————————————: | :——-: | :—————————————–: |
    | brokerViewUpdatePeriodSeconds | Broker视图周期更新时间/单位(s) | 30 | |
    | clusterManagerThreadPoolSize | 集群管理线程池大小 | 2 | |
    | clusterManagerThreadPoolQueueSize | 集群管理线程池列队大小 | 100 | |
    | KafkaCommandThreadPoolSize | Kafka命令线程池大小 | 2 | |
    | logkafkaCommandThreadPoolQueueSize | logkafka命令线程池列队大小 | 100 | |
    | logkafkaUpdatePeriodSeconds | Logkafka周期更新时间/单位(s) | 30 | |
    | partitionOffsetCacheTimeoutSecs | Partition Offset缓存过期时间/单位(s) | 5 | |
    | brokerViewThreadPoolSize | Broker视图线程池大小 | 8 | 3 * number_of_brokers |
    | brokerViewThreadPoolQueue Size | Broker视图线程池队列大小 | 1000 | 3 * total # of partitions across all topics |
    | offsetCacheThreadPoolSize | Offset缓存线程池大小 | 8 | |
    | offsetCacheThreadPoolQueueSize | Offset缓存线程池列队大小 | 1000 | |
    | kafkaAdminClientThreadPoolSize | Kafka管理客户端线程池大小 | 8 | |
    | kafkaAdminClientTheadPoolQueue Sizec | Kafka管理客户端线程池队列大小 | 1000 | |
    | kafkaManagedOffsetMetadataCheckMillis | Offset元数据检查时间 | 30000 | (这部分解释属自己理解) |
    | kafkaManagedOffsetGroupCacheSize | Offset组缓存大小 | 100000 | (这部分解释属自己理解) |
    | kafkaManagedOffsetGroupExpireDays | Offset组缓存保存时间 | 7 | (这部分解释属自己理解) |
    | Security Protocol | 安全协议 | PLAINTEXT | [SASL_PLAINTEXT,SASL_SSL,SSL] |

集群注意事项

  1. zookeeper的版本必须和kafka中的 lib 文件夹中的zookeeper版本一致,否则启动可能会报错

  2. 多个服务器部署时,注意关闭防火墙,或者开启对应的端口,否则集群间的同步或者选举无法正常进行

  3. springboot集成时,向未创建的topic发送消息时,默认会创建对应的topic,默认的分区数为1,副本为1

  4. kafka manager默认不开启JMX监控,如需开启,修改kafka根目录下的 bin/kafka-server-start.sh ,添加JXM_PORT参数

    if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then
    export KAFKA_HEAP_OPTS=”-Xmx1G -Xms1G”
    export JMX_PORT=”9999” // 添加JMX端口配置
    fi

    或者通过启动时添加端口设置

    JMX_PORT=9999 ./bin/kafka-server-start.sh config/server.properties

  5. 创建topic时,必须指定 partitionsreplication-factor 的数量; replication-factor 的数量不能大于broker的数量