• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

kafka-运维

目录

  • 目录
  • 主题管理
    • 主题日常管理
    • 创建
      • 查询
      • 查询主题的详细数据
      • 修改分区
      • 修改主题级别参数
      • 变更副本数
      • 修改主题限速
      • 主题分区迁移
      • 删除主题
    • 特殊主题的管理与运维
    • 删除主题失败的处理
    • consumer_offsets占用太多磁盘
  • kafka动态配置
    • 动态broker参数配置
    • 使用场景
    • 动态参数的保存
    • 常用的几个值
      • 日志保留时间
      • 线程数
      • 与ssl相关的参数
      • follower拉数据线程数
  • 重设消费者组位移
    • 为什么重设消费者位移组
    • 重设位移策略
    • 重设位移的实现
      • earliest
      • latest
      • current
      • specified-offset
      • shift-by-n
      • datetime策略
      • duration策略
  • 常用工具及脚本
    • kafka-broker-api-versions
    • 生产消息
    • 消费消息
    • 测试生产者性能
    • 测试消费者性能
    • 查看主题消息总数
    • 查看消息文件数据
    • 消费者组位移

主题管理

主题日常管理

kafka-topics,kafka-reassign-partitions,kafka-configs脚本

创建

1
2
3
#从 Kafka 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数.--zookeeper也显示地被标记为弃用
# --zookeeper会绕过kafka的安全体系,即使为kafka设置了安全认证,限制了主题创建.如果使用--zookeeper也会创建成功.不受认证体系限制.
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1

查询

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --list

查询主题的详细数据

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

如果 describe 命令不指定具体的主题名称,那么 Kafka 默认会返回所有“可见”主题的详细数据给你
可见指发起这个命令的用户能够看到的kafka主题.

修改分区

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>

修改主题级别参数

1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

这个脚本也能指定 –bootstrap-server 参数,只是它是用来设置动态参数的。

变更副本数

修改主题限速

这里主要是指设置 Leader 副本和 Follower 副本使用的带宽。有时候,我们想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka 提供了这样的功能。

先设置 Broker 端参数 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令如下:

1
2
3
# entity-name:broker-id
# 若该主题的副本分别在 0、1、2、3 多个 Broker 上,那么你还要依次为 Broker 1、2、3 执行这条命令
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

设置好这个参数之后,我们还需要为该主题设置要限速的副本

1
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

主题分区迁移

对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他 Broker 上。

删除主题

1
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

特殊主题的管理与运维

consumer_offsets,transaction_state.这两个内部主题默认都有 50 个分区,因此,分区子目录会非常得多。

__consumer_offsets,自动创建时,会综合考虑当前运行的broker台数和broker端参数offsets.topic.replication.factor值,然后取两者较小值作为该主题的副本数.但是这违背了用户设置offsets.topic.replication.factor的初衷.0.11之后,kafka会严格遵守offsets.topic.replication.factor,如果当前允许的broker数量小于offsets.topic.replication.factor值,kafka创建主题失败,并显示抛出异常.

增加副本数

第 1 步是创建一个 json 文件,显式提供 50 个分区对应的副本数。注意,replicas 中的 3 台 Broker 排列顺序不同,目的是将 Leader 副本均匀地分散在 Broker 上。该文件具体格式如下:

1
2
3
4
5
6
7
8
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}`

第 2 部,执行脚本

1
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

查看这些内部主题的消息内容。特别是对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。

1
2

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。

1
2

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

删除主题失败的处理

最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。

  1. 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
  2. 手动删除该主题磁盘的分区目录
  3. 在zk中执行 rmr /controller,触发controller重选举,刷新controller缓存,最后一步,一定要谨慎,可能造成大面积分区leader重选举.仅执行前两步也可以,只是controller缓存中没有清空待删除主题.不影响使用

consumer_offsets占用太多磁盘

一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那我们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,因为这通常都是 Bug 导致的,最好提交到社区看一下。

kafka动态配置

动态broker参数配置

2.3 版本中的 Broker 端参数有 200 多个。
参数配置列表

每个代理配置更新方式有3类:

  1. read-only 被标记为read-only的参数和原来的参数行为一样,只有重启broker才能修改生效
  2. per-broker,被标记为per-broker的参数属于动态参数,修改之后,会在对应的broker中生效
  3. cluster-wide,被标记为cluster-wide的参数也属于动态参数,修改之后,在整个集群范围内生效。也可以为具体的broker修改cluster-wide参数。

使用场景

  • 动态调整broker端各种线程池大小
  • 动态调整broker端连接信息或安全配置
  • 动态更新ssl keystore有效期
  • 动态调整broker端compact操作性能
  • 实时变更 JMX指标收集器

动态参数的保存

kafka的动态参数保存在zookeeper中,/config/brokers znode保存动态broker参数,其下有两大类:

  1. default cluster-wide范围的动态参数
  2. 以broker.id为名,保存的是per-broker范围参数.

配置信息都是持久化节点,保证zk重启也不会丢失配置
cluster-wide、per-broker 和 static 参数的优先级是这样的:

per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值

修改命令

1
2
3

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
Completed updating default config for brokers in the cluster,

如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default

1
2
3
4

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
Default config for brokers in the cluster are:
unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}

per-broker 范围参数

1
2
3

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
Completed updating config for broker: 1.

查看

1
2
3
4

$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
Configs for broker 1 are:
unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, DEFAULT_CONFIG:unclean.leader.election.enable=false}

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 删除cluster-wide范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
Completed updating default config for brokers in the cluster,

# 删除per-broker范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable
Completed updating config for broker: 1.


# 删除动态参数要指定 delete-config。

# 查看cluster-wide范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
Default config for brokers in the cluster are:


# 查看Broker 1上的动态参数配置
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
Configs for broker 1 are:

如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表,另一种方式是直接运行无参数的 kafka-configs 脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。

常用的几个值

日志保留时间

log.retention.ms。

线程数

num.io.threads 和 num.network.threads。

与ssl相关的参数

主要是 4 个参数(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password 和 ssl.key.password)。允许动态实时调整它们之后,我们就能创建那些过期时间很短的 SSL 证书。每当我们调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性。

follower拉数据线程数

num.replica.fetchers。Follower 副本拉取速度慢,在线上 Kafka 环境中一直是一个老大难的问题。针对这个问题,常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取

重设消费者组位移

为什么重设消费者位移组

rabbitMq,activeMq消费之后,消息就会被删除.
kafka基于日志结构,消费者进行消费时,仅仅从磁盘中读取,是只读操作,消费者不会删除消息数据.位移数据是由消费者控制,因此可以通过修改位移值,进行重复消费历史数据.

消息队列的选型
如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。

重设位移策略

  1. 位移维度:根据位移值重设,直接把消费者位移值设置成指定值

  2. 时间维度:指定时间,让消费者位移调整大于该时间的最小位移,也可以给出一段时间间隔,让消费者直接将位移调回到30分钟之前位移值.

维度 策略 含义 场景
位移 earliest 把位移调整到当前最早位移. 如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。
位移 latest 位移移动到当前最新位移处 如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略
位移 current 位移调到最新提交位移处. 你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置
位移 specified-offset 位移调整成指定位置 这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理
位移 shift-by-n 调整到当前位移+n处 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100
时间 DateTime 调整到大于指定时间的最小位移处
时间 Duration 调整到距离当前时间指定间隔的位移处

重设位移的实现

1
2
3
4
5

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
  1. 要禁止自动提交位移。
  2. 组 ID 要设置成你要重设的消费者组的组 ID。
  3. 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。
  4. 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。

earliest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

String topic = "test"; // 要重设位移的Kafka主题
try (final KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
}
1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute

latest

Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可

1
Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可
1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

current

1
2
3
4
5
6
consumer.partitionsFor(topic).stream().map(info -> 
new TopicPartition(topic, info.partition()))
.forEach(tp -> {
long committedOffset = consumer.committed(tp).offset();
consumer.seek(tp, committedOffset);
});
1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

specified-offset

1
2
3
4
5
6

long targetOffset = 1234L;
for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
consumer.seek(tp, targetOffset);
}
1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

shift-by-n

1
2
3
4
5
6
7

for (PartitionInfo info : consumer.partitionsFor(topic)) {
TopicPartition tp = new TopicPartition(topic, info.partition());
// 假设向前跳123条消息
long targetOffset = consumer.committed(tp).offset() + 123L;
consumer.seek(tp, targetOffset);
}
1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute

datetime策略

借助另一个方法:KafkaConsumer. offsetsForTimes 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14

long ts = LocalDateTime.of(
2019, 6, 20, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeToSearch =
consumer.partitionsFor(topic).stream().map(info ->
new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> ts));

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}


1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

duration策略

1
2
3
4
5
6
7
8
Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream()
.map(info -> new TopicPartition(topic, info.partition()))
.collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000 * 60));

for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
consumer.offsetsForTimes(timeToSearch).entrySet()) {
consumer.seek(entry.getKey(), entry.getValue().offset());
}
1
2

bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute

常用工具及脚本

kafka-broker-api-versions

这个脚本的主要目的是验证不同 Kafka 版本之间服务器和客户端的适配性

生产消息

1
2
3

$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
>

消费消息

1
2

$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false

测试生产者性能

1
2
3
4
5
6

$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4

2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.

它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布。或者说,我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到 99th 分位就可以了。比如在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。

测试消费者性能

1
2
3
4

$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012

查看主题消息总数

想查看某个主题当前的消息总数。Kafka 自带的命令竟然没有提供这样的功能,要调用一个未被记录在官网上的命令。命令如下:

1
2
3
4
5
6
7
8
9
10

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic

test-topic:0:0
test-topic:1:0

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic

test-topic:0:5500000
test-topic:1:5500000

我们要使用 Kafka 提供的工具类 GetOffsetShell 来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数。对于本例来说,test-topic 总的消息数为 5500000 + 5500000,等于 1100 万条。

查看消息文件数据

1
2
3
4
5
6
7

$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Dumping ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1561597044933 size: 1237 magic: 2 compresscodec: LZ4 crc: 646766737 isvalid: true
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1237 CreateTime: 1561597044934 size: 1237 magic: 2 compresscodec: LZ4 crc: 3751986433 isvalid: true
......

如果只是指定 –files,那么该命令显示的是消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息,比如创建时间、使用的压缩算法、CRC 校验值等。

看一下每条具体的消息,那么就需要显式指定–deep-iteration参数

在上面的输出中,以竖线开头的就是消息批次下的消息信息。如果你还想看消息里面的实际数据,那么还需要指定–print-data-log参数

消费者组位移

1
kafka-consumer-groups  --bootstrap-server {broker ip:port} --descript --group {groupName}
kafka-sth
kafka-认证与授权
  1. 1. 目录
  2. 2. 主题管理
    1. 2.1. 主题日常管理
    2. 2.2. 创建
      1. 2.2.1. 查询
      2. 2.2.2. 查询主题的详细数据
      3. 2.2.3. 修改分区
      4. 2.2.4. 修改主题级别参数
      5. 2.2.5. 变更副本数
      6. 2.2.6. 修改主题限速
      7. 2.2.7. 主题分区迁移
      8. 2.2.8. 删除主题
    3. 2.3. 特殊主题的管理与运维
    4. 2.4. 删除主题失败的处理
    5. 2.5. consumer_offsets占用太多磁盘
  3. 3. kafka动态配置
    1. 3.1. 动态broker参数配置
    2. 3.2. 使用场景
    3. 3.3. 动态参数的保存
    4. 3.4. 常用的几个值
      1. 3.4.1. 日志保留时间
      2. 3.4.2. 线程数
      3. 3.4.3. 与ssl相关的参数
      4. 3.4.4. follower拉数据线程数
  4. 4. 重设消费者组位移
    1. 4.1. 为什么重设消费者位移组
    2. 4.2. 重设位移策略
    3. 4.3. 重设位移的实现
      1. 4.3.1. earliest
      2. 4.3.2. latest
      3. 4.3.3. current
      4. 4.3.4. specified-offset
      5. 4.3.5. shift-by-n
      6. 4.3.6. datetime策略
      7. 4.3.7. duration策略
  5. 5. 常用工具及脚本
    1. 5.1. kafka-broker-api-versions
    2. 5.2. 生产消息
    3. 5.3. 消费消息
    4. 5.4. 测试生产者性能
    5. 5.5. 测试消费者性能
    6. 5.6. 查看主题消息总数
    7. 5.7. 查看消息文件数据
    8. 5.8. 消费者组位移
© 2023 haoxp
Hexo theme