做淘宝链接模板网站,网站怎么做用qq登录接入,手机app软件制作平台,河南网站建设公司价格为了解决Kafka传输数据时#xff0c;所产生的数据重复和乱序问题#xff0c;Kafka引入了幂等性操作#xff0c;所谓的幂等性#xff0c;就是Producer同样的一条数据#xff0c;无论向Kafka发送多少次#xff0c;kafka都只会存储一条。注意#xff0c;这里的同样的一条数…为了解决Kafka传输数据时所产生的数据重复和乱序问题Kafka引入了幂等性操作所谓的幂等性就是Producer同样的一条数据无论向Kafka发送多少次kafka都只会存储一条。注意这里的同样的一条数据指的不是内容一致的数据而是指的不断重试的数据。
默认幂等性是不起作用的所以如果想要使用幂等性操作只需要在生产者对象的配置中开启幂等性配置即可。
配置项配置值说明enable.idempotencetrue开启幂等性max.in.flight.requests.per.connection小于等于5每个连接的在途请求数不能大于5取值范围为[1,5]acksall(-1)确认应答固定值不能修改retries0重试次数推荐使用Int最大值
【1】kafka实现幂等性的流程
① 数据增加唯一性标识
开启幂等性后为了保证数据不会重复那么就需要给每一个请求批次的数据增加唯一性标识。kafka中这个标识采用的是连续的序列号数字sequencenum。但是不同的生产者Producer可能序列号是一样的仅仅靠seqnum还无法唯一标记数据所以还需要同时对生产者进行区分。
Kafka采用申请生产者IDproducerid的方式对生产者进行区分。在发送数据前我们就需要提前申请producerid以及序列号sequencenum。 ② 记录生产者的生产状态
Broker中会给每一个分区记录生产者的生产状态采用队列的方式缓存最近的5个批次数据。队列中的数据按照seqnum进行升序排列。这里的数字5是经过压力测试均衡空间效率和时间效率所得到的值所以为固定值无法配置且不能修改。 ③ 判重
判断Borker当前新的请求批次数据在缓存的5个旧的批次中是否存在相同的如果有相同的那么说明有重复当前批次数据不做任何处理。 ④ 判断序列号是否连续
如果Broker当前的请求批次数据在缓存中没有相同的那么判断当前新的请求批次的序列号是否为缓存的最后一个批次的序列号加1
如果是说明是连续的顺序没乱那么继续。如果不是那么说明数据已经乱了发生异常。 ⑤ 重试
Broker根据异常返回响应通知Producer进行重试。Producer重试前需要在缓冲区中将数据重新排序保证正确的顺序后再进行重试即可。
⑥ 更新数据
如果请求批次不重复且有序那么更新缓冲区中的批次数据。将当前的批次放置再队列的结尾将队列的第一个移除保证队列中缓冲的数据最多5个。 ⑦ 缺陷
从上面的流程可以看出Kafka的幂等性是通过消耗时间和性能的方式提升了数据传输的有序和去重在一些对数据敏感的业务中是十分重要的。但是这种幂等性还是有缺陷的
幂等性的producer仅做到单分区上的幂等性即单分区消息有序不重复多分区无法保证幂等性。只能保持生产者单个会话的幂等性无法实现跨会话的幂等性也就是说如果一个producer挂掉再重启那么重启前和重启后的producer对象会被当成两个独立的生产者从而获取两个不同的独立的生产者ID导致broker端无法获取之前的状态信息所以无法实现跨会话的幂等。要想解决这个问题可以采用后续的事务功能。
【2】跨会话的幂等性
对于幂等性的缺陷kafka可以采用事务的方式解决跨会话的幂等性。基本的原理就是通过事务功能管理生产者ID保证事务开启后生产者对象总能获取一致的生产者ID。
为了实现事务Kafka引入了事务协调器TransactionCoodinator负责事务的处理所有的事务逻辑包括分派PID等都是由TransactionCoodinator负责实施的。TransactionCoodinator 会将事务状态持久化到该主题中。
事务基本的实现思路就是通过配置的事务ID将生产者ID进行绑定然后存储在Kafka专门管理事务的内部主题 __transaction_state中而内部主题的操作是由事务协调器TransactionCoodinator对象完成的这个协调器对象有点类似于咱们数据发送时的那个副本Leader。
其实这种设计是很巧妙的因为kafka将事务ID和生产者ID看成了消息数据然后将数据发送到一个内部主题中。这样使用事务处理的流程和咱们自己发送数据的流程是很像的。
接下来我们就把这两个流程简单做一个对比。
① 普通数据发生流程 ② 事务数据发送流程 通过两张图可以看到基本的事务操作和数据操作是很像的。不过要注意我们这里只是简单对比了数据发送的过程其实它们的区别还在于数据发送后的提交过程。普通的数据操作只要数据写入了日志那么对于消费者来讲。数据就可以读取到了但是事务操作中如果数据写入了日志但是没有提交的话其实数据默认情况下也是不能被消费者看到的。只有提交后才能看见数据。
更为详细的可以参考下图