长沙自助建站哪家好,重庆荣昌网站建设报价,wordpress网站支持中文注册,大学生dw网页设计作业【Kafka-3.x-教程】专栏#xff1a;
【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门 【Kafka-3.x-教程】-【二】Kafka-生产者-Producer 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 【Kafka-3.x-教程】-【五…【Kafka-3.x-教程】专栏
【Kafka-3.x-教程】-【一】Kafka 概述、Kafka 快速入门 【Kafka-3.x-教程】-【二】Kafka-生产者-Producer 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 【Kafka-3.x-教程】-【四】Kafka-消费者-Consumer 【Kafka-3.x-教程】-【五】Kafka-监控-Eagle 【Kafka-3.x-教程】-【六】Kafka 外部系统集成 【Flume、Flink、SpringBoot、Spark】 【Kafka-3.x-教程】-【七】Kafka 生产调优、Kafka 压力测试 【Kafka-3.x-教程】-【三】Kafka-Broker、Kafka-Kraft 1Kafka Broker 工作流程1.1.Zookeeper 存储的 Kafka 信息1.2.Kafka Broker 总体工作流程1.3.Broker 重要参数 2节点服役和退役2.1.服役新节点2.2.退役旧节点 3 Kafka 副本3.1.副本基本信息3.2.Leader 选举流程3.3.Leader 和 Follower 故障处理细节3.4.分区副本分配3.5.手动调整分区副本存储3.6.Leader Partition 负载平衡3.7.增加副本因子 4文件存储4.1.Topic 数据存储机制4.2.Topic 数据存储位置4.3.index 文件和 log 文件详解 5文件清理策略6高效读写数据7Kafka-Kraft 模式7.1.Kafka-Kraft 架构7.2.Kafka-Kraft 集群部署7.3.Kafka-Kraft 集群启动停止脚本 1Kafka Broker 工作流程
1.1.Zookeeper 存储的 Kafka 信息
Kafka 2.8 版本以后Kafka-Kraft 模式出现不再依赖 ZK由 controller 节点代替 zookeeper元数据保存在 controller 中由 controller 直接进行 Kafka 集群管理。点击此处查看 Kafka-Kraft 模式。 1.2.Kafka Broker 总体工作流程 1、Broker 启动后向 ZK 进行注册ZK 记录好存活的 Broker。
2、每个 Broker 中都有 Controller谁的 Controller 先注册谁就是 Controller Leader。
3、Controller Leader 上线后监听已经注册的 Broker 节点的变化。
4、Controller 开始选举 Leader
1选举规则在 ISR 中存活着的节点按照 AR 中排在前面的优先Leader 也会按照 AR 中的排列顺序进行轮询。
2ARKafka 分区中所有副本的统称。
5、Controller 将节点信息Leader、ISR记录在 ZK 中。
6、其他 Controller 节点从 ZK 中拉取记录好的数据防止 Leader 挂了后其他节点上位获取相关数据。
7、Producer 发送消息后 Follower 主动从 Leader 同步数据。
1底层以 log 的方式进行存储但是 log 实际上是抽象的称呼实际上是以 segment 1G进行存储。
2segment 中包含 .log 和 .index 文件.log 就是数据.index 负责加快检索速度。
8、如果 Leader 挂了Controller 会监听到这个变化从而在 ZK 中重新拉取到 Leader 信息和 ISR 信息。
9、重新选举原则还是按照 AR 中排在前面的优先。
10、将新的 Leader 和 ISR 信息更新回 ZK 中。
1.3.Broker 重要参数 2节点服役和退役
实际生产中会出现 kafka 节点的服役和退役那么我们该如何进行负载均衡操作呢
2.1.服役新节点
1、创建一个要均衡的主题。
vim topics-to-move.json#添加下面内容
{topics: [{topic: first}],version: 1
}2、生成一个负载均衡的计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3 --generate#Current partition replica assignment
#{version:1,partitions:[{topic:first,partition:0,replicas:[0,2,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[2,1,0],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,0,2],log_dirs:[any,any,any]}]}#Proposed partition reassignment configuration
#{version:1,partitions:[{topic:first,partition:0,replicas:[2,3,0],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[3,0,1],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[0,1,2],log_dirs:[any,any,any]}]}3、创建副本存储计划所有副本存储在 broker0、broker1、broker2、broker3 中。
vim increase-replication-factor.json#输入如下内容
{version:1,partitions:[{topic:first,partition:0,replicas:[2,3,0],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[3,0,1],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[0,1,2],log_dirs:[any,any,any]}]}4、执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute5、验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify#Status of partition reassignment:
#Reassignment of partition first-0 is complete.
#Reassignment of partition first-1 is complete.
#Reassignment of partition first-2 is complete.
#Clearing broker-level throttles on brokers 0,1,2,3
#Clearing topic-level throttles on topic first2.2.退役旧节点
1、执行负载均衡操作
先按照退役一台节点生成执行计划然后按照服役时操作流程执行负载均衡。
1创建一个要均衡的主题。
vim topics-to-move.json
{topics: [{topic: first}],version: 1
}2、创建执行计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2 --generate#Current partition replica assignment
#{version:1,partitions:[{topic:first,partition:0,replicas:[2,0,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[3,1,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[0,2,3],log_dirs:[any,any,any]}]}#Proposed partition reassignment configuration
#{version:1,partitions:[{topic:first,partition:0,replicas:[2,0,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[0,1,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,2,0],log_dirs:[any,any,any]}]}3、创建副本存储计划所有副本存储在 broker0、broker1、broker2 中。
vim increase-replication-factor.json#添加如下内容
{version:1,partitions:[{topic:first,partition:0,replicas:[2,0,1],log_dirs:[any,any,any]},{topic:first,partition:1,replicas:[0,1,2],log_dirs:[any,any,any]},{topic:first,partition:2,replicas:[1,2,0],log_dirs:[any,any,any]}]}4、执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute5、验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify#Status of partition reassignment:
#Reassignment of partition first-0 is complete.
#Reassignment of partition first-1 is complete.
#Reassignment of partition first-2 is complete.
#Clearing broker-level throttles on brokers 0,1,2,3
#Clearing topic-level throttles on topic first6、执行停止命令
bin/kafka-server-stop.sh3 Kafka 副本
3.1.副本基本信息
1、Kafka 副本作用提高数据可靠性。
2、Kafka 默认副本 1 个生产环境一般配置为 2 个保证数据可靠性太多副本会增加磁盘存储空间增加网络上数据传输降低效率。
3、Kafka 中副本分为Leader 和 Follower。Kafka 生产者只会把数据发往 Leader然后 Follower 找 Leader 进行同步数据。
4、Kafka 分区中的所有副本统称为 ARAssigned Repllicas。
AR ISR OSR
ISR表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定默认 30s。Leader 发生故障之后就会从 ISR 中选举新的 Leader。
OSR表示 Follower 与 Leader 副本同步时延迟过多的副本。
3.2.Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader负责管理集群 broker 的上下线所有 topic 的分区副本分配和 Leader 选举等工作。
Controller 的信息同步工作是依赖于 Zookeeper 的。 1、创建一个新的 topic4 个分区4 个副本。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu1 --partitions 4 --replication-factor 4
#Created topic atguigu1.2、查看 Leader 分布情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1#Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
#Configs: segment.bytes1073741824
#Topic: atguigu1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
#Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
#Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
#Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,33、停止掉 hadoop105 的 kafka 进程并查看 Leader 分区情况。
bin/kafka-server-stop.sh
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1#Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
#Configs: segment.bytes1073741824
#Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
#Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
#Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
#Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,04、停止掉 hadoop104 的 kafka 进程并查看 Leader 分区情况。
bin/kafka-server-stop.sh
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1#Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
#Configs: segment.bytes1073741824
#Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
#Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
#Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
#Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,05、启动 hadoop105 的 kafka 进程并查看 Leader 分区情况。
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1#Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
#Configs: segment.bytes1073741824
#Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
#Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
#Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,36、启动 hadoop104 的 kafka 进程并查看 Leader 分区情况。
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1#Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
#Configs: segment.bytes1073741824
#Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
#Topic: atguigu1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
#Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
#Topic: atguigu1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,27、停止掉 hadoop103 的 kafka 进程并查看 Leader 分区情况。
bin/kafka-server-stop.sh
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic atguigu1#Topic: atguigu1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
#Configs: segment.bytes1073741824
#Topic: atguigu1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
#Topic: atguigu1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
#Topic: atguigu1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
#Topic: atguigu1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,23.3.Leader 和 Follower 故障处理细节
首先了解两个概念
LEOLog End Offset每个副本的最后一个offsetLEO其实就是最新的offset 1
HWHigh Watermark所有副本中最小的LEO
1、Follower 故障 1发生故障的 Follower 被踢出 ISR。
2这期间 Leader 和剩下正常工作的 Follower 会继续接收数据。
3发生故障的 Follower 恢复正常后会读取本地磁盘记录的上次的 HW并将 log 文件高于 HW 的部分截取掉从 HW 开始向 Leader 进行同步。
4等到该 Follower 的 LEO 大于等于该 Partition 的 HW即 Follower 追上 Leader 后就可以重新假如 ISR 了。
2、Leader 故障 1发生故障的 Leader 被踢出 ISR。
2从 ISR 中选出一个新的 Leader。
3为了保证副本之间数据的一致性其余 Follower 会先将各自的 log 文件高于 HW 的部分截取掉保持和新选举的 Leader 一致然后从新的 Leader 同步数据。
注意只能保证副本之间数据的一致性并不能保证数据不丢失或者不重复。
3.4.分区副本分配
如果 kafka 服务器只有 4 个节点那么设置 kafka 的分区数大于服务器台数在 kafka 底层如何分配存储副本呢
1、创建 16 分区3 个副本的 topic名称为 second。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 16 --replication-factor 3 --topic second2、查看分区和副本情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic second#Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
#Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
#Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
#Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
#Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
#Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
#Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
#Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
#Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
#Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
#Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
#Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
#Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
#Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
#Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
#Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1总结
① 尽可能让副本均匀分布 - 负载均衡。
② 防止数据丢失。
3.5.手动调整分区副本存储 手动调整分区副本存储的步骤如下
1、创建一个新的 topic名称为 three。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 4 --replication-factor 2 --topic three2、查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three3、创建副本存储计划所有副本都指定存储在 broker0、broker1 中。
vim increase-replication-factor.json#输入如下内容
{
version:1,
partitions:[{topic:three,partition:0,replicas:[0,1]},
{topic:three,partition:1,replicas:[0,1]},
{topic:three,partition:2,replicas:[1,0]},
{topic:three,partition:3,replicas:[1,0]}]
}4、执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute5、验证副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify6、查看分区副本存储情况。
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic three3.6.Leader Partition 负载平衡 3.7.增加副本因子
在生产环境当中由于某个主题的重要等级需要提升我们考虑增加副本。副本数的增加需要先制定计划然后根据计划执行。
1、创建 topic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 3 --replication-factor 1 --topic four2、手动增加副本存储
1创建副本存储计划所有副本都指定存储在 broker0、broker1、broker2 中。
vim increase-replication-factor.json#输入如下内容
{version:1,partitions:[{topic:four,partition:0,replicas:[0,1,2]},{topic:four,partition:1,replicas:[0,1,2]},{topic:four,partition:2,replicas:[0,1,2]}]}2执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute4文件存储
4.1.Topic 数据存储机制 1、一个 topic 分为多个 partition一个 partition 分为多个 segment数据以 segment 形式进行存储分文件存储大小为 1G。
2、一个 segment 分为.log数据、.index为了快速定位数据位置的索引、.timeindex自动删除策略时的时间基准、其他文件。
3、index 和 log 文件以当前 segment 的第一条消息的 offset 命名命名规则topic 名称 分区序号。
4、Producer 的数据会不断追加到 segment 后写入速度较快。
4.2.Topic 数据存储位置
1、启动生产者并发送消息。
bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
hello world2、查看 hadoop102或者 hadoop103、hadoop104的/opt/module/kafka/datas/first-1first-0、first-2路径上的文件。
ls#00000000000000000092.index
#00000000000000000092.log
#00000000000000000092.snapshot
#00000000000000000092.timeindex
#leader-epoch-checkpoint
#partition.metadata3、直接查看 log 日志发现是乱码。
cat 00000000000000000092.log #\CYnF|©|©ÿÿÿÿÿÿÿÿÿÿÿÿÿÿhello world4、通过工具查看 index 和 log 信息。
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index #Dumping ./00000000000000000000.index
#offset: 3 position: 152kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log#输出如下内容
Dumping datas/first-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
0 CreateTime: 1636338440962 size: 75 magic: 2 compresscodec: none crc: 2745337109 isvalid:
true
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
75 CreateTime: 1636351749089 size: 77 magic: 2 compresscodec: none crc: 273943004 isvalid:
true
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
152 CreateTime: 1636351749119 size: 77 magic: 2 compresscodec: none crc: 106207379 isvalid:
true
baseOffset: 4 lastOffset: 8 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
229 CreateTime: 1636353061435 size: 141 magic: 2 compresscodec: none crc: 157376877 isvalid:
true
baseOffset: 9 lastOffset: 13 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position:
370 CreateTime: 1636353204051 size: 146 magic: 2 compresscodec: none crc: 4058582827 isvalid:
true4.3.index 文件和 log 文件详解 1、Kafka 并不会将每条数据都创建索引Kafka 的 index 为稀疏索引大约每往 segment 中写入 4kb 数据会往 index 文件写入一条索引写入的索引为此 segment 上的相对 offset参数log.index.interval.bytes。
2、如果此时想在文件中找到 offset 600 的数据那么会按照 index 区间进行查找类似于二分查找法。
3、找到相对 offset 对应的 position起始位置后继续向下检索直到定位到这条数据的具体位置。
注意每个 segment 中的 offset 为相对 offset这样能确保 offset 值所占用的空间不会过大。 5文件清理策略
Kafka 中默认的日志保存时间为 7 天可以通过调整如下参数修改保存时间。 log.retention.hours最低优先级小时默认 7 天。 log.retention.minutes分钟。 log.retention.ms最高优先级毫秒。 log.retention.check.interval.ms负责设置检查周期默认 5 分钟。
那么日志一旦超过了设置的时间怎么处理呢
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
1、delete 日志删除将过期数据删除
log.cleanup.policy delete 所有数据启用删除策略
1基于时间默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
2基于大小默认关闭。超过设置的所有日志总大小删除最早的 segment最好不要开启。
log.retention.bytes默认等于-1表示无穷大。
【思考】如果一个 segment 中有一部分数据过期一部分没有过期怎么处理
【答】以 segment 中所有记录中的最大时间戳作为该文件时间戳进行删除。
2、compact 日志压缩
1compact日志压缩对于相同key的不同value值只保留最后一个版本。
log.cleanup.policy compact 所有数据启用压缩策略
2压缩后的 offset 可能是不连续的比如上图中没有 6当从这些 offset 消费消息时将会拿到比这个 offset 大的 offset 对应的消息实际上会拿到 offset 为 7 的消息并从这个位置开始消费。
注意这种策略只适合特殊场景比如消息的 key 是用户 IDvalue 是用户的资料通过这种压缩策略整个消息集里就保存了所有用户最新的资料数据更新或覆盖一般用的比较少。 6高效读写数据
1、Kafka 本身是分布式集群可以采用分区技术并行度高。
2、读数据采用稀疏索引可以快速定位要消费的数据。
3、顺序写磁盘。
Kafka 的 producer 生产数据要写入到 log 文件中写的过程是一直追加到文件末端为顺序写。官网有数据表明同样的磁盘顺序写能到 600M/s而随机写只有 100K/s。这与磁盘的机械机构有关顺序写之所以快是因为其省去了大量磁头寻址的时间。 4、页缓存 零拷贝技术
1非零拷贝工作流程
① Producer 发送数据到 Kafka 集群直接将数据发送给 Linux 系统内核。
② 内核是用来操作系统硬件的数据过来后不会直接进行落盘数据会存放于内核中的页缓存中至于什么时候落盘取决于 Linux 内核做决定。
③ 消费者消费数据先访问 Kafka 应用层然后访问页缓存接着访问网卡最后传输到 Consumer。
总结Kafka 重度依赖 Linux 系统内核当写数据时直接写入页缓存中当读数据时先到页缓存中读取如果找不到再从磁盘中获取。
2零拷贝工作流程Kafka 所用
① Producer 发送数据到 Kafka 集群直接将数据发送给 Linux 系统内核。
② 内核是用来操作系统硬件的数据过来后不会直接进行落盘数据会存放于内核中的页缓存中至于什么时候落盘取决于 Linux 内核做决定。
③ 消费者直接访问网卡将数据传输到 Consumer并不会走 Kafka 应用层。
总结Kafka Broker 应用层不关心对存储的数据的修改只需要把数据获取到下放到页缓存中即可所有对数据操作的地方都放在了 Producer 和 Consumer 的拦截器中。 7Kafka-Kraft 模式
Kafka-Kraft 模式 7.1.Kafka-Kraft 架构 左图为 Kafka 现有架构元数据在 zookeeper 中运行时动态选举 controller由controller 进行 Kafka 集群管理。右图为 kraft 模式架构实验性不再依赖 zookeeper 集群而是用三台 controller 节点代替 zookeeper元数据保存在 controller 中由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个
Kafka 不再依赖外部框架而是能够独立运行controller 管理集群时不再需要从 zookeeper 中先读取数据集群性能上升由于不依赖 zookeeper集群扩展时不再受到 zookeeper 读写能力限制controller 不再动态选举而是由配置文件规定。这样我们可以有针对性的加强 controller 节点的配置而不是像以前一样对随机 controller 节点的高负载束手无策。
7.2.Kafka-Kraft 集群部署
1、再次解压一份 kafka 安装包
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/2、重命名为 kafka2
mv kafka_2.12-3.0.0/ kafka23、在 hadoop102 上修改 /opt/module/kafka2/config/kraft/server.properties 配置文件
vim server.properties
#kafka 的角色controller 相当于主机、broker 节点相当于从机主机类似 zk 功
能
process.rolesbroker, controller
#节点 ID
node.id2
#controller 服务协议别名
controller.listener.namesCONTROLLER
#全 Controller 列表
controller.quorum.voters2hadoop102:9093,3hadoop103:9093,4hado
op104:9093
#不同服务器绑定的端口
listenersPLAINTEXT://:9092,CONTROLLER://:9093
#broker 服务协议别名
inter.broker.listener.namePLAINTEXT
#broker 对外暴露的地址
advertised.ListenersPLAINTEXT://hadoop102:9092
#协议别名到安全协议的映射
listener.security.protocol.mapCONTROLLER:PLAINTEXT,PLAINTEXT:PLA
INTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#kafka 数据存储目录
log.dirs/opt/module/kafka2/data4、分发 kafka2
xsync kafka2/在 hadoop103 和 hadoop104 上 需 要 对 node.id 相应改变值需要和 controller.quorum.voters 对应。在 hadoop103 和 hadoop104 上需要根据各自的主机名称修改相应的 advertised.Listeners 地址。
5、初始化集群数据目录
1首先生成存储目录唯一 ID。
bin/kafka-storage.sh random-uuidJ7s9e8PPTKOO47PxzI39VA2用该 ID 格式化 kafka 存储目录三台节点。
bin/kafka-storage.sh format -t J7s9e8PPTKOO47PxzI39VA -c /opt/module/kafka2/config/kraft/server.properties6、启动 kafka 集群
bin/kafka-server-start.sh -daemon config/kraft/server.properties7、停止 kafka 集群
bin/kafka-server-stop.sh7.3.Kafka-Kraft 集群启动停止脚本
1、在 /home/atguigu/bin 目录下创建文件 kf2.sh 脚本文件
vim kf2.sh脚本如下
#! /bin/bash
case $1 in
start){for i in hadoop102 hadoop103 hadoop104doecho --------启动 $i Kafka2-------ssh $i /opt/module/kafka2/bin/kafka-server-start.sh -
daemon /opt/module/kafka2/config/kraft/server.propertiesdone
};;
stop){for i in hadoop102 hadoop103 hadoop104doecho --------停止 $i Kafka2-------ssh $i /opt/module/kafka2/bin/kafka-server-stop.sh done
};;
esac2、添加执行权限
chmod x kf2.sh3、启动集群命令
kf2.sh start4、停止集群命令
kf2.sh stop