• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

kafka-sth

##目录

  • 目录
  • kafka副本
    • 副本(replica)
    • 副本角色
    • 追随者副本的集合
    • unclean领导者选举
  • 请求处理流程
    • reactor模式
      • kafka中的reactor
  • rebalance流程
    • 触发
    • 通知
    • 消费者组的状态
    • 消费者重平衡的流程
    • broker端的重平衡
  • kafka控制器
    • 控制器如何被选出来
    • 控制器的作用
    • 控制器的数据
    • 控制器故障转移(Failover)
    • 控制器的内部设计
  • 高水位与leader-epoch
    • 水位
      • 高水位得作用
      • 高水位的更新
    • leader-epoch

kafka副本

副本机制(replication)的优势:

  1. 提供数据冗余,即使系统部分组件失效,系统依然能够运转.增加了整体可用性及数据持久性
  2. 提高伸缩性,支持横向扩展,通过增加机器的方式来提升读性能,进而提高操作吞吐量.
  3. 改善数据局部性,允许将数据放入与用户地理位置相近的地区,降低延时.

kafka只有1的优势

副本(replica)

kafka按照topic分为若干分区,副本的概念实际上是在分区层级下定义,每个分区配置有若干个副本.
即每个分区配置有若干副本.

所谓副本(replica) 本至上市一个只能追加写消息的提交日志.
根据kafka副本机制定义,同一个分区下的所有副本保存有相同的消息序列.这些副本分散保存在不同的 Broker 上,从而能够对抗部分 Broker 宕机带来的数据不可用。

在实际生产环境中,每台 Broker 都可能保存有各个主题下不同分区的不同副本,因此,单个 Broker 上存有成百上千个副本的现象是非常正常的。

副本角色

类型: 基于领导者(leader-based)的副本机制

kafak中,副本分为领导者副本(leader replica)与追随者副本(follower replica)
kafka 追随者副本不对外提供任何读写.它只从领导者副本异步拉取数据
当领导者副本挂掉,kafka依托于zookeeper的监控能实时感知到,开启新一轮的领导者选举,作为新的leader,老的leader副本回归,会作为追随者副本.

追随者不提供对外服务的设计优势:

  1. 方便实现read-your-write
  2. 方便实现单调读(monotonic reads)

追随者副本的集合

in-sync-replicasISR 中的副本都是与 Leader 同步的副本,相反,不在 ISR 中的追随者副本就被认为是与 Leader 不同步的.Leader 副本天然就在 ISR 中。也就是说,ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本。

Kafka 判断 Follower 是否与 Leader 同步的标准:
replica.lag.time.max.ms,这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。这就是说,只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。ISR 是一个动态调整的集合,而非静态不变的。

unclean领导者选举

uncliean leader election

既然 ISR 是可以动态调整的,那么自然就可以出现这样的情形:ISR 为空。因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。

Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。

在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。

CAP 理论:一个分布式系统通常只能同时满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)中的两个。显然,在这个问题上,Kafka 赋予你选择 C 或 A 的权利。

请求处理流程

kafka自己定义了一组请求协议,用于实现各种交互操作.常见的produce请求是用于生产消息的.fetch请求时用于消费消息的.metadata请求获取元数据.所有请求都是通过tcp网络以socket方式进行通讯

reactor模式

reactor模式是事件驱动架构的一种实现方式.适用于处理多个客户端并发向服务器端发送请求的场景.

epoll是一种io模型,reactor是一种io处理模式.

多个客户端发送请求给reactor,reactor有个请求分发线程dispatcher,会将不同的请求下发到多个工作线程中处理.

kafka中的reactor

kafka中的broker端有个SocketServer组件,类似于reactor中的dispatcher,也有对应的acceptor(dispatcher)线程和一个工作线程池(kafka中,叫做网络线程池).
kafka提供了broker端参数num.network.threads调整网络线程的线程数(默认值是3).

acceptor采用轮询的方式将入站请求公平的分发到所有网络中.但是网络线程接收到请求之后,又做了一层异步线程池的处理.

网络线程池接收到请求后,将请求放入一个共享队列中.broker有个io线程池,负责该队的取出奥做.如果是produce生产请求,则将消息写入磁盘日志中,如果是fetch请求,则从磁盘或者页缓存中读取请求.

io线程池中的线程才是执行请求逻辑的线程.num.io.trheads控制了io线程的线程数,默认为8.当io线程处理完毕后,会将生成的响应发送到网络线程池的响应队列中.

rebalance流程

触发

rebalance触发条件:

  1. 组成员变化(最常见,每次消费者组启动,必然触发)
  2. 订阅主题数量变化(正则表达式的topic)
  3. 订阅主题分区数发生变化(分区数增加)

通知

kafka java消费者定时发送心跳请求(heartbeat request)到broker端的协调者.
重平衡的通知机制,就是通过心跳请求线程实现.开启新一轮rebalance后,会将REBALANCE_IN_PROGRESS封装进心跳请求的响应中,发送给消费者实例.

消费者端参数heartbeat.interval.ms的真正作用就是控制重平衡的通知频率.

消费者组的状态

重平衡开启,broker端的借条这组件开始控制消费者组的状态流转.

Kafka 设计了一套消费者组状态机(State Machine),来帮助协调者完成整个重平衡流程。

目前,Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。

状态 含义
empty 组内没有任何成员,消费者组可能存在已提交的位移数据,而且这些位移尚未过期
dead 组内没有任何成员,但组的元数据信息已经在协调者端被移除.协调者组件保存着当前向它注册过的所有组信息,所谓元数据信息就类似这个这注册信息
preparingRebalance 消费者组准备开启重平衡,所有成员要重新请求加入消费者组
completingRebalance 消费者组下的所有成员已经加入,各个成员正在等待分配方案.
stable 消费者组的稳定状态.重平衡已经完成,可以正常消费了

消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。

当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。

在 Kafka 的日志中

Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds.
kafka开始定期尝试删除过期位移.只有empty状态的组,才会志馨过期位移删除操作.

消费者重平衡的流程

在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。

消费者的领导者它不是副本,也不是协调者,它的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。

选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。

broker端的重平衡

新成员加入
新成员入组是指组处于 Stable 状态后,有新成员加入。

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。

成员的主动离组
主动离组:消费者组实例所在线程或者进程调用close(),发送LeaveGroup请求

向协调者发送离组之后,协调者通知preparereBalance.

组成员崩溃离组
消费者出现故障,宕机离组

协调者等待一段时间后,感知到离组,由消费者端参数session.timeout.ms控制.

重平衡时协调者对组内成员提交的位移

kafka控制器

控制器组件(controller),是apache kafka的核心组件.主要作用是 apache zookeeper的帮助下管理和协调整个kafka集群.

集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。换句话说,每个正常运转的 Kafka 集群,在任意时刻都有且只有一个控制器。官网上有个名为 activeController 的 JMX 指标,可以帮助我们实时监控控制器的存活状态。这个 JMX 指标非常关键,在实际运维操作过程中,一定要实时查看这个指标的值
控制器是重度依赖ZooKeeper

控制器如何被选出来

Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。

控制器的作用

主题管理
执行 kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。

分区重分配
kafka-reassign-partitions 脚本

preferred领导者选举
Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案

集群成员管理(broker增加、关闭、宕机)
控制器组件会利用 Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点.
zk通过watch机制通知控制器.

数据服务
控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。

控制器的数据

  • 所有主题信息。包括具体的分区信息,比如领导者副本是谁,ISR 集合中有哪些副本等。
  • 所有 Broker 信息。包括当前都有哪些运行中的 Broker,哪些正在关闭中的 Broker 等。
  • 所有涉及运维任务的分区。包括当前正在进行 Preferred 领导者选举以及分区重分配的分区列表。

控制器故障转移(Failover)

故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka 能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器。这个过程就被称为 Failover,该过程是自动完成的,无需你手动干预。

最开始时,Broker 0 是控制器。当 Broker 0 宕机后,ZooKeeper 通过 Watch 机制感知到并删除了 /controller 临时节点。之后,所有存活的 Broker 开始竞选新的控制器身份。Broker 3 最终赢得了选举,成功地在 ZooKeeper 上重建了 /controller 节点。之后,Broker 3 会从 ZooKeeper 中读取集群元数据信息,并初始化到自己的缓存中。至此,控制器的 Failover 完成,可以行使正常的工作职责了。

控制器的内部设计

控制器内部引入一个事件处理线程,统一处理各种控制器事件,然后控制器将执行的操作建模成一个个独立事件,发送到专属的事件队列中,供此县城消费(单线程+队列的实现)

针对控制器的第二个改进就是,将之前同步操作 ZooKeeper 全部改为异步操作。ZooKeeper 本身的 API 提供了同步写和异步写两种方式。之前控制器操作 ZooKeeper 使用的是同步的 API,性能很差,集中表现为,当有大量主题分区发生变更时,ZooKeeper 容易成为系统的瓶颈。新版本 Kafka 修改了这部分设计,完全摒弃了之前的同步 API 调用,转而采用异步 API 写入 ZooKeeper,性能有了很大的提升。根据社区的测试,改成异步之后,ZooKeeper 写入提升了 10 倍!

之前 Broker 对接收的所有请求都是一视同仁的,不会区别对待。这种设计对于控制器发送的请求非常不公平,因为这类请求应该有更高的优先级。
简单的例子,假设我们删除了某个主题,那么控制器就会给该主题所有副本所在的 Broker 发送一个名为 StopReplica 的请求。如果此时 Broker 上存有大量积压的 Produce 请求,那么这个 StopReplica 请求只能排队等。如果这些 Produce 请求就是要向该主题发送消息的话,这就显得很讽刺了:主题都要被删除了,处理这些 Produce 请求还有意义吗?此时最合理的处理顺序应该是,赋予 StopReplica 请求更高的优先级,使它能够得到抢占式的处理。(自 2.2 开始,Kafka 正式支持这种不同优先级请求的处理。简单来说,Kafka 将控制器发送的请求与普通数据类请求分开,实现了控制器请求单独处理的逻辑)

一个小窍门。当你觉得控制器组件出现问题时,比如主题无法删除了,或者重分区 hang 住了,你不用重启 Kafka Broker 或控制器。有一个简单快速的方式是,去 ZooKeeper 中手动删除 /controller 节点。具体命令是 rmr /controller。这样做的好处是,既可以引发控制器的重选举,又可以避免重启 Broker 导致的消息处理中断。

高水位与leader-epoch

依托于高水位,Kafka 既界定了消息的对外可见性,又实现了异步的副本同步机制。
其设计上的缺陷给 Kafka 留下了很多数据丢失或数据不一致的潜在风险。

水位

水位一词多用于流式处理领域
在时刻 T,任意创建时间(Event Time)为 T’,且 T’≤T 的所有事件都已经到达或被观测到,那么 T 就被定义为水位。

Kafka 的水位不是时间戳,更与时间无关。它是和位置信息绑定的,具体来说,它是用消息位移来表征的。另外,Kafka 源码使用的表述是高水位

高水位得作用

  1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
  2. 帮助 Kafka 完成副本同步。

在分区高水位以下的消息被认为是已提交消息,反之就是未提交消息。消费者只能消费已提交消息,即图中位移小于 8 的所有消息。注意,这里我们不讨论 Kafka 事务,因为事务机制会影响消费者所能看到的消息的范围,它不只是简单依赖高水位来判断。它依靠一个名为 LSO(Log Stable Offset)的位移值来判断事务型消费者的可见性。

位移值等于高水位的消息也属于未提交消息。也就是说,高水位上的消息是不能被消费者消费的。

同一个副本对象,其高水位值不会大于 LEO 值。

高水位和 LEO 是副本对象的两个重要属性。Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。

高水位的更新

leader-epoch

Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。

Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
副本是否执行日志截断不再依赖于高水位进行判断。

所谓 Leader Epoch,我们大致可以认为是 Leader 版本。

  1. Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
  2. 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
zookeeper简介
kafka-运维
  1. 1. kafka副本
    1. 1.1. 副本(replica)
    2. 1.2. 副本角色
    3. 1.3. 追随者副本的集合
    4. 1.4. unclean领导者选举
  2. 2. 请求处理流程
    1. 2.1. reactor模式
      1. 2.1.1. kafka中的reactor
  3. 3. rebalance流程
    1. 3.1. 触发
    2. 3.2. 通知
    3. 3.3. 消费者组的状态
    4. 3.4. 消费者重平衡的流程
    5. 3.5. broker端的重平衡
  4. 4. kafka控制器
    1. 4.1. 控制器如何被选出来
    2. 4.2. 控制器的作用
    3. 4.3. 控制器的数据
    4. 4.4. 控制器故障转移(Failover)
    5. 4.5. 控制器的内部设计
  5. 5. 高水位与leader-epoch
    1. 5.1. 水位
      1. 5.1.1. 高水位得作用
      2. 5.1.2. 高水位的更新
    2. 5.2. leader-epoch
© 2023 haoxp
Hexo theme