kafka 配置认证与授权

大数据70

本例不使用kerberos做认证,使用用户名和密码的方式来进行认证

1.0 配置server.properties 添加如下配置

#配置 ACL 入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#本例使用 SASL PLAINTEXT
listeners=SASL_PLAINTEXT://hadoop4:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
#设置本例中 admin 为超级用户
super.users=User:admin

1.1 创建服务端的jaas.conf文件,文件信息如下:

[hduser@hadoop4 config]$ cat jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};

1.2 修改启动脚本kafka-server-start.sh,

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/hadoop/kafka/config/jaas.conf  kafka.Kafka "$@"

其中:-Djava.security.auth.login.config=/data1/hadoop/kafka/config/jaas.conf 是新加的

2.1 生成jaas文件

[hduser@hadoop4 config]$ cat writer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username = "writer"
password="writer";
};

2.2 配置生产者启动脚本

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/writer_jaas.conf  kafka.tools.ConsoleProducer "$@"

2.3 配置启动脚本

kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092  --topic test2  --producer-property security.protocol=SASL_PLAINTEXT  --producer-property sasl.mechanism=PLAIN

可以发现,需要添加协议参数:

security.protocol: 表示开启安全协议,使用SASL,
sasl.mechanism: 协议机制,如果是使用Kerberos,那么就配置kerberos

如果继续执行上述的命令,可以发现还是失败,失败的原因是对于topic test2来说,没有授权。

2.4 授权
在设置具体的 ACL 规则之前,首先简单学习一下 Kafka ACL 的格式。根据官网 的介绍,
Kafka 一条 ACL 的格式为 "Principal P is [Allowed/Denied] Operation O From Host H On
Resource R",含义描述如下:

principal :表示 Kafka user
operation :表示 个具体的操作类型,如 WRITE、READ 、DESCRIBE 。完整的操
作列表详见 http://docs.confluent.io/current/kafka/authorization.html#overview
Host 表示连 Kafka 集群的 client IP 地址,如果是"*"则表示所有四。注意 ,当
Kafka 不支持主机名,只能指定 IP 地址。
Resource :表示一种 Kafka 资源类型 。当前共有 种类型 TOPIC CLUSTER GROUP
和 TRANSACTIONID

kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test2

3.1 配置jaas文件

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="reader"
    password="reader";
};

3.2 消费者启动脚本配置

exec $(dirname $0)/kafka-run-class.sh  -Djava.security.auth.login.config=/data1/hadoop/kafka/config/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"

3.3 创建消费者配置文件

[hduser@hadoop4 ~]$ cat consumer.config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group

3.4 消费数据

  • 如果不指定consumer.config,将会出现下面的异常
[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2
[2021-05-08 09:44:35,771] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:36,187] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:36,599] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:37,006] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
  • 接着指定consumer.config
[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2 --consumer.config consumer.config
[2021-05-08 09:46:10,044] WARN [Consumer clientId=consumer-test-group-1, groupId=test-group] Error while fetching metadata with correlation id 2 : {test2=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:46:10,045] ERROR [Consumer clientId=consumer-test-group-1, groupId=test-group] Topic authorization failed for topics [test2] (org.apache.kafka.clients.Metadata)
[2021-05-08 09:46:10,047] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test2]

可以发现跟生产者是一样的,没有权限访问topic test2

3.5 授权

[hduser@hadoop4 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test2
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, patternType=LITERAL)`:
    (principal=User:reader, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, patternType=LITERAL)`:
    (principal=User:writer, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:reader, host=*, operation=READ, permissionType=ALLOW)

3.6 重新消费
接着消费还是会发现没有对组test-group的操作权限

[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2 --consumer.config consumer.config
[2021-05-08 09:48:07,842] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
Processed a total of 0 messages

赋予权限

[hduser@hadoop4 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-group, patternType=LITERAL)`:
    (principal=User:reader, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-group, patternType=LITERAL)`:
    (principal=User:reader, host=*, operation=READ, permissionType=ALLOW)

生产者发送

[hduser@hadoop4 ~]$ kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092  --topic test2  --producer-property security.protocol=SASL_PLAINTEXT  --producer-property sasl.mechanism=PLAIN
>hahaha
>wanm^H^H
>完美
>

消费者消费

[hduser@hadoop4 ~]$ kafka-console-consumer.sh  --bootstrap-server 192.168.43.15:9092  --from-beginning --topic test2 --consumer.config consumer.config
hahaha
wanm
完美

使用admin用户查看用户的组信息
4.1 配置jaas.conf文件

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};

4.2 配置脚本kafka-consumer-groups.sh

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/admin_jaas.conf kafka.admin.ConsumerGroupCommand "$@"

4.3 配置安全协议属性

[hduser@hadoop4 ~]$ cat admin_sasl.config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

4.4 查看组信息

[hduser@hadoop4 ~]$ kafka-consumer-groups.sh --group test-group --describe --command-config admin_sasl.config --bootstrap-server 192.168.43.15:9092

Consumer group 'test-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-group      test2           0          3               3               0               -               -               -
test-group      test            1          1001515         1001516         1               -               -               -
test-group      test            0          992785          992786          1               -               -               -
test-group      test            3          1000894         1000894         0               -               -               -
test-group      test            2          1000772         1000773         1               -               -               -
test-group      test            4          1004034         1004034         0               -               -               -

一般生产环境还是得使用Kerberos配合ranger+ldap。

借鉴kafka实战

Original: https://www.cnblogs.com/yjt1993/p/14739130.html
Author: 北漂-boy
Title: kafka 配置认证与授权



相关阅读

Title: iceberg合并小文件冲突测试

基于iceberg的master分支的9b6b5e0d2(2022-2-9)。

1、PARTIAL_PROGRESS_ENABLED(partial-progress.enabled)
默认为 false。该参数能够让合并任务以group为单位做提交,当其中一个group任务失败,可以单独对该group任务重试。

2、USE_STARTING_SEQUENCE_NUMBER(use-starting-sequence-number)
默认为 true。
该参数使用做合并时的sequenceNumber作为新的数据文件的sequenceNumber。

一、append方式生成 a,b,c三个snapshot,基于b做文件合并。

模拟的场景是:已存在a,b快照,现在基于b快照做小文件合并,但任务还未完成时,另一条数据流基于b快照做了append类型的数据:

右:USE_STARTING_SEQUENCE_NUMBER,下:PARTIAL_PROGRESS_ENABLED true false true 成功 成功 false 成功 成功

  • 成功:生成新的快照,最终snapshot是 a,b,c,d。
  • 新生成的大文件是基于a,b的数据文件的总和,
  • 快照d中包含了c的数据,以及基于a,b合并的数据。
  • a,b,c,d对应的squenceNumber分别是1,2,3,4:
    USE_STARTING_SEQUENCE_NUMBER 为 true时,d里面生成的新的大文件对应的manifest的squeceid是用的以前的2,删除的manifest用的是新的id4
    kafka 配置认证与授权
    kafka 配置认证与授权

结论:
1、纯append流的数据,做小文件合并都能成功。
2、可以通过设置 USE_STARTING_SEQUENCE_NUMBER 字段来控制合并任务中的manifest的squencyNumber。

二、append方式生成 a,b,c 三个 snapshot,先基于c做一次合并,合并成功后,基于c再做一次合并。

模拟的场景是,基于c快照做小文件合并,该任务还未完成,又启动了一个基于c快照做小文件合并的任务:

右:USE_STARTING_SEQUENCE_NUMBER,下:PARTIAL_PROGRESS_ENABLED true false true 成功,但没生成新快照 成功,但没生成新快照 false 失败 失败

  • 成功,但没生成新快照:只是表示该任务是完整执行了,没有出现异常退出的情况,但最终并未生成新的快照。PARTIAL_PROGRESS_ENABLED 设置 true,会打印出异常信息,但由于是部分提交,这些异常被忽略,最终程序执行成功,但也没有生成新的 snapshot。Failure during rewrite commit process, partial progress enabled. Ignoring。
  • 失败:提示 Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of conflicts, set partial-progress.enabled which will break up the rewrite into multiple smaller commits controlled by partial-progress.max-commits. Separate smaller rewrite commits can succeed independently while any commits that conflict with another Iceberg operation will be ignored. This mode will create additional snapshots in the table history, one for each commit.

结论:目前功能上不能基于同一个快照做多次合并,只会成功一次。

三、append方式生成 a,b 两个snapshot,对a的数据做更新生成c,再基于b做文件合并。

模拟的场景是:基于b快照做合并,此时还未完成,另一条数据流对a快照中的数据做了更新,且提交成功生成了c快照:

右:USE_STARTING_SEQUENCE_NUMBER,下:PARTIAL_PROGRESS_ENABLED true false true 成功 成功,但没生成新快照 false 成功 失败

  • 失败:提示 Cannot commit, found new delete for replaced data file。
  • 成功,生成最新快照d,生成快照a,b中数据合并的大文件
  • 成功,但没生成新的快照:参考上面说明

结论:在合并的过程中,有另一条数据流对需要合并的数据做修改,可以通过设置 USE_STARTING_SEQUENCE_NUMBER 来使任务成功。

四、append方式生成 a,b,c 三个snapshot,对c的数据做更新生成d,再基于b做文件合并。

模拟的场景:基于b快照做合并,此时还未完成,另一条数据流先做append,生成了c快照,然后又对c快照里的数据做修改生成了快照d:

右:USE_STARTING_SEQUENCE_NUMBER,下:PARTIAL_PROGRESS_ENABLED true false true 成功 成功,但没生成新快照 false 成功 失败

  • 失败,提示:
    Cannot commit, found new delete for replaced data file
    Cannot commit rewrite because of a ValidationException or CommitFailedException. This usually means that this rewrite has conflicted with another concurrent Iceberg operation. To reduce the likelihood of conflicts, set partial-progress.enabled which will break up the rewrite into multiple smaller commits controlled by partial-progress.max-commits. Separate smaller rewrite commits can succeed independently while any commits that conflict with another Iceberg operation will be ignored. This mode will create additional snapshots in the table history, one for each commit.

因为在获取文件的时候,此时只能读取到a和b下的数据文件,a和b下没有delete文件,所以没有读取a和b下数据文件的min和max。 在做merge的时候,最新的snapshot是d,此时有delete文件,所以需要判断该delete文件是否能够匹配上前面读取的数据文件。匹配条件有两个,一个是sequenceNumber,一个是最大最小值是否有交集。delete的文件是后生成的,它的sequenceNumber肯定是大于前面a,b下的数据文件,所以该条件满足。因为在读取a,b下的数据文件的时候,没有读取min,max,导致不能够跟delete文件做值的交叉范围判断,所以data文件中被关联了delete文件,所以这种情况下,虽然只对 a和b做合并,且后续修改的数据文件没有被修改,但依然会合并失败。
如果在读取a,b下数据文件的时候,把对应的min,max也读取上来,那么就可以合并成功,对应流程在 ManifestGroup.planFiles中,读取内容由 columns 决定。

  • 成功:生成最新快照e,最终的快照为:a,b,c,d,e
  • 成功,但没生成新的快照:参考上面说明

结论:虽然后续修改的数据并不在合并的数据中,但USE_STARTING_SEQUENCE_NUMBER为false依然会失败,具体原因已在上面说明。

1、PARTIAL_PROGRESS_ENABLED
当该参数为true时,虽然它最终能让任务执行完成,但实际上它忽略子提交失败的情况,所以实际有没有做合并与该参数无关。

2、USE_STARTING_SEQUENCE_NUMBER
设置该参数为true,可以修改新生成的dataFile和manifest的sequenceNumber为原来的number,这样在读取数据的时候,就可以把delteFile应用到新生成的dataFile中了,可以解决大多数数据冲突的情况。

Original: https://www.cnblogs.com/payapa/p/15932512.html
Author: 努力爬呀爬
Title: iceberg合并小文件冲突测试