关于网站得精神文明建设,包装设计十大网站,wordpress 分享 可见,如何建立网站或网页目录 生产者ack机制消费者ack模式手动提交ACK 生产者ack机制
Kafka 生产者的 ACK 机制指的是生产者在发送消息后#xff0c;对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本#xff0c;并在需要时获取确认信息。
Kafka 提供了三种… 目录 生产者ack机制消费者ack模式手动提交ACK 生产者ack机制
Kafka 生产者的 ACK 机制指的是生产者在发送消息后对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本并在需要时获取确认信息。
Kafka 提供了三种 ACK 机制的配置选项分别是 acks0生产者在成功将消息发送到网络缓冲区后即视为消息已被提交不等待任何服务器响应。这种配置下可能会出现消息丢失的情况。 acks1生产者在成功将消息发送到主题的分区 leader 后即视为消息已被提交。这种配置下生产者会收到分区 leader 的确认但仍有可能出现消息丢失的情况例如当 leader 出现故障而消息尚未复制到其他副本时。 acksall 或acks-1生产者需要等待所有分区副本都成功写入消息后才视为消息已被提交。这种配置下生产者会等待所有分区副本的确认确保消息被复制到足够数量的副本后才返回提交确认。这是最安全的确认方式但也会导致较长的等待时间。
在实际使用中根据对消息可靠性和延迟的要求可以选择不同的 ACKs 级别。一般来说如果对消息的可靠性要求较高可以选择较高的 ACKs 级别但需要考虑相应的延迟成本。
我们可以通过spring.kafka.producer.acks来配置ack机制
spring.kafka.producer.acks1消费者ack模式
kafka支持的消费模式在AbstractMessageListenerContainer.AckMode的枚举中下面就介绍下各个模式的区别
public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {link AcknowledgingMessageListener}. The consumer* immediately processes the commit.*/MANUAL_IMMEDIATE,}
AckMode模式
RECORD当每一条记录被消费者监听器ListenerConsumer处理之后提交 当使用 RECORD 确认模式时消息监听容器会在每个消息被单独处理后进行确认。这意味着如果一条消息被成功处理它将作为单独的记录进行确认如果处理失败也会针对该消息进行错误记录。这种确认模式适用于需要精确处理每个消息的应用场景例如确保每个消息都被正确处理。
BATCH当每一批poll()的数据被消费者监听器ListenerConsumer处理之后提交 当使用 BATCH 确认模式时消息监听容器会在批量处理一组消息后进行确认。这意味着消息监听容器会将多个消息合并为批次并将它们作为一组进行处理。只有在整个批次都被成功处理后该批次的所有消息才会被确认。这种确认模式适用于需要提高处理效率的场景例如批量处理大量消息以减少网络传输和系统调用的开销。
TIME当每一批poll()的数据被消费者监听器ListenerConsumer处理之后距离上次提交时间大于TIME时提交
COUNT当每一批poll()的数据被消费者监听器ListenerConsumer处理之后被处理record数量大于等于COUNT时提交
COUNT_TIMETIME或COUNT 有一个条件满足时提交
MANUAL这是手动确认模式消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息。只有当消费者调用 acknowledge() 方法后才会向 Kafka 服务器发送确认消息。这种模式可以保证消息的可靠性和顺序性但需要消费者显式地处理确认逻辑。
MANUAL_IMMEDIATE这是立即手动确认模式与 MANUAL 模式类似但消费者在调用 acknowledge() 方法时会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度但可能会增加重复消费的风险。
MANUAL和MANUAL_IMMEDIATE的区别
MANUAL 和 MANUAL_IMMEDIATE 都是 Kafka 消费者的手动确认模式它们的区别在于确认的时机不同。
MANUAL 模式下消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息在调用该方法之后消息才会被标记为已消费并且确认消息会在下次 poll() 时发送到 Kafka 服务器。这种模式的优点是可以保证消息的可靠性和顺序性但需要消费者显式地处理确认逻辑。
相比之下MANUAL_IMMEDIATE 模式下在消费者调用 Acknowledgment.acknowledge() 方法时会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度但可能会增加重复消费的风险因为如果消息处理失败Kafka 不会再次发送该消息而是认为该消息已经被成功消费了。
在实际使用中应根据业务需求和性能要求来选择合适的确认模式。如果要求消息的可靠性和顺序性比较高可以选择 MANUAL 模式如果要求处理速度比较高可以选择 MANUAL_IMMEDIATE 模式。
AckMode 可以通过配置文件或代码进行设置。例如在 Spring Boot 应用中可以使用以下配置方式指定确认模式
spring.kafka.listener.ack-modemanual_immediate手动提交ACK
kafka默认是自动提交ack的很多时候我们都需要手动提交这就要进行以下配置
1、设置enable-auto-commitfalse禁止自动提交 2、设置ack-mode为manual_immediate
在配置文件进行如下配置
spring.kafka.consumer.enable-auto-commitfalse
spring.kafka.listener.ack-modemanual_immediate3、监听方法的入参加入Acknowledgment ack 参数并在消费完成之后调用acknowledge方法如下所示 KafkaListener(topics my-topic2,groupId myGroup)public void receiveMessage2(String message, Acknowledgment ack){log.info(消费消息message);//ack确认ack.acknowledge();}