呼伦贝尔旅游包车网站咋做,什么是网站定位,网站推广目标关键词是什么意思,韩国做美食网站文章目录 高性能消息中间件 - Kafka3.x#xff08;三#xff09;Kafka Broker ⭐Kafka Broker概念Zookeeper#xff08;新版本可以不使用zk了#xff09;⭐Zookeeper的作用 Kafka的选举1#xff1a;Broker选举Leader⭐Broker核心参数⭐案例#xff1a;服役新节点和退役旧… 文章目录 高性能消息中间件 - Kafka3.x三Kafka Broker ⭐Kafka Broker概念Zookeeper新版本可以不使用zk了⭐Zookeeper的作用 Kafka的选举1Broker选举Leader⭐Broker核心参数⭐案例服役新节点和退役旧节点重要⭐服役新节点⭐退役旧节点⭐ Kafka的副本⭐副本的基本概念⭐Kafka的选举2副本选举Leader⭐手动调整分区副本⭐分区自动再平衡机制增加kafka的副本⭐ Kafka的存储基本概念文件清理策略 Kafka高效读取数据⭐1顺序写磁盘2页缓存与零拷贝 Kafka消费者⭐Kafka消费者的消费模式消费者组⭐Kafka的选举3消费者组选举Leader⭐Java Api消费者的重要参数⭐Java Api操作消费者⭐手动提交Offset⭐ 生产调优5消息积压⭐ 高性能消息中间件 - Kafka3.x三
Kafka Broker ⭐
Kafka Broker概念
kafka broker说白了就是kafka服务器。我们在生产环境下通常要搭建kafka broker集群。partition分区和replica副本的区别 前提条件假设我们的kafka集群数量broker为3。也就是说我们集群只有3台kafka。 此时Partition分区可以指定的数量是无限制的也就是说可以指定10、100。但是replica副本数最多只能指定为33、2、1否则就会报错。因为我们的kafka broker数量就是3replica不能超过这个值
大致图如下partition和replica和kafka broker的关系图 Zookeeper新版本可以不使用zk了⭐
Zookeeper的作用
zookeeper的作用有 1保存kafka元数据。2保存broker注册信息。有哪些broker可用3记录每一个分区副本有哪些并且每一个分区副本的leader是谁4辅助选举leader。5保存主题、分区信息等等。
Kafka的选举1Broker选举Leader⭐
broker选举leader规则 每一个broker都有一个唯一的broker idbroker在启动后会去zookeeper的controller节点中竞争注册哪个broker先注册那这个broker就是leader。
Broker核心参数⭐
参数描述replica.lag.time.max.msISR 中如果 Follower 长时间未向 Leader 发送通信请求或同步数据则该 Follower 将被踢出 ISR并移动到OSR。该时间阈值默认 30sauto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡再平衡。leader.imbalance.per.broker.percentage**默认是 10%。**每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值控制器会触发 leader的平衡。leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间。log.segment.bytesKafka 中 log 日志是分成一块块存储的此配置是指 log日志划分 成块的大小默认值 1G。log.index.interval.bytes默认 4kbkafka 里面每当写入了 4kb 大小的日志.log然后就往 index文件里面记录一个索引log.retention.hoursKafka 中数据保存的时间默认 7 天。log.retention.minutesKafka 中数据保存的时间分钟级别默认关闭。log.retention.msKafka 中数据保存的时间毫秒级别默认关闭。log.retention.check.interval.ms检查数据是否保存超时的间隔默认是 5 分钟。log.retention.bytes默认等于-1表示无穷大。超过设置的所有日志总大小删除最早的 segment。log.cleanup.policy默认是 delete表示所有数据启用删除策略如果设置值为 compact表示所有数据启用压缩策略。num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。num.network.threads默认是 3。数据传输线程数这个参数占总核数的50%的 2/3 。num.replica.fetchers副本拉取线程数这个参数占总核数的 50%的 1/3
案例服役新节点和退役旧节点重要⭐
服役新节点⭐ 先只启动kafka01和kafka02机器然后创建topic副本数设置为2。然后再启动kafka03机器然后让这个kafka03节点服役到集群中。 1只启动kafka01和kafka02机器并创建topic
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic001 --partitions3 --replication-factor2 --create2查询mytopic001详情发现副本只存在于kafka01和kafka02节点上
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic001 --describe3启动kafka03机器的zk和kafka再次查询mytopic001详情发现还是一样副本只存在于kafka01和kafka02节点上而不存在于kafka03节点上
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic001 --describe4开始服役新节点了
vim topics-to-move.json内容如下topics下面的topic设置为想要服役的topic主题名称其他不变
{topics: [{topic: mytopic001}],version: 1
}5生成新的负载均衡计划并复制下面如图所示的计划 –bootstrap-serverkafka集群地址–topics-to-move-json-file刚刚创建的json文件名–broker-list假如服役节点后的broker编号列表3为kafka03的broker编号
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 --topics-to-move-json-file topics-to-move.json --broker-list 1,2,3 --generate6创建副本存储计划
vim increase-replication-factor.json内容如下就是刚刚复制的新的负载均衡计划
{version:1,partitions:[{topic:mytopic001,partition:0,replicas:[3,2],log_dirs:[any,any]},{topic:mytopic001,partition:1,replicas:[1,3],log_dirs:[any,any]},{topic:mytopic001,partition:2,replicas:[2,1],log_dirs:[any,any]}]}7执行副本计划–reassignment-json-file负载均衡计划文件
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 --reassignment-json-file increase-replication-factor.json --execute8再次查看mytopic001详情
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic001 --describe退役旧节点⭐
1经过上面的服役新节点我们可以看到副本被分配到1、2、3号机器上了。下面我们要把kafka03节点删去。
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic001 --describe2开始退役节点了
vim topics-to-move.json内容如下topics下面的topic设置为想要退役的topic主题名称其他不变
{topics: [{topic: mytopic001}],version: 1
}3生成新的负载均衡计划并复制下面如图所示的计划 –bootstrap-serverkafka集群地址–topics-to-move-json-file刚刚创建的json文件名–broker-list假如服役节点后的broker编号列表1、2代表着kafka03节点要退役了
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 --topics-to-move-json-file topics-to-move.json --broker-list 1,2 --generate6创建副本存储计划
vim decr-replication-factor.json内容如下就是刚刚复制的新的负载均衡计划
{version:1,partitions:[{topic:mytopic001,partition:0,replicas:[1,2],log_dirs:[any,any]},{topic:mytopic001,partition:1,replicas:[2,1],log_dirs:[any,any]},{topic:mytopic001,partition:2,replicas:[1,2],log_dirs:[any,any]}]}7执行副本计划–reassignment-json-file负载均衡计划文件
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 --reassignment-json-file decr-replication-factor.json --execute8再次查看mytopic001详情
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic001 --describeKafka的副本⭐
副本的基本概念⭐ replica副本为了防止集群中机器故障导致数据丢失所以副本出现了。kafka的每一个topic在每一个分区都可以有多个replica副本副本分为leader副本和follower副本。 leader副本生产者和消费者只于leader副本交互。每一个topic中的每一个分区都有一个leader副本。follower副本负责实时从leader副本同步数据当leader副本故障了则会重新选举使follower副本变成leader副本。 AR:分区中的所有 Replica 统称为 AR ISR OSR ISR:所有与 Leader 副本保持一定程度同步的Replica(包括 Leader 副本在内)组成 ISR OSR:与 Leader 副本同步滞后过多的 Replica 组成了 OSR
Kafka的选举2副本选举Leader⭐ 选举规则以ISR队列存在为前提选举AR队列中的第一个副本为leader副本。 如果leader副本故障下线则也会以ISR队列存在为前提选举AR队列第一个为leader第一个不行则第二个一直轮询直到选出leader。
手动调整分区副本⭐
1创建一个新的topic指定为3个分区1个副本
kafka-topics.sh --bootstrap-server kafka01:9092 --create --partitions 3 --replication-factor 1 --topic mytopic0022查看mytopic002详情
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic002 --describe3创建副本存储计划
vim increase-replication-factor.json内容如下修改topic和replicas
{
version:1,
partitions:[{topic:mytopic002,partition:0,replicas:[1,2]},
{topic:mytopic002,partition:1,replicas:[1,2]},
{topic:mytopic002,partition:2,replicas:[1,2]} ]}4执行副本存储计划
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 --reassignment-json-file increase-replication-factor.json --execute5再次查看mytopic002详情
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic002 --describe分区自动再平衡机制
一般情况下我们的分区都是平衡散落在broker的随着一些broker故障会慢慢出现leader集中在某台broker上的情况造成集群负载不均衡这时候就需要分区平衡。
为了解决上述问题kafka出现了自动平衡的机制。kafka提供了下面几个参数进行控制 auto.leader.rebalance.enable自动leader parition平衡默认是true;leader.imbalance.per.broker.percentage每个broker允许的不平衡的leader的比率默认是10%如果超过这个值控制器将会触发leader的平衡leader.imbalance.check.interval.seconds检查leader负载是否平衡的时间间隔默认是300秒但是在生产环境中是不开启这个自动平衡因为触发leader partition的自动平衡会损耗性能或者可以将触发自动平和的参数leader.imbalance.per.broker.percentage的值调大点。
增加kafka的副本⭐
1创建一个新的topic指定为3个分区1个副本
kafka-topics.sh --bootstrap-server kafka01:9092 --create --partitions 3 --replication-factor 1 --topic mytopic0022查看mytopic002详情
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic002 --describe3创建副本存储计划
vim increase-replication-factor.json内容如下修改topic和replicas
{
version:1,
partitions:[{topic:mytopic002,partition:0,replicas:[1,2]},
{topic:mytopic002,partition:1,replicas:[1,2]},
{topic:mytopic002,partition:2,replicas:[1,2]} ]}4执行副本存储计划
kafka-reassign-partitions.sh --bootstrap-server kafka01:9092 --reassignment-json-file increase-replication-factor.json --execute5再次查看mytopic002详情
kafka-topics.sh --bootstrap-server192.168.184.201:9092 --topicmytopic002 --describeKafka的存储
基本概念 每一个partition分区都对应着一个log文件。Producer生产的数据会被不断追加到该log文件末端 kafka为了提高效率把每一个log文件拆分成多个segment每个segment大小默认是1G可以通过log.segment.bytes去修改。 该文件命名规则为topic名称分区号 当log文件写入4kb大小数据这里可以通过log.index.interval.bytes设置就会写入一条索引信息到index文件中这样的index索引文件就是一个稀疏索引它并不会每条日志都建立索引信息。
文件清理策略 日志清理策略有两个 日志删除(delete) :按照一定的保留策略直接删除不符合条件的日志分段。日志压缩(compact) :针对每个消息的key进行整合对于有相同key的不同value值只保留最后一个版本。可以通过修改broker端参数 log.cleanup.policy 来进行配置 kafka中默认的日志保存时间为7天可以通过调整如下参数修改保存时间。 log.retention.hours最低优先级小时默认7天log.retention.minutes分钟log.retention.ms最高优先级毫秒log.retention.check.interval.ms负责设置检查周期默认5分钟file.delete.delay.ms延迟执行删除时间log.retention.bytes当设置为-1时表示运行保留日志最大值相当于关闭当设置为1G时表示日志文件最大值
Kafka高效读取数据⭐
kafka之所以读写性能高是因为 1kafka是一个分布式的中间件采用了多分区架构、以及消费者集群消费者组有利于抗住高并发、大流量场景。2kafka底层存储采用稀疏索引每写入4kb数据才会加上1条索引不仅能够节约索引占用的空间也可以让我们快速定位数据。3顺序写磁盘4采用页缓存和零拷贝
1顺序写磁盘
kafka底层写入log日志数据采用的就是末尾追加的方式写入数据这种就是顺序写磁盘顺序写磁盘的效率比随机写磁盘的效率高了很多。
2页缓存与零拷贝 kafka大量使用了页缓存PageCache分为以下两个场景 1读操作当我们要读取数据的时候首先会去页缓存里面找有没有该数据如果有则返回如果没有则会去磁盘中寻找并把数据存到页缓存中。2写操作首先会判断如果页缓存没有该条数据则会把数据写入页缓存。如果有这条数据则修改这个页缓存数据此时这个页缓存中的数据页就是脏页操作系统会在特定时间把脏页写入磁盘保证了数据一致性。 零拷贝并不是不需要拷贝而是减少不必要的拷贝次数。 当数据从磁盘经过DMA 拷贝到内核缓存页缓存后为了减少CPU拷贝的性能损耗操作系统会将该内核缓存与用户层进行共享减少一次CPU copy过程同时用户层的读写也会直接访问该共享存储本身由用户层到Socket缓存的数据拷贝过程也变成了从内核到内核的CPU拷贝过程更加的快速这就是零拷贝。
Kafka消费者⭐
Kafka消费者的消费模式
1poll拉消费者主动从kafka broker中拉取数据。kafka默认采用2push推kafka broker主动推送数据给消费者。
消费者组⭐ 一个相同消费者组的消费者它们的groupid是相同的。 一个分区只能由同一个消费者组的一个消费者所消费。 消费者组之间互不影响。
Kafka的选举3消费者组选举Leader⭐ 首先要选择出coordinator如下 groupid的hashcode值%50-consumer_offsets的分区数。比如groupid的hashcode值为2则2%502也就是说coordinator的2号分区在哪个kafka broker上就选择哪个节点的coordinator作为这个消费者组的boss以后这个消费者组提交的所有offsets就往这个分区提交。 后面由coordinator负责选出消费组中的Leader
Java Api消费者的重要参数⭐
参数描述bootstrap.serversKafka集群地址列表。key.deserializer 和value.deserialize反序列化类型。要写全类名group.id消费者组id。标记消费者所属的消费者组enable.auto.commit默认值为 true消费者会自动周期性地向服务器提交偏移量auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka提交的频率默认 5s。auto.offset.resetearliest自动重置偏移量到最早的偏移量。 latest默认自动重置偏移量为最新的偏移量。 none如果消费组原来的previous偏移量不存在则向消费者抛异常。 anything向消费者抛异常offsets.topic.num.partitions__consumer_offsets 的分区数默认是 50 个分区。heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间默认 3s。该条目的值必须小于 session.timeout.ms 也不应该高于session.timeout.ms 的 1/3session.timeout.msKafka 消费者和 coordinator 之间连接超时时间默认 45s。超过该值该消费者被移除消费者组执行再平衡。max.poll.interval.ms消费者处理消息的最大时长默认是 5 分钟。超过该值该消费者被移除消费者组执行再平衡。fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数。fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到仍然会返回数据。fetch.max.bytes默认 Default: 5242880050 m。消费者获取服务器端一批消息最大的字节数。max.poll.records一次 poll拉取数据返回消息的最大条数默认是 500 条
Java Api操作消费者⭐
注意指定消费者组id。注意指定my-consumer-group01会出现无反应现象。最好别加数字。例如my-consumer-group 这个就可以运行
package com.kafka02.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;public class MyConsumer {public static void main(String[] args) {Properties properties new Properties();//1基本配置properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.184.201:9092);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//2指定消费者组id。注意指定my-consumer-group01会出现无反应现象。最好别加数字。my-consumer-groupproperties.put(ConsumerConfig.GROUP_ID_CONFIG,my-consumer-group);KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 存放需要消费的topic名称SetString topicSetnew HashSet();topicSet.add(java-api-test);// 订阅这个topic集合kafkaConsumer.subscribe(topicSet);System.out.println(等待拉取数据------);//循环消费消息while (true){//kafka消费者的消费模式就是poll。指定1s拉取一次数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));//输出消息consumerRecords.forEach(record - {System.out.println(--);System.out.println(record);});}}}手动提交Offset⭐
手动提交offset有两个方法二选一 commitSync同步提交必须等待offset提交完毕再去消费下一批数据。 阻塞线程一直到提交到成功会进行失败重试commitAsync异步提交 发送完提交offset请求后就开始消费下一批数据了。没有失败重试机制会提交失败
package com.kafka02.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;public class MyConsumerCommit {public static void main(String[] args) {Properties properties new Properties();//1基本配置properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.184.201:9092);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//2指定消费者组id。注意指定my-consumer-group01会出现无反应现象。最好别加数字。my-consumer-groupproperties.put(ConsumerConfig.GROUP_ID_CONFIG,my-consumer-group);//3:关闭自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);KafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 存放需要消费的topic名称SetString topicSetnew HashSet();topicSet.add(java-api-test);// 订阅这个topic集合kafkaConsumer.subscribe(topicSet);System.out.println(等待拉取数据------);//循环消费消息while (true){//kafka消费者的消费模式就是poll。指定1s拉取一次数据ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));//消费消息consumerRecords.forEach(record - {System.out.println(--);System.out.println(record);});//消费结束后手动提交offsetkafkaConsumer.commitSync(); //同步提交}}
}生产调优5消息积压⭐ 解决方案1可以增加partition分区数和增加消费者组中的消费者数量使partition分区数消费者组中的消费者数量。 解决方案2提高每批次拉取消息的数量max.poll.records默认是500条可以提高到1000条。