• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

kafka-认证与授权

目录

  • 目录
  • 认证
    • 比较
  • 适用场景
    • gssapi
    • plain
    • scram
      • 配置实例
    • oauthbearer
    • delegation
  • 授权
    • kafka的acl
    • 开启acl
    • 超级用户
    • kafka-acls
    • 授权机制的单独使用
    • 配置ssl
      • broker配置
      • 客户端配置
  • 最佳实践

认证

sasl与ssl

做 SSL 认证.要启用双路认证,也就是说 Broker 也要认证客户端的证书。

kafka支持sasl机制有5种

  1. gssapi:kerberos的安全接口
  2. plain:简单用户名/密码认证
  3. scram:解决plain机制安全问题的新机制
  4. oauthbearer:基于oauth2认证框架的机制
  5. delegation token:补充现有sasl机制的轻量级认证机制

比较

使用 SSL 做信道加密的情况更多一些,但使用 SSL 实现认证不如使用 SASL。毕竟,SASL 能够支持你选择不同的实现机制,如 GSSAPI、SCRAM、PLAIN 等。因此,我的建议是你可以使用 SSL 来做通信加密,使用 SASL 来做 Kafka 的认证实现。

适用场景

gssapi

SASL/GSSAPI 主要是给 Kerberos 使用的。如果你的公司已经做了 Kerberos 认证(比如使用 Active Directory),那么使用 GSSAPI 是最方便的了。因为你不需要额外地搭建 Kerberos,只要让你们的 Kerberos 管理员给每个 Broker 和要访问 Kafka 集群的操作系统用户申请 principal 就好了。总之,GSSAPI 适用于本身已经做了 Kerberos 认证的场景,这样的话,SASL/GSSAPI 可以实现无缝集成。

plain

它是一个简单的用户名 / 密码认证机制,通常与 SSL 加密搭配使用。注意,这里的 PLAIN 和 PLAINTEXT 是两回事。PLAIN 在这里是一种认证机制,而 PLAINTEXT 说的是未使用 SSL 时的明文传输。对于一些小公司而言,搭建公司级的 Kerberos 可能并没有什么必要,他们的用户系统也不复杂,特别是访问 Kafka 集群的用户可能不是很多。对于 SASL/PLAIN 而言,这就是一个非常合适的应用场景。总体来说,SASL/PLAIN 的配置和运维成本相对较小,适合于小型公司中的 Kafka 集群。

但是,SASL/PLAIN 有这样一个弊端:它不能动态地增减认证用户,你必须重启 Kafka 集群才能令变更生效。为什么呢?这是因为所有认证用户信息全部保存在静态文件中,所以只能重启 Broker,才能重新加载变更后的静态文件。

scram

通过将认证用户信息保存在 ZooKeeper 的方式,避免了动态修改需要重启 Broker 的弊端。在实际使用过程中,你可以使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群。因此,如果你打算使用 SASL/PLAIN,不妨改用 SASL/SCRAM 试试。

配置实例

admin 用户、writer 用户和 reader 用户。admin 用户用于实现 Broker 间通信,writer 用户用于生产消息,reader 用户用于消费消息。

  1. 创建用户
1
2
3
4
5
6
7
8
9
10
11
12
$ cd kafka_2.12-2.3.0/
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.


$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.


$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]' --entity-type users --entity-name reader
Completed Updating config for entity: user-principal 'reader'.

  1. 创建jaas文件
1
2
3
4
5
6

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
  • 不要忘记最后一行和倒数第二行结尾处的分号;
  • JAAS 文件中不需要任何空格键。

配置broker参数

1
2
3
4
5
6
7
8
9
10
11
12
# 开启 SCRAM 认证机制,并启用 SHA-256 算法;
sasl.enabled.mechanisms=SCRAM-SHA-256

#为 Broker 间通信也开启 SCRAM 认证,同样使用 SHA-256 算法;
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

#Broker 间通信不配置 SSL
security.inter.broker.protocol=SASL_PLAINTEXT

#设置 listeners 使用 SASL_PLAINTEXT,依然是不使用 SSL。
listeners=SASL_PLAINTEXT://localhost:9092

  1. 启动broker
1
2
3
4
5
6
7
8
9
10
11
12
13

$KAFKA_OPTS=-Djava.security.auth.login.config=<your_path>/kafka-broker.jaas bin/kafka-server-start.sh config/server1.properties
......
[2019-07-02 13:30:34,822] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:30:34,822] INFO Kafka startTimeMs: 1562045434820 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:30:34,823] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)


$KAFKA_OPTS=-Djava.security.auth.login.config=<your_path>/kafka-broker.jaas bin/kafka-server-start.sh config/server2.properties
......
[2019-07-02 13:32:31,976] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:32:31,976] INFO Kafka startTimeMs: 1562045551973 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-02 13:32:31,978] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
  1. 发送消息

创建配置文件producer.conf

1
2
3
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";

发送消息

1
2
3
4

$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093 --topic test --producer.config <your_path>/producer.conf
>hello, world
>
  1. 消费消息
    创建配置consumer.conf
    1
    2
    3
    4

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-256
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";
    消费消息
    1
    2
    3

    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093 --topic test --from-beginning --consumer.config <your_path>/consumer.conf
    hello, world
  2. 动态增减用户
1
2
3
4
5
6
7
8
9

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=new_writer]' --entity-type users --entity-name new_writer
Completed Updating config for entity: user-principal 'new_writer'.

oauthbearer

主要是为了实现与 OAuth 2 框架的集成。Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全的 JSON Web Token,必须配以 SSL 加密才能用在生产环境中

delegation

主要目的是补充现有的 SASL 或 SSL 认证。如果要使用 Delegation Token,你需要先配置好 SASL 认证,然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样,Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证)或传输 Keystore 文件(SSL 认证)。

授权

授权:对信息安全或计算机安全相关的资源授予访问权限,特别是存取控制
kafka使用的ACL

常见的四种控制权限:

  1. ACL:access-control list,访问控制列表
    用户与权限的直接映射关系
    用户–>权限

  2. RBAC:role-based access control,基于角色的权限控制
    用户–>角色–>权限

  3. ABAC:attribute-based control,基于属性的权限控制

  1. PBAC:policy-based access control,基于策略的权限控制

kafka的acl

Principal P is [Allowed/Denied] Operation O From Host H On Resource R.

  • principal:kafka集群的用户
  • operation:具体的访问类型
  • host:链接kafka集群的客户端应用ip地址,host支持星号占位符
  • resource:kafka资源类型,topic,cluster,group,transactionalid,delegationtoken

开启acl

kafka提供了一个可插拔的授权实现机制,所有的acl配置项保存在zk的/kafka-acl节点中.可以通过kafka-acls脚本动态的对acl项进行增删改查

开启acl,在broker的配置文件(server.properties)种增加

1
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

authorizer.class.name 参数指定了 ACL 授权机制的实现类。当前 Kafka 提供了 Authorizer 接口,允许你实现你自己的授权机制,但更常见的做法,还是直接使用 Kafka 自带的 SimpleAclAuthorizer 实现类。一旦设置好这个参数的值,并且启动 Broker 后,该 Broker 就默认开启了 ACL 授权验证。在实际生产环境中,你需要为集群中的每台 Broker 都做此设置。

超级用户

开启acl之后,需要显示地为不同用户设置访问某项资源的权限,否则,在默认情况下,没有配置的任何acl的资源是不能被访问的,不过超级用户能够访问所有资源,在server.properties种配置

1
super.users=User:super1;User:super2

如果你要一次性指定多个超级用户,那么分隔符是分号而不是逗号,这是为了避免出现用户名中包含逗号从而无法分割的问题。

kafka-acls

1
2
# All 表示所有操作,topic 中的星号则表示所有主题,指定 --cluster 则说明我们要为 Alice 设置的是集群权限。
$ kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --operation All --topic '*' --cluster

授权机制的单独使用

授权机制脱离认证机制单独使用,不过只能针对ip.
简单示例

1
2
3
4
5
6
7
8
9
10

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --deny-principal User:* --deny-host 127.0.0.1 --operation Write --topic test

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
[2019-07-16 10:10:57,283] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-07-16 10:10:57,284] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2019-07-16 10:10:57,284] ERROR Error when sending message to topic test with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: <span class="orange">Not authorized to access topics: [test]
</span class="orange">

配置ssl

broker配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

#!/bin/bash

#设置环境变量
BASE_DIR=/Users/huxi/testenv #你需要修改此处
CERT_OUTPUT_PATH="$BASE_DIR/certificates"
PASSWORD=test1234
KEY_STORE="$CERT_OUTPUT_PATH/server.keystore.jks"
TRUST_STORE="$CERT_OUTPUT_PATH/server.truststore.jks"
CLIENT_KEY_STORE="$CERT_OUTPUT_PATH/client.keystore.jks"
CLIENT_TRUST_STORE="$CERT_OUTPUT_PATH/client.truststore.jks"
KEY_PASSWORD=$PASSWORD
STORE_PASSWORD=$PASSWORD
TRUST_KEY_PASSWORD=$PASSWORD
TRUST_STORE_PASSWORD=$PASSWORD
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
DAYS_VALID=365
DNAME="CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN"


mkdir -p $CERT_OUTPUT_PATH

echo "1. 产生key和证书......"
keytool -keystore $KEY_STORE -alias kafka-server -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"

keytool -keystore $CLIENT_KEY_STORE -alias kafka-client -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"

echo "2. 创建CA......"
openssl req -new -x509 -keyout $CERT_OUTPUT_PATH/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" \
-passin pass:"$PASSWORD" -passout pass:"$PASSWORD" \
-subj "/C=CN/ST=Beijing/L=Beijing/O=YourCompany/OU=YourDept,CN=Xi Hu"

echo "3. 添加CA文件到broker truststore......"
keytool -keystore "$TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASS" -noprompt

echo "4. 添加CA文件到client truststore......"
keytool -keystore "$CLIENT_TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASS" -noprompt

echo "5. 从keystore中导出集群证书......"
keytool -keystore "$KEY_STORE" -alias kafka-server -certreq -file "$CERT_OUTPUT_PATH/server-cert-file" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias kafka-client -certreq -file "$CERT_OUTPUT_PATH/client-cert-file" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

echo "6. 使用CA签发证书......"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CERT_OUTPUT_PATH/server-cert-file" \
-out "$CERT_OUTPUT_PATH/server-cert-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CERT_OUTPUT_PATH/client-cert-file" \
-out "$CERT_OUTPUT_PATH/client-cert-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

echo "7. 导入CA文件到keystore......"
keytool -keystore "$KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt

echo "8. 导入已签发证书到keystore......"
keytool -keystore "$KEY_STORE" -alias kafka-server -import -file "$CERT_OUTPUT_PATH/server-cert-signed" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias kafka-client -import -file "$CERT_OUTPUT_PATH/client-cert-signed" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

echo "9. 删除临时文件......"
rm "$CERT_OUTPUT_PATH/ca-cert.srl"
rm "$CERT_OUTPUT_PATH/server-cert-signed"
rm "$CERT_OUTPUT_PATH/client-cert-signed"
rm "$CERT_OUTPUT_PATH/server-cert-file"
rm "$CERT_OUTPUT_PATH/client-cert-file"

该脚本主要的产出是 4 个文件,分别是:server.keystore.jks、server.truststore.jks、client.keystore.jks 和 client.truststore.jks。

需要把以 server 开头的两个文件,拷贝到集群中的所有 Broker 机器上,把以 client 开头的两个文件,拷贝到所有要连接 Kafka 集群的客户端应用程序机器上。

配置每个 Broker 的 server.properties 文件,增加以下内容:

1
2
3
4
5
6
7
8
9

listeners=SSL://localhost:9093
ssl.truststore.location=/Users/huxi/testenv/certificates/server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/Users/huxi/testenv/certificates/server.keystore.jks
ssl.keystore.password=test1234
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.key.password=test1234

客户端配置

创建client-ssl.config

1
2
3
4
5
6
7
8
9
10

security.protocol=SSL
ssl.truststore.location=/Users/huxi/testenv/certificates/client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/Users/huxi/testenv/certificates/server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
# 自 Kafka 2.0 版本开始,它默认会验证服务器端的主机名是否匹配 Broker 端证书里的主机名。如果你要禁掉此功能的话,一定要将该参数设置为空字符串
ssl.endpoint.identification.algorithm=

测试

1
$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.config

最佳实践

  1. 开启acl
  2. 采用白名单机制
    不要设置:allow.everyone.if.no.acl.found=true。
  3. 使用kafka-acls脚本为ssl用户授予集群权限
    配置ssl时,指定用户的 Distinguished Name 为“CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN”。之前在设置 Broker 端参数时,我们指定了 security.inter.broker.protocol=SSL,即强制指定 Broker 间的通讯也采用 SSL 加密。
    如果不为指定的 Distinguished Name 授予集群操作的权限,你是无法成功启动 Broker 的。因此,你需要在启动 Broker 之前执行下面的命令:
    1
    2

    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --operation All --cluster
  4. 为客户端程序授予相应的权限
    1
    2
    3
    4
    5

    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --producer --topic 'test'


    $ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --consumer --topic 'test' --group '*'

授予的权限越少,kafka集群越安全

kafka-运维
kafka-生产者,消费者与消息
  1. 1. 目录
  2. 2. 认证
    1. 2.1. 比较
  3. 3. 适用场景
    1. 3.1. gssapi
    2. 3.2. plain
    3. 3.3. scram
      1. 3.3.1. 配置实例
    4. 3.4. oauthbearer
    5. 3.5. delegation
  4. 4. 授权
    1. 4.1. kafka的acl
    2. 4.2. 开启acl
    3. 4.3. 超级用户
    4. 4.4. kafka-acls
    5. 4.5. 授权机制的单独使用
    6. 4.6. 配置ssl
      1. 4.6.1. broker配置
      2. 4.6.2. 客户端配置
  5. 5. 最佳实践
© 2023 haoxp
Hexo theme