不用源码做网站,搜索引擎入口大全,软件下载app排行榜,广东公司响应式网站建设设计文章目录 1. 背景2. 环境3. 操作步骤3.1 生成SSL证书3.2 配置zookeeper认证3.3 配置kafka安全认证3.4 使用kafka客户端进行验证3.5 使用Java端代码进行认证 1. 背景
kafka提供了多种安全认证机制#xff0c;主要分为SASL和SSL两大类。
SASL#xff1a; 是一种身份验证机制主要分为SASL和SSL两大类。
SASL 是一种身份验证机制用于在客户端和服务器之间进行身份验证的过程其中SASL/PLAIN是基于账号密码的认证方式。SSL 是一种加密协议用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接并对传输的数据进行加密和解密以防止未经授权的访问和篡改。
在 Kafka 中启用 SASL_SSL 安全协议时SASL 用于客户端和服务器之间的身份验证SSL 则用于加密和保护数据的传输。不仅提供身份验证还提供加密和数据保护的功能。
因工作需要需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多经过两天的研究终于搞定了特在此记录下。
2. 环境
操作系统linuxkafka版本kafka_2.13-2.7.1zookeeper版本apache-zookeeper-3.7.0应用程序版本spring-boot-2.6.7、JDK1.8
3. 操作步骤
生成SSL证书配置zookeeper配置kafka前三步配置完成后kafka就开启了SASL_SSL双重认证可以使用kafka自带的客户端进行测试3.4在业务代码中使用请查看3.5
3.1 生成SSL证书
按照步骤一步一步操作生成服务器/客户端的SSL证书。也就是公钥与私钥
参考【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)
3.2 配置zookeeper认证
第一步 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件名称任意定义了两个用户可提供给生产者和消费者使用格式为user_用户名“用户密码”内容如下
Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin1qazWSXuser_kafka1qazWSX;
};第二步 zookeeper配置文件zoo.cfg中新增SASL认证配置如下
authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthSchemesasl
jaasLoginRenew3600000第三步 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数让其启动时加载jaas配置文件
export SERVER_JVMFLAGS-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf3.3 配置kafka安全认证
第一步 /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件内容如下
kafka-server-jaas.conf
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameadminpassword1qazWSXuser_admin1qazWSXuser_kafka1qazWSX;
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernamekafkapassword1qazWSX;
};kafka-client-jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernamekafkapassword1qazWSX;
};第二步 在kafka启动脚本kafka_2.13-2.7.1/bin/kafka-server-start.sh配置环境变量指定jaas.conf文件增加如下代码
增加环境变量 -Djava.security.auth.login.config/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf
...if [ x$KAFKA_HEAP_OPTS x ]; thenexport KAFKA_HEAP_OPTS-Xmx1G -Xms1G -Djava.security.auth.login.config/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf
fi...
**第三步**修改 kafka 的 server.properties配置文件
#listenersSSL://10.1.61.121:9092
host.namenode1
#listenersPLAINTEXT://node1:9092,SSL://node1:9093
listenersSASL_SSL://node1:9093
#advertised.listenersSSL://node1:9092
advertised.listenersSASL_SSL://node1:9093
ssl.keystore.location/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.passwordQ06688
ssl.key.passwordQ06688
ssl.truststore.location/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.passwordQ06688
ssl.client.authrequired
ssl.enabled.protocolsTLSv1.2,TLSv1.1,TLSv1
ssl.keystore.typeJKS
ssl.truststore.typeJKS
# kafka2.0.x开始将ssl.endpoint.identification.algorithm设置为了HTTPS即:需要验证主机名
# 如果不需要验证主机名那么可以这么设置 ssl.endpoint.identification.algorithm即可
ssl.endpoint.identification.algorithm
# 设置内部访问也用SSL默认值为security.inter.broker.protocolPLAINTEXT
security.inter.broker.protocolSASL_SSL
sasl.enabled.mechanismsPLAIN
sasl.mechanism.inter.broker.protocolPLAIN
authorizer.class.namekafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.foundtrue注意这里有个坑生成SSL密钥私钥时指定了主机的hostname这里也要配置kafka所在服务器的hostname
3.4 使用kafka客户端进行验证
第一步 修改kafka/config/下的 consumer.properties、producer.properties配置SASL_SSL验证的基本信息。
consumer.properties
bootstrap.serversnode1:9093
security.protocolSASL_SSL
ssl.truststore.location/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.passwordQ06688sasl.mechanismPLAIN
sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernamekafka password1qazWSX;producer.properties:
bootstrap.serversnode1:9093
security.protocolSASL_SSL
ssl.truststore.location/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.passwordQ06688sasl.mechanismPLAIN
sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernamekafka password1qazWSX;第二步 使用命令行操作时让其找到上述设置的SASL_SSL配置文件 --producer.config …/config/producer.properties
#生产
crbtnode1:/home/crbt/local/kafka_2.13-2.7.1/bin./kafka-console-producer.sh --bootstrap-server node1:9093 --topic first --producer.config ../config/producer.properties
aaa
bbb
ccc
#消费
crbtnode1:/home/crbt/local/kafka_2.13-2.7.1/bin./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc3.5 使用Java端代码进行认证
第一步 yaml 配置文件
spring:kafka:bootstrap-servers: localhost:9093properties:sasl:mechanism: PLAINjaas:#此处填写 SASL登录时分配的用户名密码注意password结尾;config: org.apache.kafka.common.security.scram.ScramLoginModule required usernamekafka password1qazWSX;security:protocol: SASL_SSLssl:trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jkstrust-store-password: Q06688key-store-type: JKSproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 106384acks: -1retries: 3properties:linger-ms: 1retry.backoff.ms: 1000buffer-memory: 33554432第二步 使用 kafkaTemplate 的方式配置一个 config
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;Slf4j
Configuration
public class KafkaProducerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.producer.acks})private String acks;Value(${spring.kafka.producer.retries})private String retries;Value(${spring.kafka.producer.batch-size})private String batchSize;Value(${spring.kafka.producer.properties.linger-ms})private int lingerMs;Value(${spring.kafka.producer.properties.buffer-memory})private int bufferMemory;Value(${spring.kafka.producer.key-serializer})private String keySerializer;Value(${spring.kafka.producer.value-serializer})private String valueSerializer;Value(${spring.kafka.properties.security.protocol})private String protocol;Value(${spring.kafka.properties.sasl.mechanism})private String mechanism;Value(${spring.kafka.ssl.trust-store-location})private String trustStoreLocation;Value(${spring.kafka.ssl.trust-store-password})private String trustStorePassword;Value(${spring.kafka.ssl.key-store-type})private String keyStoreType;Value(${spring.kafka.properties.sasl.jaas.config})private String jaasConfig;Beanpublic KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(producerFactory());}/*** the producer factory config*/Beanpublic ProducerFactoryString, String producerFactory() {MapString, Object props new HashMapString, Object();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put(security.protocol, protocol);props.put(SaslConfigs.SASL_MECHANISM, mechanism);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);return new DefaultKafkaProducerFactoryString, String(props);}
}