目录
主题管理
主题日常管理
kafka-topics,kafka-reassign-partitions,kafka-configs脚本
创建
1 | #从 Kafka 2.2 版本开始,社区推荐用 --bootstrap-server 参数替换 --zookeeper 参数.--zookeeper也显示地被标记为弃用 |
查询
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --list |
查询主题的详细数据
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name> |
如果 describe 命令不指定具体的主题名称,那么 Kafka 默认会返回所有“可见”主题的详细数据给你
可见指发起这个命令的用户能够看到的kafka主题.
修改分区
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数> |
修改主题级别参数
1 | bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760 |
这个脚本也能指定 –bootstrap-server 参数,只是它是用来设置动态参数的。
变更副本数
修改主题限速
这里主要是指设置 Leader 副本和 Follower 副本使用的带宽。有时候,我们想要让某个主题的副本在执行副本同步机制时,不要消耗过多的带宽。Kafka 提供了这样的功能。
先设置 Broker 端参数 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令如下:
1 | # entity-name:broker-id |
设置好这个参数之后,我们还需要为该主题设置要限速的副本
1 | bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test |
主题分区迁移
对主题各个分区的副本进行“手术”般的调整,比如把某些分区批量迁移到其他 Broker 上。
删除主题
1 | bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name> |
特殊主题的管理与运维
consumer_offsets,transaction_state.这两个内部主题默认都有 50 个分区,因此,分区子目录会非常得多。
__consumer_offsets,自动创建时,会综合考虑当前运行的broker台数和broker端参数offsets.topic.replication.factor
值,然后取两者较小值作为该主题的副本数.但是这违背了用户设置offsets.topic.replication.factor
的初衷.0.11之后,kafka会严格遵守offsets.topic.replication.factor
,如果当前允许的broker数量小于offsets.topic.replication.factor
值,kafka创建主题失败,并显示抛出异常.
增加副本数
第 1 步是创建一个 json 文件,显式提供 50 个分区对应的副本数。注意,replicas 中的 3 台 Broker 排列顺序不同,目的是将 Leader 副本均匀地分散在 Broker 上。该文件具体格式如下:
1 | {"version":1, "partitions":[ |
第 2 部,执行脚本
1 | bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute |
查看这些内部主题的消息内容。特别是对于 __consumer_offsets 而言,由于它保存了消费者组的位移数据,有时候直接查看该主题消息是很方便的事情。下面的命令可以帮助我们直接查看消费者组提交的位移数据。
1 |
|
除了查看位移提交数据,我们还可以直接读取该主题消息,查看消费者组的状态信息。
1 |
|
删除主题失败的处理
最常见的原因有两个:副本所在的 Broker 宕机了;待删除主题的部分分区依然在执行迁移过程。
- 手动删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。
- 手动删除该主题磁盘的分区目录
- 在zk中执行 rmr /controller,触发controller重选举,刷新controller缓存,最后一步,一定要谨慎,可能造成大面积分区leader重选举.仅执行前两步也可以,只是controller缓存中没有清空待删除主题.不影响使用
consumer_offsets占用太多磁盘
一旦你发现这个主题消耗了过多的磁盘空间,那么,你一定要显式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题。倘若真是这个原因导致的,那我们就只能重启相应的 Broker 了。另外,请你注意保留出错日志,因为这通常都是 Bug 导致的,最好提交到社区看一下。
kafka动态配置
动态broker参数配置
2.3 版本中的 Broker 端参数有 200 多个。
参数配置列表
每个代理配置更新方式有3类:
- read-only 被标记为read-only的参数和原来的参数行为一样,只有重启broker才能修改生效
- per-broker,被标记为per-broker的参数属于动态参数,修改之后,会在对应的broker中生效
- cluster-wide,被标记为cluster-wide的参数也属于动态参数,修改之后,在整个集群范围内生效。也可以为具体的broker修改cluster-wide参数。
使用场景
- 动态调整broker端各种线程池大小
- 动态调整broker端连接信息或安全配置
- 动态更新ssl keystore有效期
- 动态调整broker端compact操作性能
- 实时变更 JMX指标收集器
动态参数的保存
kafka的动态参数保存在zookeeper中,/config/brokers znode保存动态broker参数,其下有两大类:
- default cluster-wide范围的动态参数
- 以broker.id为名,保存的是per-broker范围参数.
配置信息都是持久化节点,保证zk重启也不会丢失配置
cluster-wide、per-broker 和 static 参数的优先级是这样的:
per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值
修改命令
1 |
|
如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default
1 |
|
per-broker 范围参数
1 |
|
查看
1 |
|
删除
1 |
|
如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表,另一种方式是直接运行无参数的 kafka-configs 脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。
常用的几个值
日志保留时间
log.retention.ms。
线程数
num.io.threads 和 num.network.threads。
与ssl相关的参数
主要是 4 个参数(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password 和 ssl.key.password)。允许动态实时调整它们之后,我们就能创建那些过期时间很短的 SSL 证书。每当我们调整时,Kafka 底层会重新配置 Socket 连接通道并更新 Keystore。新的连接会使用新的 Keystore,阶段性地调整这组参数,有利于增加安全性。
follower拉数据线程数
num.replica.fetchers。Follower 副本拉取速度慢,在线上 Kafka 环境中一直是一个老大难的问题。针对这个问题,常见的做法是增加该参数值,确保有充足的线程可以执行 Follower 副本向 Leader 副本的拉取
重设消费者组位移
为什么重设消费者位移组
rabbitMq,activeMq消费之后,消息就会被删除.
kafka基于日志结构,消费者进行消费时,仅仅从磁盘中读取,是只读操作,消费者不会删除消息数据.位移数据是由消费者控制,因此可以通过修改位移值,进行重复消费历史数据.
消息队列的选型
如果在你的场景中,消息处理逻辑非常复杂,处理代价很高,同时你又不关心消息之间的顺序,那么传统的消息中间件是比较合适的;反之,如果你的场景需要较高的吞吐量,但每条消息的处理时间很短,同时你又很在意消息的顺序,此时,Kafka 就是你的首选。
重设位移策略
位移维度:根据位移值重设,直接把消费者位移值设置成指定值
时间维度:指定时间,让消费者位移调整大于该时间的最小位移,也可以给出一段时间间隔,让消费者直接将位移调回到30分钟之前位移值.
维度 | 策略 | 含义 | 场景 |
---|---|---|---|
位移 | earliest | 把位移调整到当前最早位移. | 如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。 |
位移 | latest | 位移移动到当前最新位移处 | 如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略 |
位移 | current | 位移调到最新提交位移处. | 你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置 |
位移 | specified-offset | 位移调整成指定位置 | 这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理 |
位移 | shift-by-n | 调整到当前位移+n处 | 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100 |
时间 | DateTime | 调整到大于指定时间的最小位移处 | |
时间 | Duration | 调整到距离当前时间指定间隔的位移处 |
重设位移的实现
1 |
|
- 要禁止自动提交位移。
- 组 ID 要设置成你要重设的消费者组的组 ID。
- 调用 seekToBeginning 方法时,需要一次性构造主题的所有分区对象。
- 最重要的是,一定要调用带长整型的 poll 方法,而不要调用 consumer.poll(Duration.ofSecond(0))。
earliest
1 |
|
1 |
|
latest
Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可
1 | Latest 策略和 Earliest 是类似的,我们只需要使用 seekToEnd 方法即可 |
1 |
|
current
1 | consumer.partitionsFor(topic).stream().map(info -> |
1 |
|
specified-offset
1 |
|
1 |
|
shift-by-n
1 |
|
1 |
|
datetime策略
借助另一个方法:KafkaConsumer. offsetsForTimes 方法
1 |
|
1 |
|
duration策略
1 | Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream() |
1 |
|
常用工具及脚本
kafka-broker-api-versions
这个脚本的主要目的是验证不同 Kafka 版本之间服务器和客户端的适配性
生产消息
1 |
|
消费消息
1 |
|
测试生产者性能
1 |
|
它会打印出测试生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。一般情况下,消息延时不是一个简单的数字,而是一组分布。或者说,我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到 99th 分位就可以了。比如在上面的输出中,99th 值是 604ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 604ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。
测试消费者性能
1 |
|
查看主题消息总数
想查看某个主题当前的消息总数。Kafka 自带的命令竟然没有提供这样的功能,要调用一个未被记录在官网上的命令。命令如下:
1 |
|
我们要使用 Kafka 提供的工具类 GetOffsetShell 来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息数。对于本例来说,test-topic 总的消息数为 5500000 + 5500000,等于 1100 万条。
查看消息文件数据
1 |
|
如果只是指定 –files,那么该命令显示的是消息批次(RecordBatch)或消息集合(MessageSet)的元数据信息,比如创建时间、使用的压缩算法、CRC 校验值等。
看一下每条具体的消息,那么就需要显式指定–deep-iteration参数
在上面的输出中,以竖线开头的就是消息批次下的消息信息。如果你还想看消息里面的实际数据,那么还需要指定–print-data-log参数
消费者组位移
1 | kafka-consumer-groups --bootstrap-server {broker ip:port} --descript --group {groupName} |