做药物分析常用网站,网站婚礼服务态网站建设论文,辽宁城乡建设部网站首页,网站建设如何报价文章目录 Kafka的基本概念什么是ISRISR的维护机制ISR的作用ISR相关配置参数同步过程示例代码总结 Kafka中的ISR#xff08;In-Sync Replicas同步副本#xff09;机制是确保数据高可用性和一致性的核心组件。 Kafka的基本概念
在Kafka中#xff0c;数据被组织成主题#xf… 文章目录 Kafka的基本概念什么是ISRISR的维护机制ISR的作用ISR相关配置参数同步过程示例代码总结 Kafka中的ISRIn-Sync Replicas同步副本机制是确保数据高可用性和一致性的核心组件。 Kafka的基本概念
在Kafka中数据被组织成主题Topic每个主题分为多个分区Partition。每个分区有多个副本Replica这些副本分布在不同的Broker上以确保数据的冗余和高可用性。
Leader Replica每个分区有一个领导副本负责处理所有读写请求。Follower Replica其他副本作为追随者从领导副本中复制数据。
什么是ISR
ISRIn-Sync Replicas是一个分区副本集合这些副本被认为是与领导副本保持同步的。具体来说ISR中的副本是那些能够在一定时间内由参数replica.lag.time.max.ms指定将数据同步到与领导副本相同位置的副本。
Kafka根据副本同步的情况分成了3个集合:
AR (Assigned Replicas) : 包括ISR和OSRISR (In-sync Replicas) : 和leader副本保持同步的副本集合可以被认为是可靠的数据OSR (Out-Sync Replicas) :和Leader副本同步失效的副本集合
ARISROSR。
ISR的维护机制
领导副本更新ISR 领导副本会定期检查每个追随者副本的状态。如果某个追随者副本在指定时间内未能跟上领导副本的更新领导副本会将其从ISR中移除。追随者副本重新加入ISR 当追随者副本追上了领导副本的日志进度即达到了与领导副本相同的日志偏移量领导副本会将其重新加入ISR。
ISR的作用
数据一致性保证 生产者在写入数据时可以通过设置acks参数来控制数据的一致性级别。设置acksall或acks-1时领导副本会等待所有ISR中的副本都确认收到数据后才向生产者发送确认。这保证了数据在写入时至少被写入到ISR中的所有副本。故障容错能力 如果领导副本发生故障Kafka会从ISR中选取一个新的领导副本。由于ISR中的副本与之前的领导副本保持同步新的领导副本能够继续提供服务而不会丢失数据。
ISR相关配置参数
replica.lag.time.max.ms追随者副本与领导副本之间的最大允许同步延迟时间。如果追随者副本超过此时间没有同步到领导副本会被移出ISR。
min.insync.replicas最少同步副本数。生产者在设置acksall时只有当ISR中的副本数不少于这个值才会确认消息的写入。这个参数用于在保证数据可用性的同时控制生产者的写入成功率。
同步过程
a. 数据写入 客户端将数据写入主副本。 主副本将数据写入本地日志并确认写入。 主副本异步将数据推送给所有的从副本。b. 副本同步 从副本收到主副本的数据后将其写入本地日志并返回确认。 当从副本确认写入数据后主副本会更新其 ISRIn-Sync Replicas即同步副本集合表示这些副本已经同步到最新的数据。
示例代码
以下是一个简单的生产者示例代码展示了如何使用acks参数来确保数据写入的高可用性
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.ACKS_CONFIG, all); // 确保数据被所有ISR副本确认KafkaProducerString, String producer new KafkaProducer(props);try {for (int i 0; i 10; i) {ProducerRecordString, String record new ProducerRecord(my-topic, key i, value i);producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println(Message sent successfully to partition metadata.partition() with offset metadata.offset());} else {exception.printStackTrace();}}});}} finally {producer.close();}}
}
总结
Kafka的ISR机制通过维护一个与领导副本同步的副本集合确保了数据的一致性和高可用性。通过合理配置和使用ISR机制Kafka能够在面对节点故障时仍然保证数据的安全和系统的稳定。