写点什么

Flink 读写多套 Kerberos 认证的 Kafka 方案

  • 2022-10-28
    江苏
  • 本文字数:3524 字

    阅读完需:约 12 分钟

本文介绍了 Kafka 的动态 Kerberos 认证,分析了它的原理和优势,同时给出了在非互信的情况下应用的动态配置方式及 Flink 测试用例,打通了多个集群间的数据流转,可扩展到异地 Kafka 集群间的应用读写方案,最大程度的挖掘数据价值。

一、前言

鉴于线上安全认证的迫切性,绝大部分 Kafka 集群都加入了 Kerberos 认证机制。开启 Kerberos 认证的集群运行时,集群内的节点使用密钥得到认证。只有被认证过节点才能正常使用。企图冒充的节点由于没有事先得到的密钥信息,无法与集群内部的节点通信。防止了恶意的使用或篡改 Kafka 集群的问题,确保 Kafka 集群的可靠安全。

由于 Kafka 相关业务随着时间而不停增长,因此通常情况下,为了满足不同业务的相关需求,会存在多个 Kafka 集群同时运行的情况。业务需要对于不同集群的数据进行不同的处理,多个集群中的数据需要能够互相流通。

此时,读写不同 Kerberos 服务上的 Kafka 集群,有以下两种不同方案:互信方案和非互信方案。

互信方案即两个 Kerberos 集群做互信,此时一套 Keytab 文件和 Principal 能够访问多套 Kerberos 集群,该方案的优点为一次部署,后期仅需运维 Kerberos 状态;但是缺点较多:首先所有服务均需要修改并重启,已部署的应用会受到比较大的影响,同时两个 Kerberos 集群会互相影响,运维也较复杂。

非互信方案即保持原有的 Kerberos 服务和 Kafka 集群不动,只在客户端做 krb5 的配置,使得应用只需要配置不同的用户信息即可访问多套 Kerberos 认证的 Kafka 集群。

二、Kafka 加入 Kerberos 后客户端认证方式

Kerberos 是一种基于可信任第三方的网络认证协议,其设计目标是解决在分布式网络环境下对接入的用户进行身份认证的问题。除了服务器和用户以外,Kerberos 还包括可信任第三方密钥发放中心(KDC),KDC 负责整个认证过程的票据生成管理服务。

KDC 包含两部分 :认证服务器(AS)和票据授权服务器(TGS)。AS 负责在客户端向 AS 发起请求时,向客户端发放票据授权票据(TGT)。TGS 负责验证 TGT,并授予服务票据,服务票据允许认证过的主体使用应用服务器提供的服务,如图所示:



如上图所示,Kafka 需要使用秘钥文件来验证用户,其加入 Kerberos 后客户端认证有两种方式:静态文件 Jaas 认证和动态 Sasl 认证。

静态认证通过加入 JVM 参数-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf来通过 Kafka 认证,其文件内容通常为:

KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseTicketCache=falseuseKeytab=truekeytab=”/path/to/kafka_client.keytab”principal=”client@realm.com”renewTicket=trueserviceName="kafka";};
复制代码

这种验证方式只需要在 Java 程序运行时加入 jvm 参数即可,无需应用方做其他配置,因此在线上被大量使用。但是这种方式它是 JVM 全局共享的,并且默认无法进行刷新。结果即为所有客户端都只能以进程为单位来进行 Kerberos 验证,即在同一个进程中只能读写一个 Kerberos 服务中的 Kafka 集群数据,那么如果用户需要使用消费者读取一个集群 Kafka,并通过生产者写入另外一个 Kafka 集群时,必须要将消费者和生产者分为不同的进程中。此时应用需要进行大量的适配,保证其可以读写多套 Kerberos 验证的 Kafka 集群。

此时即可考虑动态 SASL 验证,Kafka 官方客户端在 0.10.1.0 之后即可通过 Kafka 客户端的配置指定其需要验证的 Keytab 文件和 Principal:

props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "        + "useTicketCache=false "        + "renewTicket=true "        + "serviceName=\"kafka\" "        + "useKeyTab=true "        + "keyTab=\"/path/to/kafak_client.keyab\" "        + "principal=\"client@realm.com\";");
复制代码

此时,即可通过配置初始化 Kafka 的消费者和生产者,不同的配置即可对应不同的 Kerberos 集群,可实现在同一个进程中读写不同 Kerberos 的 Kafka 集群。

动态认证相比静态认证,有着以下的优点:

因此,为了满足线上 Flink 读写多套 Kerberos 的 Kafka 的需求,可采用动态 SASL 认证来开发应用。

三、多套 Kerberos 认证的客户端配置

客户端需要在 Kerberos 配置文件中指定配置将集群中的主机映射到相应的 kerberos 服务中,如下所示:

[domain_realm]bj01host-748167.host.idcvdian.com = example1.combj01host-748168.host.idcvdian.com = example1.combj01host-748169.host.idcvdian.com = example1.combj01host-614146.host.idcvdian.com = example2.combj01host-614147.host.idcvdian.com = example2.com
复制代码

即需要指定集群所需要验证的 KDC 集群,从而使得 kerberos 验证时能够找到其所在的 KDC Server。

四、Flink 读写多套 Kerberos 认证的 Kafka 实例

当每个节点都配置完后,即可完成 Flink 读写多套 Kerberos 认证的 Kafka 实例。

为了测试验证信息和文件的正确性,可先使用 JAVA 的 api 来读写:

//消费者关键配置如下(只列出了Kerberos相关)Properties props = new Properties();        props.put("security.protocol", "SASL_PLAINTEXT");        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "                + "useTicketCache=false "                + "renewTicket=true "                + "serviceName=\"kafka\" "                + "useKeyTab=true "                + "keyTab=\""+ inKeytab+"\" "                + "principal=\""+ inPrincipal +"\";");        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        //创建消费者对象以后即可获取Kafka中数据//生产者关键参数如下(只列出了Kerberos相关)        Properties props1 = new Properties();        props1.put("security.protocol", "SASL_PLAINTEXT");        props1.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "                + "useTicketCache=false "                + "renewTicket=true "                + "serviceName=\"kafka\" "                + "useKeyTab=true "                + "keyTab=\"" + outKeytab + "\" "                + "principal=\"" + outPrincipal + "\";");        KafkaProducer<String, String> producer = new KafkaProducer<>(props1);        //创建生产者对象后即可向Kafka写入数据
复制代码

调用上述 Java 程序,从 Kafka1 读到数据,并将特定的数据写入 Kafka2:

已读到了 Kafka1 的数据,可查看 Kafka2 的数据是否正确:


可看到,完成了一个进程中读入 Kafka1 的集群,并写入 Kafka2。

此时即可通过 Flink 测试读写多套 Kerberos 认证的 Kafka 集群:

 //从Kafka1中读到数据,并写入Kafka2        DataStream<String> messageStream = env                .addSource(new FlinkKafkaConsumer<String>(inTopic, new SimpleStringSchema(), getKafkaConsumerProperties()));        messageStream                .rebalance()                .addSink(new FlinkKafkaProducer<String>(outTopic,                        new SimpleStringSchema(),                        getKafkaProducerProperties())).setParallelism(1);        //Kafka消费者与生产者配置可通过上述代码获取
复制代码

开发打包后,即可提交 Flink 任务,从 Kafka1 读取数据,并将读到的数据写入至 Kafka2 中:

/usr/bch/3.2.0/flink/bin/flink run -c flink.benchmark.FlinkKakfaReadAndWrite -d -m yarn-cluster -yt /home/kerberoskeytabfile -yqu root.test -ys 1 /path/to/flinktest.jar
复制代码

为了确保消费者和生产者在同一个进程中运行,将程序并发设置为 1,提交后应用为:

尝试往 kafka1 写入数据,同时从 kafka2 中读取数据:

可以看到,测试写入的数据为 testmessage1750,读到的数据相同,Flink 应用功能正常,完成了 Kafka1 至 Kafka2 直接的数据传输。

五、总结及拓展

目前 Kerberos 为大数据领域广泛使用的安全认证服务,Kafka 集群在采用了 kerberos 认证后,常用的静态认证方式即 JVM 中加入 jaas 文件,在不重启集群(非互信)的情况下无法满足多套 Kerberos 集群的 Kafka 数据传输要求。

本文介绍了 Kafka 的动态 Kerberos 认证,分析了它的原理和优势,同时给出了在非互信的情况下应用的动态配置方式及 Flink 测试用例,打通了多个集群间的数据流转,可扩展到异地 Kafka 集群间的应用读写方案,最大程度的挖掘数据价值。

参考链接

  • https://web.mit.edu/kerberos/

  • https://kafka.apache.org/

用户头像

移动云,5G时代你身边的智慧云 2019-02-13 加入

移动云大数据产品团队,在移动云上提供云原生大数据分析LakeHouse,消息队列Kafka/Pulsar,云数据库HBase,弹性MapReduce,数据集成与治理等PaaS服务。 微信公众号:人人都学大数据

评论

发布
暂无评论
Flink 读写多套 Kerberos 认证的 Kafka 方案_移动云大数据_InfoQ写作社区