php代理ip访问网站,凡科网免费网站怎么样,网站怎么识别PC 手机,大区直播间网站开发制作1. ack级别
上文中我们提到过kafka是存在确认应答机制的#xff0c;也就是数据在发送到kafka的时候#xff0c;kafka会回复一个确认信息#xff0c;这个确认信息是存在等级的。 ack0 这个等级是最低的#xff0c;这个级别中数据sender线程复制完毕数据默认kafka已经接收到…1. ack级别
上文中我们提到过kafka是存在确认应答机制的也就是数据在发送到kafka的时候kafka会回复一个确认信息这个确认信息是存在等级的。 ack0 这个等级是最低的这个级别中数据sender线程复制完毕数据默认kafka已经接收到数据。 ack1 这个级别中sender线程复制完毕数据leader分区拿到数据放入到自己的存储并且返回确认信息 ack -1 这个级别比较重要sender线程复制完毕数据主分区接受完毕数据并且从分区都同步完毕数据然后在返回确认信息 那么以上的等级在使用的时候都会出现什么问题呢
ack 0 会丢失数据 ack0时在异步复制过程中leader可能会丢失leader分区和follower分区的数据。 ack1 ack1的时候leader虽然接收到数据存储到本地但是没有同步给follower节点这个时候主节点宕机从节点重新选举新的主节点主节点是不含有这个数据的数据会丢失.
ack -1 这个模式不会丢失数据但是如果leader接受完毕数据并且将数据同步给不同的follower从节点已经接受完毕但是还没有返回给sender线程ack的时候这个时候leader节点宕机了sender没有接收到这个ack它人为没有发送成功还会重新发送数据过来会造成数据重复。
一般前两种都适合在数据并不是特别重要的时候使用而最后一种效率会比较低下但是适用于可靠性比较高的场景使用
所以一般使用我们都会使用ack -1 retries N 联合在一起使用
那么我们如何能够保证数据的一致性呢
2. 幂等性
在kafka的0.10以后的版本中增加了新的特性幂等性主要就是为了解决kafka的ack -1的时候数据的重复问题设计的原理就是在kafka中增加一个事务编号。 数据在发送的时候在单个分区中的seq事物编号是递增的如果重复的在一个分区中多次插入编号一致的两个信息那么这个数据会被去重掉
在单个分区中序号递增也就是我们开启幂等性也只能保证单个分区的数据是可以去重的
整体代码如下
pro.put(ProducerConfig.RETRIES_CONFIG,3);
pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
设定retries 3 ,enable.idempotence true
幂等性开启的时候ack默认设定为-1。
幂等性的工作原理很简单每条消息都有一个「主键」这个主键由 PID, Partition, SeqNumber 组成他们分别是
PIDProducerID每个生产者启动时Kafka 都会给它分配一个 IDProducerID 是生产者的唯一标识需要注意的是Kafka 重启也会重新分配 PIDPartition消息需要发往的分区号SeqNumber生产者他会记录自己所发送的消息给他们分配一个自增的 ID这个 ID 就是 SeqNumber是该消息的唯一标识
对于主键相同的数据Kafka 是不会重复持久化的它只会接收一条但由于是原理的限制幂等性也只能保证单分区、单会话内的数据不重复如果 Kafka 挂掉重新给生产者分配了 PID还是有可能产生重复的数据这就需要另一个特性来保证了 ——Kafka 事务。
3. kafka的事务
Kafka 事务基于幂等性实现通过事务机制Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入即处于同一个事务内的所有消息最终结果是要么全部写成功要么全部写失败。 Kafka 事务分为生产者事务和消费者事务但它们并不是强绑定的关系消费者主要依赖自身对事务进行控制因此这里我们主要讨论的是生产者事务。 3.1 如何开启事务
创建一个 Producer指定一个事务 ID
Properties properties new Properties();properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置事务ID必须
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactional_id_1);
//创建生产者
KafkaProducerString, String producer new KafkaProducer(properties);
使用事务发送消息
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();//发送10条消息往kafka假如中间有异常所有消息都会发送失败
try {for (int i 0; i 10; i) {producer.send(new ProducerRecord(topic-test, a message i));}
}
// 提交事务
producer.commitTransaction();
} catch (Exception e) {// 终止事务producer.abortTransaction();
} finally {producer.close();
}
3.2 事务工作原理 1启动生产者分配协调器
在使用事务的时候必须给生产者指定一个事务 ID生产者启动时Kafka 会根据事务 ID 来分配一个事务协调器Transaction Coordinator 。每个 Broker 都有一个事务协调器负责分配 PIDProducer ID 和管理事务。
事务协调器的分配涉及到一个特殊的主题 __transaction_state该主题默认有 50 个分区每个分区负责一部分事务Kafka 根据事务ID的hashcode值%50 计算出该事务属于哪个分区 该分区 Leader 所在 Broker 的事务协调器就会被分配给该生产者。
分配完事务协调器后该事务协调器会给生产者分配一个 PID接下来生产者就可以准备发送消息了。
2发送消息
生产者分配到 PID 后要先告诉事务协调器要把消息发往哪些分区协调器会做一个记录然后生产者就可以开始发送消息了这些消息与普通的消息不同它们带着一个字段标识自己是事务消息。
当生产者事务内的消息发送完毕会向事务协调器发送 Commit 或 Abort 请求此时生产者的工作已经做完了它只需要等待 Kafka 的响应。
3确认事务
当生产者开始发送消息时协调器判定事务开始。它会将开始的信息持久化到主题 __transaction_state 中。
当生产者发送完事务内的消息或者遇到异常发送失败协调器会收到 Commit 或 Abort 请求接着事务协调器会跟所有主题通信告诉它们事务是成功还是失败的。
如果是成功主题会汇报自己已经收到消息协调者收到所有主题的回应便确认了事务完成并持久化这一结果。
如果是失败的主题会把这个事务内的消息丢弃并汇报给协调者协调者收到所有结果后再持久化这一信息事务结束整个放弃事务的过程消费者是无感知的它并不会收到这些数据。
事物不仅可以保证多个数据整体成功失败还可以保证数据丢失后恢复。
3.3 代码实现
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class ProducerWithTransaction {public static void main(String[] args) {Properties pro new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transaciton_test);KafkaProducerString, String producer new KafkaProducerString, String(pro);ProducerRecordString, String record new ProducerRecord(topic_a, this is hainiu);producer.initTransactions();producer.beginTransaction();try{for(int i0;i5;i){producer.send(record);}
// int a 1/0;producer.commitTransaction();}catch (Exception e){producer.abortTransaction();}finally {producer.close();}}
}
4. 一致性语义
在大数据场景中存在三种时间语义分别为
At Least Once 至少一次数据至少一次可能会重复
At Most Once 至多一次数据至多一次可能会丢失
Exactly Once 精准一次有且只有一次准确的消息传输
那么针对于以上我们学习了ack已经幂等性以及事务。
所以我们做以下分析 如果设定ack 0 或者是 1 出现的语义就是At Most Once 会丢失数据 如果设定ack - 1 会出现At Least Once 数据的重复 在ack -1的基础上开启幂等性会解决掉数据重复问题但是不能保证一个批次的数据整体一致所以还要开启事务才可以。 5. 参数调节
参数调节buffer.memoryrecord accumulator的大小适当增加可以保证producer的速度默认32Mbatch-size异步线程拉取的批次大小适当增加可以提高效率但是会增加延迟性linger.ms异步线程等待时长一般根据生产效率而定不建议太大增加延迟效果acks确认应答一般设定为-1保证数据不丢失enable.idempotence开启幂等性保证数据去重实现exactly once语义retries增加重试次数保证数据的稳定性compression.type增加producer端的压缩max.in.flight.requests.per.connectionsender线程异步复制数据的阻塞次数当没收到kafka的ack之前可以最多发送五个写入请求调节这个参数可以保证数据的有序性
全部代码
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class ProducerWithMultiConfig {public static void main(String[] args) throws InterruptedException {Properties pro new Properties();pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop106:9092);pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);pro.put(ProducerConfig.LINGER_MS_CONFIG, 100);pro.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024*1024*64);pro.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);pro.put(ProducerConfig.RETRIES_CONFIG, 3);pro.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, snappy);pro.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);KafkaProducerString, String producer new KafkaProducerString, String(pro);ProducerRecordString, String record new ProducerRecord(topic_a, this is hainiu);producer.send(record);producer.close();}
}
其中max.in.flight.requests.per.connection参数设定后可以增加producer的阻塞大小
在未开启幂等性的时候这个值设定为1可以保证单个批次的数据有序在分区内部有序
如果开启了幂等性可以设定最大值不超过5可以保证五个request请求单个分区内有序
因为没有开启幂等性的时候如果第一个请求失败第二个请求重新发送的时候需要二次排序
要是开启幂等性了会保留原来的顺序性不需要重新排序
总而言之kafka可以保证单分区有序但是整体是无序的