• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

kafka-admin-client

目录

  • 目录
  • admin-client
    • 功能与 工作流程
  • 主要流程
  • 构造和销毁实例
  • 常见示例
    • 创建主题
    • 消费者组位移
    • broker磁盘占用

admin-client

完整类路径是 org.apache.kafka.clients.admin.AdminClient,而不是 kafka.admin.AdminClient
依赖

1
2
3
4
5
6

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${latest}</version>
</dependency>

功能与 工作流程

  1. 主题管理:创建删除查询
  2. 权限管理:具体的权限配置与删除
  3. 配置参数管理:kafka各种资源参数的设置.详情查询.kafka资源主要由 broker,主题,用户,client-id
  4. 副本日志管理:底层日志路径的变更和详情查询
  5. 分区管理:即创建额外的主题分区
  6. 消息删除:即删除指定位移之前的消息分区.
  7. delegationToken管理:消费者组的查询,位移查询和删除.
  8. 消费者组管理:消费者组的查询,位移查询和删除
  9. preferred领导者选举: 指定主题分区的preferred broker为领导者.

主要流程

从设计上来看,AdminClient 是一个双线程的设计:前端主线程和后端 I/O 线程(kafka-admin-client-thread)

前端线程负责将用户要执行的操作转换成对应的请求,然后再将请求发送到后端 I/O 线程的队列中;而后端 I/O 线程从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,以便等待前端线程的获取。

前端主线程会创建名为 Call 的请求对象实例。该实例有两个主要的任务。

  1. 构建对应的请求对象。比如,如果要创建主题,那么就创建 CreateTopicsRequest;如果是查询消费者组位移,就创建 OffsetFetchRequest。
  2. 指定响应的回调逻辑。比如从 Broker 端接收到 CreateTopicsResponse 之后要执行的动作。一旦创建好 Call 实例,前端主线程会将其放入到新请求队列(New Call Queue)中,此时,前端主线程的任务就算完成了。它只需要等待结果返回即可

后台io线程使用3个队列,为了确保前端主线程不会因为monitor锁被阻塞,后端io线程定期地将请求队列中的所有call示例全部搬移到待发送请求队列中进行处理.

构造和销毁实例

创建 AdminClient 实例和创建 KafkaProducer 或 KafkaConsumer 实例的方法是类似的,你需要手动构造一个 Properties 对象或 Map 对象,然后传给对应的方法。社区专门为 AdminClient 提供了几十个专属参数,最常见而且必须要指定的参数,是我们熟知的 bootstrap.servers 参数。

完整的参数列表

1
2
3
4
5
6
7
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);

try (AdminClient client = AdminClient.create(props)) {
// 执行你要做的操作……
}

常见示例

创建主题

1
2
3
4
5
6
7

String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {
NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
result.all().get(10, TimeUnit.SECONDS);
}

AdminClient 各个方法的返回类型都是名为 ***Result 的对象。这类对象会将结果以 Java Future 的形式封装起来。如果要获取运行结果,你需要调用相应的方法来获取对应的 Future 对象,然后再调用相应的 get 方法来取得执行结果。

对于创建主题而言,一旦主题被成功创建,任务也就完成了,它返回的结果也就不重要了,只要没有抛出异常就行。查询消费者组位移

消费者组位移

1
2
3
4
5
6
7
8

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
Map<TopicPartition, OffsetAndMetadata> offsets =
result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
System.out.println(offsets);
}

调用 AdminClient 的 listConsumerGroupOffsets 方法去获取指定消费者组的位移数据
它返回的 Map 对象中保存着按照分区分组的位移数据。你可以调用 OffsetAndMetadata 对象的 offset() 方法拿到实际的位移数据。

broker磁盘占用

kafka jmx监控指标没有提供磁盘空间占用率的监控..使用adminClient监控

1
2
3
4
5
6
7
8
9
10
11
12

try (AdminClient client = AdminClient.create(props)) {
DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker id
long size = 0L;
for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {
size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
topicPartitionReplicaInfoMap ->
topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
.mapToLong(Long::longValue).sum();
}
System.out.println(size);
}
kafka-集群部署
kafka-mirror-maker
  1. 1. 目录
  2. 2. admin-client
    1. 2.1. 功能与 工作流程
  3. 3. 主要流程
  4. 4. 构造和销毁实例
  5. 5. 常见示例
    1. 5.1. 创建主题
    2. 5.2. 消费者组位移
    3. 5.3. broker磁盘占用
© 2023 haoxp
Hexo theme