• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

kafka-生产者,消费者与消息

目录

  • 目录
  • 生产者
    • 生产者的分区策略
    • kafka消息格式与压缩
      • kafka消息格式的历史
      • 消息的压缩
      • 消息的解压
      • 压缩的算法
      • 压缩的实践
    • 拦截器
      • 拦截器应用场景
    • 生产者tcp链接
  • 消息
    • 消息无丢失
      • 消息丢失
      • 最佳实践
    • 消息幂等性
      • 消息交付可靠性保障
      • 幂等性producer
      • 事务型producer
  • 消费者
    • 消费者组
    • Coordinator
    • rebalance过程
      • 触发条件
      • rebalance弊端
      • 非必要rebalance
    • 位移
      • consumer位移提交方式
      • 位移提交
        • 自动提交
        • 手动提交
      • 更精细化的位移管理
      • 提交失败异常
    • 并发消费者
    • 消费者的tcp链接
    • 消费者组消费的进度监控
      • 监控方案

生产者

生产者的分区策略

所谓分区策略即发送消息到哪个分区

Kafka 默认分区策略
java客户端默认的生产者分区策略的实现类为org.apache.kafka.clients.producer.internals.DefaultPartitioner。默认策略为:如果指定了partition就直接发送到该分区;如果没有指定分区但是指定了key,就按照key的hash值选择分区;如果partition和key都没有指定就使用轮询策略。而且如果key不为null,那么计算得到的分区号会是所有分区中的任意一个;如果key为null并且有可用分区时,那么计算得到的分区号仅为可用分区中的任意一个

自定义分区策略
自定义分区策略需要显示设置Partioner接口,通常只实现其int partion(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)接口即可.
topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前 Kafka 集群共有多少主题、多少 Broker 等)。
常见的分区策略

  1. 轮询
  2. 随机
  3. 按消息key保存序列
    kafka允许为每条消息定义消息键,简称key.如果定义了key,可以保证同一个key的所有消息都进入一个分区.

kafka消息格式与压缩

producer压缩—>broker保持—>consumer解压

kafka消息的分层为消息集合(message set),消息(message)

消息集合:一个消息集合中包含若干条日志项(record item),日志项是真正封装消息的地方.
kafka底层消息日志由一系列消息集合日志项组成.
kafka通常不会直接操作一条条具体的消息,而是在集合这个层面上

kafka消息格式的历史

v1,v2.
v2对v1的改进:

  1. 把消息的公共部分抽取出来放到外层消息集合里面
  2. 保存压缩消息的方法发生了变化。之前 V1 版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而 V2 版本的做法是对整个消息集合进行压缩。显然后者应该比前者有更好的压缩效果。

消息的压缩

生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。

broker中同样有个compression.type参数.若与producer不同,则会先解压再压缩

若消息格式不同,为了兼容格式,也会解压和重新压缩.

消息的解压

消费者中解压.

压缩的算法

2.1.0之前支持 gZip,Snappy和LZ4.
2.1.0之后支持Zstandard

对于 Kafka 而言,
吞吐量:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
物理资源:使用 Snappy 算法占用的网络带宽最多,zstd 最少,zstd就是要提供超高的压缩比;
CPU使用率:各个算法表现得差不多,只是在压缩时Snappy算法使用的 CPU 较多一些,而在解压缩时GZIP算法则可能使用更多的 CPU。

压缩的实践

启用压缩的一个条件就是 Producer 程序运行机器上的 CPU 资源要很充足。如果 Producer 运行机器本身 CPU 已经消耗殆尽了,那么启用消息压缩无疑是雪上加霜,只会适得其反.

如果你的环境中带宽资源有限,那么我也建议你开启压缩。事实上我见过的很多 Kafka 生产环境都遭遇过带宽被打满的情况。这年头,带宽可是比 CPU 和内存还要珍贵的稀缺资源,毕竟万兆网络还不是普通公司的标配,因此千兆网络中 Kafka 集群带宽资源耗尽这件事情就特别容易出现。如果你的客户端机器 CPU 资源有很多富余,我强烈建议你开启 zstd 压缩,这样能极大地节省网络资源消耗。

拦截器

拦截器:基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。

kafka拦截器分为生产者拦截器和消费者拦截器。

生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑
消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑

参数interceptor.classes:指定拦截器类时要指定它们的全限定名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface ProducerInterceptor<K, V> extends Configurable {
// 发送前调用
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

// 在成功提交或发送失败.早于发送的callback调用.和onsend不在同一个线程.
// 在producer发送的主路径,不要做过于复杂的处理.不然tps下降
void onAcknowledgement(RecordMetadata var1, Exception var2);

void close();
}

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
// 正式处理前调用
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);

// 提交位移之后调用.
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);

void close();
}

拦截器应用场景

Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景。

消息堆积可能原因如下:

  1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS;
  2. consumer消费性能低,查一下是否有很重的消费逻辑(比如拿到消息后写HDFS或HBASE这种逻辑就挺重的),看看是否可以优化consumer TPS;
  3. 确保consumer端没有因为异常而导致消费hang住;
  4. 如果使用的是消费者组,确保没有频繁地发生rebalance

生产者tcp链接

在创建kafka producer 实例时,生产者会在后台创建sender线程.

创建的producer会链接bootstrap.servers内所有broker(通常指的3-4台即可,不必全部填写.).

KafkaProducer 实例创建的线程和Sender线程共享的可变数据结构只有RecordAccumulator类,故维护了 RecordAccumulator 类的线程安全,也就实现了KafkaProducer类的线程安全。

RecordAccumulator主要的数据结构是一个 ConcurrentMap。TopicPartition 是 Kafka 用来表示主题分区的 Java 对象,本身是不可变对象。而 RecordAccumulator 代码中用到 Deque 的地方都有锁的保护,所以基本上可以认定 RecordAccumulator 类是线程安全的。

TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。

Producer 端关闭 TCP 连接的方式有两种:一种是用户主动关闭(kill -9,producer.close());一种是 Kafka 自动关闭。Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于 Kafka 创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。TCP 连接是在 Broker 端被关闭的,但其实这个 TCP 连接的发起方是客户端,因此在 TCP 看来,这属于被动关闭的场景,即 passive close。被动关闭的后果就是会产生大量的 CLOSE_WAIT 连接,因此 Producer 端或 Client 端没有机会显式地观测到此连接已被中断。成为“僵尸”连接。

消息

消息无丢失

Kafka 只对已提交的消息(committed message)做有限度的持久化保证。

已提交的消息:当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个 Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

有限度的持久化保证:消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失。

Kafka 是能做到不丢失消息的,只不过这些消息必须是已提交的消息,而且还要满足一定的条件。

消息丢失

  1. 生产者丢失数据

一个 Producer 应用向 Kafka 发送消息,最后发现 Kafka 没有保存.

目前 Kafka Producer 是异步发送消息的,也就是说如果你调用的是producer.send(msg)这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。这种发送方式叫做fire and forget,如果发生消息丢失,则调用方无感知.

丢失原因: 网络抖动,消息本身不合格.
解决方案:永远使用带有回调通知的api,使用producer.send(msg,callBack)

  1. 消费者丢失数据

Consumer 端丢失数据主要体现在 Consumer 端要消费的消息不见了。

丢失原因: Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。先移动offset,再消费消息.中间重启了,那么之前offset和移动后的offset中间的数据就丢失了.

解决方案: 维持先消费消息(阅读),再更新位移(书签)的顺序.这种处理方式可能带来的问题是消息的重复处理,但这不属于消息丢失的情形。

  1. 消费者多线程丢失数据

场景:消费者开启多线程进行消费数据,其中开启新线程立刻他更新位移,然后消费失败.
解决方案: 如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移.
单个 Consumer 程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。

最佳实践

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

消息幂等性

消息交付可靠性保障

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  2. 至少一次(at least once):
  3. 消息不会丢失,但有可能被重复发送。精确一次(exactly once):消息不会丢失,也不会被重复发送。

kafka默认提供的交付可靠性保障为第二种.Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。

幂等性producer

Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。

1
2
3
props.put(“enable.idempotence”, ture)
or
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。那么你可能会问,如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

事务型producer

Kafka 的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。隔离性表明并发执行的事务彼此相互隔离,互不影响。

开启事务型producer

和幂等性 Producer 一样,开启 enable.idempotence = true。
设置 Producer 端参数 transactional. id。最好为其设置一个有意义的名字。

1
2
3
4
5
6
7
8
9
10

producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}

以上代码,record1,record2都会被统一写入到kafka.
要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也是需要一些变更的。

修改起来也很简单,设置 isolation.level 参数的值即可。
当前这个参数有两个取值:

  • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

消费者

消费者组

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

  1. consumer group 可以有一个或多个consumer实例。
  2. groupid是一个字符串,在一个kafka集群
  3. consumer group下所有实例订阅的主题的单个分区,只能分配给组内的某个consumer实例消费(也可以被其他group消费)

consumer group能够订阅相同的一组主题而互不干涉.
Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引擎系统的两大模型
如果所有实例都属于同一个 Group,那么它实现的就是消息队列模型;
如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型。

在了解了 Consumer Group 以及它的设计亮点之后,你可能会有这样的疑问:在实际使用场景中,理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。(可小,不推荐大于,多的consumer会一直空闲)

offset对于consumer group而言,是一组kv对.key是分区,v是对应consumer消费该分区的最新位移.(老版本的consumer group将位移信息保存在zk中)

Coordinator

Coordinator 协调者专为consumer group服务,负责group执行rebalance,以及提供唯一管理和组成员管理.

Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。

consumer group定位coordinator
Kafka 为某个 Consumer Group 确定 Coordinator 所在的 Broker 的算法有 2 个步骤。

  1. 确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
  2. 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。

rebalance过程

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance

触发条件

  1. 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  2. 订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  3. 订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

rebalance弊端

  1. 影响consumer端tps.rebalance对consumer group消费过程影响大.在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
  2. rebalance很慢
  3. rebalance效率低.Group 下所有的 Consumer 实例都会协调在一起共同参与。

非必要rebalance

rebalance触发的条件1和2都为主动操作,难以避免.
且一般情况的rebalance都是第一种情况.

组成员增加:向group增加consumer示例,coordinator回接纳新实例.需要重新分配分区.此种情况无需规避rebalance.

组成员减少:手动停止consumer不必多说.有些情况会让coordinator错误地认为consumer已停止而被踢出group
consumer group完成rebalance之后,会定时发心跳,session.timeout.ms默认为10s.session.timeout.ms 决定了 Consumer 存活性的时间间隔.Consumer 还提供了一个允许你控制发送心跳请求频率的参数,就是heartbeat.interval.ms,Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即max.poll.interval.ms参数。默认为5分钟.

非必要的rebalance

  1. consumer未及时发送心跳,导致consumer被下线group
    • 设置session.timeout.ms = 6s。要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
    • 设置heartbeat.interval.ms = 2s。
  2. consumer消费时间过长

问题排查: 如果发现了rebalance,可以排查一下consumer端的gc表现.是否频繁的full gc导致过长时间停顿.从而引发了rebalance

位移

kafka内部主题(__consumer_offsets)
位移管理机制其实也很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 consumer_offsets 中。可以这么说,consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。
提交过程不仅要实现高持久性,还要支持高频的写操作。

位移主题是一个普通的 Kafka 主题,它的消息格式却是 Kafka 自己定义的,用户不能修改.
Kafka Consumer 有 API 帮你提交位移,也就是向位移主题写消息.消息中得key包含 consumer group id,主题,分区号.其值有三种格式

  1. 位移值,以及位移提交得一些其他元数据(时间戳等).
  2. 用于保存group信息得消息(用来注册 Consumer Group)
  3. 用于删除group过期位移甚至删除group消息(tombstone 消息,即墓碑消息,也称 delete mark,消息体是null),一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入 tombstone 消息,表明要彻底删除这个 Group 的信息。

如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3
当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。我们说过,位移主题就是普通的 Kafka 主题,那么它自然也有对应的分区数。

如果是 Kafka 自动创建的,分区数是怎么设置的呢?这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是 50,因此 Kafka 会自动创建一个 50 分区的位移主题。如果你曾经惊讶于 Kafka 日志路径下冒出很多 __consumer_offsets-xxx 这样的目录,那么现在应该明白了吧,这就是 Kafka 自动帮你创建的位移主题。
副本数是Broker 端另一个参数 offsets.topic.replication.factor 要做的事情了。它的默认值是 3。

consumer位移提交方式

自动提交与手动提交

enable.auto.commit 是否启用自动提交

auto.commit.interval.ms 提交间隔

使用手动提交则需要使用Kafka Consumer API 为你提供了位移提交的方法,如consumer.commitSync等。

自动提交得弊端:

  1. 丧失了灵活性和可控性
  2. 只要consumer开启,就会不断写入唯一消息.

Compact 策略位移消息整理删除策略.
对同一个key得m1,m2消息,m1发送早于m2,则m1为过期数据.compact过程就提出过期得消息.把剩下得消息整理在一起.

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。如果位移主题无线膨胀,优先检查log cleaner线程状态.可能是线程挂掉导致.

位移提交

consumer向kafka汇报自己的位移数据,汇报过程被称为提交位移
Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

自动提交

自动提交的顺序
一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于,它可能会出现重复消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
手动提交

同步提交
KafkaConsumer#commitSync()。该方法会提交 KafkaConsumer#poll() 返回的最新位移。同步操作,会一直等待,直到成功提交才返回.如果提交过程发生异常,则被抛出.

调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。

1
2
3
4
5
6
7
8
9
10
11

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}

异步提交
KafkaConsumer#commitAsync(),调用该方法后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。

commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

1
2
3
4
5
6
7
8
9
10

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}

同步异步组合使用:

  1. 利用同步提交规避瞬时错误,网络抖动,broker端gc.
  2. 不希望程序总处于阻塞状态.影响tps
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    try {
    while(true) {
    ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofSeconds(1));
    process(records); // 处理消息
    commitAysnc(); // 使用异步提交规避阻塞
    }
    } catch(Exception e) {
    handle(e); // 处理异常
    } finally {
    try {
    consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    } finally {
    consumer.close();
    }
    }

    更精细化的位移管理

    Kafka Consumer API 还提供了一组更为方便的方法,可以帮助你实现更精细化的位移管理功能。
    对于一次要处理很多消息的 Consumer 而言,它会关心社区有没有方法允许它在消费的中间进行位移提交。(大量消息,分批次提交)

commitSync(Map) 和 commitAsync(Map)。它们的参数是一个 Map 对象,键就是 TopicPartition,即消费的分区,而值是一个 OffsetAndMetadata 对象,保存的主要是位移数据。就拿刚刚提过的那个例子来说,如何每处理 100 条消息就提交一次位移呢?在这里,我以 commitAsync 为例,展示一段代码,实际上,commitSync 的调用方法和它是一模一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}

提交失败异常

CommitFailedException:Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

这段话前半部分的意思是,本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。
解决方案:

  1. 增加期望的时间间隔max.poll.interval.ms参数值
  2. 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records参数值

消息处理超时导致异常

  1. 缩短单条处理时间
  2. 调整max.poll.interval.ms,该值默认为5min
  3. 减少下游一次性poll消息的总数.max.poll.records默认为500
  4. 使用多线程处理(apache flink集成kafka时,就是创建了多个kafka consumer thread线程)

错误的group idgroup id冲突也可能导致commit failed exception.(单机版consumer也需要设置group id,若与某个consumer集群的group id一样,则会抛出此错误)

并发消费者

KafkaConsumer不是线程安全的.所有的网络io处理都是在用户主线程中.不能在多个线程中共享同一个consumer.否则会抛出ConcurrentModificationException.
不过KafkaConsumer中的wakeup()方法是线程安全的.可以调用KafkaConsumer.wakeup()来唤醒consumer.

并发的方案:

  1. 消费者启动多个线程.每个线程维护自己的consumer.(线程数受限分区数,处理超时rebalance.占用资源多)
  2. 消费者使用单线程或多线程获取消息(单个或多个线程维护自己的consumer),同时创建多个消费现场执行消息处理逻辑.(处理链路长,不易于位移提交.)实践可参考Kafka Connector或者rocket mq中client 并发处理的方案.

消费者的tcp链接

发起tcp链接
consumer的tcp链接是在调用poll()的时候创建的.

  1. 发起FindCoordinator请求,首次调用poll()会像集群发送一个findCoodrinator的请求.
  2. 链接协调者,获得findCoordinator响应,会向broker简历socket链接.
  3. 消费数据.消费者会为每个要消费的分区创建与该分区领导者副本所在的broker链接的tcp.(此类创建完毕后会放弃第一类)

关闭tcp
手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令
kafka自动关闭是由消费者端参数 connection.max.idle.ms控制.默认为9min

消费者的tcp链接一旦断开,将触发rebalance.实际场景中,很多都将connection.max.idle.ms 设置成 -1,但是,tcp链接不会被定期清除,变成永久的僵尸链接.

消费者组消费的进度监控

滞后度(消费者lag):通常来说,Lag 的单位是消息数

监控方案

kafka-consumer-groups

1
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group id>

JavaAPI

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
try {
Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自动提交位移
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 处理中断异常
// ...
return Collections.emptyMap();
} catch (ExecutionException e) {
// 处理ExecutionException
// ...
return Collections.emptyMap();
} catch (TimeoutException e) {
throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
}
}
}

Kafka JMX

Kafka 消费者提供了一个名为 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指标,里面有很多属性。和我们今天所讲内容相关的有两组属性:records-lag-max 和 records-lead-min

这里的 Lead 值是指消费者最新消费消息的位移与分区当前第一条消息位移的差值。很显然,Lag 和 Lead 是一体的两个方面:Lag 越大的话,Lead 就越小,反之也是同理。

Kafka 消费者还在分区级别提供了额外的 JMX 指标,用于单独监控分区级别的 Lag 和 Lead 值。JMX 名称为:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”。

kafka-认证与授权
索引
  1. 1. 目录
  2. 2. 生产者
    1. 2.1. 生产者的分区策略
    2. 2.2. kafka消息格式与压缩
      1. 2.2.1. kafka消息格式的历史
      2. 2.2.2. 消息的压缩
      3. 2.2.3. 消息的解压
      4. 2.2.4. 压缩的算法
      5. 2.2.5. 压缩的实践
    3. 2.3. 拦截器
      1. 2.3.1. 拦截器应用场景
    4. 2.4. 生产者tcp链接
  3. 3. 消息
    1. 3.1. 消息无丢失
      1. 3.1.1. 消息丢失
      2. 3.1.2. 最佳实践
    2. 3.2. 消息幂等性
      1. 3.2.1. 消息交付可靠性保障
      2. 3.2.2. 幂等性producer
      3. 3.2.3. 事务型producer
  4. 4. 消费者
    1. 4.1. 消费者组
    2. 4.2. Coordinator
    3. 4.3. rebalance过程
      1. 4.3.1. 触发条件
      2. 4.3.2. rebalance弊端
      3. 4.3.3. 非必要rebalance
    4. 4.4. 位移
      1. 4.4.1. consumer位移提交方式
      2. 4.4.2. 位移提交
        1. 4.4.2.1. 自动提交
        2. 4.4.2.2. 手动提交
      3. 4.4.3. 更精细化的位移管理
      4. 4.4.4. 提交失败异常
    5. 4.5. 并发消费者
    6. 4.6. 消费者的tcp链接
    7. 4.7. 消费者组消费的进度监控
      1. 4.7.1. 监控方案
© 2023 haoxp
Hexo theme