写点什么

Kafka 配置 SASL_SSL 认证传输加密

  • 2025-09-04
    北京
  • 本文字数:5921 字

    阅读完需:约 19 分钟

本文分享自天翼云开发者社区《Kafka配置SASL_SSL认证传输加密》,作者:王****帅

一、SSL 证书配置

1、生成证书

如我输入命令如下:依次是 密码—重输密码—名与姓—组织单位—组织名—城市—省份—国家两位代码—密码—重输密码,后面告警不用管,此步骤要注意的是,名与姓这一项必须输入域名,如 “localhost”,切记不可以随意写,我曾尝试使用其他字符串,在后面客户端生成证书认证的时候一直有问题。

keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkeyEnter keystore password:Re-enter new password:What is your first and last name?[Unknown]:  localhostWhat is the name of your organizational unit?[Unknown]:  CH-kafkaWhat is the name of your organization?[Unknown]:  kafkadevWhat is the name of your City or Locality?[Unknown]:  shanghaiWhat is the name of your State or Province?[Unknown]:  shanghaiWhat is the two-letter country code for this unit?[Unknown]:  CHIs CN=localhost, OU=CH-kafka, O=kafkadev, L=shanghai, ST=shanghai, C=CH correct?[no]:  yesEnter key password for <localhost>	(RETURN if same as keystore password):  Re-enter new password: Warning:The JKS keystore uses a proprietary format. It is recommended to migrate to PKCS12 which is an industry standard format using "keytool -importkeystore -srckeystore server.keystore.jks -destkeystore server.keystore.jks -deststoretype pkcs12".
复制代码

完成上面步骤,可使用命令 keytool -list -v -keystore server.keystore.jks 来验证生成证书的内容


2、生成 CA

通过第一步,集群中的每台机器都生成一对公私钥,和一个证书来识别机器。但是,证书是未签名的,这意味着攻击者可以创建一个这样的证书来伪装成任何机器。

因此,通过对集群中的每台机器进行签名来防止伪造的证书。证书颁发机构(CA)负责签名证书。CA 的工作机制像一个颁发护照的政府。政府印章(标志)每本护照,这样护照很难伪造。其他政府核实护照的印章,以确保护照是真实的。同样,CA 签名的证书和加密保证签名证书很难伪造。因此,只要 CA 是一个真正和值得信赖的权威,client 就能有较高的保障连接的是真正的机器。如下,生成的 CA 是一个简单的公私钥对和证书,用于签名其他的证书,下面为输入命令,依次提示输入为 密码—重输密码—国家两位代码—省份—城市—名与姓—组织名—组织单位—名与姓(域名)—邮箱 ,此输入步骤与上面生成证书世输入步骤相反,输入值要与第一步一致,邮箱可不输入

openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650 Generating a 2048 bit RSA private key .........................................................................+++ ..................+++ writing new private key to 'ca-key' Enter PEM pass phrase: Verifying - Enter PEM pass phrase: ----- You are about to be asked to enter information that will be incorporated into your certificate request. What you are about to enter is what is called a Distinguished Name or a DN. There are quite a few fields but you can leave some blank For some fields there will be a default value, If you enter '.', the field will be left blank. ----- Country Name (2 letter code) [XX]:CH State or Province Name (full name) []:shanghai Locality Name (eg, city) [Default City]:shanghai Organization Name (eg, company) [Default Company Ltd]:kafkadev Organizational Unit Name (eg, section) []:CH-kafka Common Name (eg, your name or your server's hostname) []:localhost Email Address []: 
将生成的CA添加到**clients' truststore(客户的信任库)**,以便client可以信任这个CA:
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
复制代码

3、签名证书

步骤 2 生成的 CA 来签名所有步骤 1 生成的证书,首先,你需要从密钥仓库导出证书:

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file 
复制代码

然后用 CA 签名:{validity},{ca-password} 两个为参数,

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}
复制代码

最后,你需要导入 CA 的证书和已签名的证书到密钥仓库:

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed 
复制代码

上文中的各参数解释如下:

keystore: 密钥仓库的位置

ca-cert: CA 的证书

ca-key: CA 的私钥

ca-password: CA 的密码

cert-file: 出口,服务器的未签名证书

cert-signed: 已签名的服务器证书

上面步骤所有执行脚本如下:注意密码修改为自己的密码,以防混淆,所有步骤密码最好设为同一个

#!/bin/bash#Step 1keytool -keystore server.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey#Step 2openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650keytool -keystore server.truststore.jks -alias CARoot -import -file ca-certkeytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert#Step 3keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-fileopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 3650 -CAcreateserial -passin pass:123456keytool -keystore server.keystore.jks -alias CARoot -import -file ca-certkeytool -keystore server.keystore.jks -alias localhost -import -file cert-signed#Step 4keytool -keystore client.keystore.jks -alias localhost -validity 3650 -keyalg RSA -genkey
复制代码

二、配置 zookeeper 的安全认证

1、在 zookeeper 的 conf 文件夹下创建 jaas.conf 安全配置文件

此文件中定义了两个用户 admin 以及 kafka 等于号后面是用户对应的密码此文件定义的是连接 zookeeper 服务器的用户 JAAS 配置节点默认为 Server(节点名不可修改,修改后会报错)

Server {  org.apache.zookeeper.server.auth.DigestLoginModule required  user_admin="!234Qwer"  user_kafka="clearwater001";};
复制代码

2、在 zookeeper 的配置文件 zoo.cfg 中添加认证配置源文件如下

tickTime=2000initLimit=10syncLimit=5dataDir=/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/datadataLogDir=/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/logsclientPort=2181#sasl认证authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProviderrequireClientAuthScheme=sasljaasLoginRenew=3600000
复制代码

3、在 zkEnv.sh 启动环境脚本中添加 jvm 参数,将 jaas 的配置文件位置作为 JVM 参数传递给每个客户端的 JVM

 LIBPATH=("${ZOOKEEPER_PREFIX}"/share/zookeeper/*.jar)else  #release tarball format  for i in "$ZOOBINDIR"/../zookeeper-*.jar  do    CLASSPATH="$i:$CLASSPATH"  done  LIBPATH=("${ZOOBINDIR}"/../lib/*.jar)fi
for i in "${LIBPATH[@]}"do CLASSPATH="$i:$CLASSPATH"done
#make it work for developersfor d in "$ZOOBINDIR"/../build/lib/*.jardo CLASSPATH="$d:$CLASSPATH"done
for d in "$ZOOBINDIR"/../zookeeper-server/target/lib/*.jardo CLASSPATH="$d:$CLASSPATH"done
#make it work for developersCLASSPATH="$ZOOBINDIR/../build/classes:$CLASSPATH"
#make it work for developersCLASSPATH="$ZOOBINDIR/../zookeeper-server/target/classes:$CLASSPATH"
case "`uname`" in CYGWIN*|MINGW*) cygwin=true ;; *) cygwin=false ;;esac
if $cygwinthen CLASSPATH=`cygpath -wp "$CLASSPATH"`fi
#echo "CLASSPATH=$CLASSPATH"
# default heap for zookeeper serverZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"#JVM参数export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/conf/jaas.conf"
# default heap for zookeeper clientZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"export CLIENT_JVMFLAGS="-Xmx${ZK_CLIENT_HEAP}m $CLIENT_JVMFLAGS"
复制代码

三、配置 kafka 的安全认证

1、在 kafka 的 conf 目录下创建 jaas.conf 认证文件

username 和 password 属性 用来定义 kafka 中各个 broker 节点之间相互通信的用户 user_用来定义连接到 kafka 中各个 broker 的用户 这些用户可供生产者以及消费者进行使用两个用户的配置均在 JAAS 默认配置节点 KafkaServer 中进行配置 broker 连接到 zookeeper 的用户在 JAAS 默认配置节点 Client 中进行配置,从上面 zookeeper 中的 jaas 文件中选择一个用户进行使用

KafkaServer {        org.apache.kafka.common.security.plain.PlainLoginModule required        username="admin"        password="clearwater"        user_admin="clearwater"        user_kafka="!234Qwer";};Client {  org.apache.kafka.common.security.plain.PlainLoginModule required    username="kafka"    password="clearwater001";};
复制代码

2、在 kafka 的 conf 目录下创建 kafka_client_jaas.conf 认证文件

KafkaClient {        org.apache.kafka.common.security.plain.PlainLoginModule required        username="kafka"        password="!234Qwer";};
复制代码

3、在 kafka 的 bin 目录下 kafka-server-start.sh 的启动脚本中配置环境变量,指定 jaas.conf 文件

if [ $# -lt 1 ];then	echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"	exit 1fibase_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"fi#环境变量if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/jaas.conf"fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;;esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
复制代码

4、在 kafka 的 bin 目录下 kafka-console-producer.sh 的启动脚本中配置环境变量,指定 jaas.conf 文件

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/kafka_client_jaas.conf"fiexec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
复制代码

5、在 kafka 的 bin 目录下 kafka-server-start.sh 的启动脚本中配置环境变量,指定 jaas.conf 文件

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.8.0/config/kafka_client_jaas.conf"fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
复制代码

6、在 kafka 的 bin 目录下创建 client-ssl.properties 认证文件(执行生产者和消费者命令时指定)

security.protocol=SASL_SSLssl.endpoint.identification.algorithm=sasl.mechanism=PLAINgroup.id=testssl.truststore.location=/usr/local/kafka_2.12-2.8.0/ssl/client.truststore.jksssl.truststore.password=clearwater001!
复制代码

7、配置 kafka 的 server.properties 配置文件,添加如下内容

#sasl_ssllisteners=SASL_SSL://172.17.0.53:9093advertised.listeners=SASL_SSL://172.17.0.53:9093security.inter.broker.protocol=SASL_SSLsasl.enabled.mechanisms=PLAINsasl.mechanism.inter.broker.protocol=PLAINauthorizer.class.name=kafka.security.auth.SimpleAclAuthorizerallow.everyone.if.no.acl.found=truessl.keystore.location=/usr/local/kafka_2.12-2.8.0/ssl/server.keystore.jksssl.keystore.password=clearwater001!ssl.key.password=clearwater001!ssl.truststore.location=/usr/local/kafka_2.12-2.8.0/ssl/server.truststore.jksssl.truststore.password=clearwater001!ssl.endpoint.identification.algorithm=
复制代码

8、重启 zookeeper 和 kafka,创建 topic(命令在第四节),添加生产者和消费者授权

#生产者授权./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"kafka" --producer --topic "test"#消费者授权./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"kafka" --consumer --topic "test" --group '*'
复制代码

四、相关启动命令

1、启动 zookeeper

/usr/local/zookeeper/apache-zookeeper-3.5.9-bin/bin/zkServer.sh start 
复制代码

2、启动 kafka-server

./kafka-server-start.sh  -daemon /usr/local/kafka_2.12-2.8.0/config/server.properties  
复制代码

3、创建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #查看topic list ./kafka-topics.sh --list --zookeeper localhost:2181 
复制代码

4、加密前生产消费消息(一般使用新版本命令)

###生产消息####老版本./kafka-console-producer.sh --broker-list localhost:9092 --topic test#新版本./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
###消费消息####老版本./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning#新版本./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
复制代码

5、加密后生产消费消息命令

#生产消息 ./kafka-console-producer.sh --bootstrap-server 172.17.0.53:9093 --topic test --producer.config client-ssl.properties #消费消息./kafka-console-consumer.sh --bootstrap-server 172.17.0.53:9093 --topic test --from-beginning --consumer.config client-ssl.properties
复制代码


用户头像

还未添加个人签名 2022-02-22 加入

天翼云是中国电信倾力打造的云服务品牌,致力于成为领先的云计算服务提供商。提供云主机、CDN、云电脑、大数据及AI等全线产品和场景化解决方案。

评论

发布
暂无评论
Kafka配置SASL_SSL认证传输加密_kafka_天翼云开发者社区_InfoQ写作社区