网站日志管理,小熊代刷推广网站,建筑公司企业愿景怎么写,wordpress postmetaKafka入门及安装 文章目录 Kafka入门及安装1.介绍Kafka的基本概念和核心组件 2.安装1.docker快速安装zookeeper安装kafka安装 添加topic删除topickafka-ui安装 2.Docker安装#xff08;SASL/PLAIN认证配置-用户名密码#xff09; 来源参考的deepseek#xff0c;如有侵权联系…Kafka入门及安装 文章目录 Kafka入门及安装1.介绍Kafka的基本概念和核心组件 2.安装1.docker快速安装zookeeper安装kafka安装 添加topic删除topickafka-ui安装 2.Docker安装SASL/PLAIN认证配置-用户名密码 来源参考的deepseek如有侵权联系立删
1.介绍
Kafka的基本概念和核心组件
Kafka是分布式流处理平台由 Scala 和 Java 编写。Kafka 的核心概念和组件包括
Producer发布消息的客户端向Broker发送数据。Consumer订阅消息的客户端从Broker读取数据。BrokerKafka服务节点负责消息存储和转发。Topic消息的逻辑分类单位类似数据库表。Partition主题的分区实现并行处理和扩展性。Consumer Group多个消费者组成的组协同消费同一主题的不同分区。
详细介绍
主题Topic 主题是消息的逻辑分类单位类似于数据库中的表。每个主题可以包含多个分区Partition用于实现水平扩展和高吞吐量。分区Partition 分区是物理上的消息分组每个分区是一个有序且不可变的消息队列。分区可以分布在不同的服务器上以实现数据的分布式存储和处理。Broker服务器 Broker 是 Kafka 集群中的节点负责接收和存储消息。一个集群可以由多个 Broker 组成每个 Broker 可以管理多个分区和副本。生产者Producer 生产者是向 Kafka 主题发送消息的客户端。生产者将消息发布到指定的主题中消息会被路由到相应的分区。消费者Consumer 消费者是从 Kafka 主题中拉取消息的客户端。消费者可以订阅一个或多个主题并从这些主题中读取消息。消费者可以组成消费者组Consumer Group以实现消息的并行消费和故障恢复。消费者组Consumer Group 消费者组由多个消费者组成组内的每个消费者负责消费不同分区的消息以实现高效的消息分发和容错能力。ZooKeeperZooKeeper 是一个分布式协调服务用于管理 Kafka 集群的元数据如主题、分区和副本的配置信息。ZooKeeper 还负责选举和维护集群的领导者。日志Log Kafka 使用日志来存储消息每个主题的日志由多个分区组成每个分区包含一系列有序的消息。日志可以进行压缩和持久化以提高存储效率和数据可靠性。偏移量Offset 偏移量用于跟踪消息的消费位置。消费者通过偏移量来确定从何处继续消费消息。复制机制Kafka 通过复制机制提高容错能力。每个分区可以有多个副本即使部分 Broker 失效服务仍可继续运行。
这些组件共同构成了 Kafka 的核心架构使其能够实现高吞吐量、低延迟、可扩展性和持久化的消息传递和处理。
2.安装
1.docker快速安装
zookeeper安装
docker pull wurstmeister/zookeeperdocker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeperkafka安装
docker pull wurstmeister/kafkadocker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper \
-v /etc/localtime:/etc/localtime \
--env KAFKA_ZOOKEEPER_CONNECTzookeeper:2181 \
--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://127.0.0.1:9092 \
--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR1 wurstmeister/kafka--name kafka: 设置容器的名字为“kafka”。-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器并且在当前的容器中可以通过zookeeper这个别名来访问它。--env KAFKA_ZOOKEEPER_CONNECTzookeeper:2181: 设置环境变量指定ZooKeeper的连接字符串。--env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://localhost:9092: 设置环境变量指定Kafka的advertised listeners。--env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092: 设置环境变量指定Kafka的listeners。--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR1: 设置环境变量指定offsets topic的副本因子。wurstmeister/kafka: 使用的Docker镜像名字。/opt/kafka/config/server.properties #镜像内部配置文件地址./kafka-server-start.sh -daemon ./config/server.properties #启动命令添加topic
kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 --partitions 1 --topic mytest删除topic
kafka-topics.sh \--delete --topic lx \--zookeeper 127.0.0.1:2181kafka-ui安装
docker run --name kafka-ui -p 19092:8080 \-e KAFKA_CLUSTERS_0_NAMElocal \-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS127.0.0.1:9092 \-d provectuslabs/kafka-ui:latest 2.Docker安装SASL/PLAIN认证配置-用户名密码
1.宿主机创建用户密码配置文件
#创建文件
touch /dockerData/kafka/config/kafka_server_jaas.conf2.编辑kafka_server_jaas.conf配置
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameadminpassword123456user_admin123456user_moshangshang123456;
}; user_admin123456
user_后面跟的是用户名然后123456为密码可配置多个用户密码3.运行kafka容器
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper \
-v /etc/localtime:/etc/localtime \
-v /dockerData/kafka/config/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf \
--env KAFKA_ZOOKEEPER_CONNECTzookeeper:2181 \
--env 192.168.1.250:9092 \
--env KAFKA_LISTENERSSASL_PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OPTS-Djava.security.auth.login.config/opt/kafka/config/kafka_server_jaas.conf \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR1 wurstmeister/kafka4.宿主机创建kafka的server.properties 配置
5.修改并添加认证相关配置
listenersSASL_PLAINTEXT://0.0.0.0:9092
advertised.listenersSASL_PLAINTEXT://192.168.1.250:9092
security.inter.broker.protocolSASL_PLAINTEXT
sasl.enabled.mechanismsPLAIN
sasl.mechanism.inter.broker.protocolPLAIN6.拷贝宿主机配置文件覆盖容器内部原本文件
docker cp /dockerData/kafka/config/server.properties kafka:/opt/kafka/config7.重启容器 8.整合springboot的yml配置
#kafka配置
spring:kafka:#kafka集群地址bootstrap-servers: 192.168.25.100:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后会发送properties:linger.ms: 1security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin password123456;#代表集群中从节点都持久化后才认为发送成功acks: -1consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin password123456;group-id: testauto-offset-reset: earliestlistener:ack-mode: MANUAL # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch
9.kafka-ui配置认证
docker run --name kafka-ui -p 19092:8080 \-e KAFKA_CLUSTERS_0_NAMElocal \-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS192.168.1.250:9092 \-e KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOLSASL_PLAINTEXT \-e KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISMPLAIN \-e KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIGorg.apache.kafka.common.security.plain.PlainLoginModule required usernamemoshangshang password123456; \-d provectuslabs/kafka-ui:latest