户外网站模板,主题网站设计与制作,wordpress省理工大学,中山建设招聘信息网站目录
前言#xff1a;
一.消息确认机制
• ⾃动确认
• ⼿动确认 手动确认方法又分为三种#xff1a;
二. 代码实现#xff08;spring环境#xff09;
配置相关信息#xff1a;
1#xff09;. AcknowledgeMode.NONE
2 #xff09;AcknowledgeMode.AUTO
3
一.消息确认机制
• ⾃动确认
• ⼿动确认 手动确认方法又分为三种
二. 代码实现spring环境
配置相关信息
1. AcknowledgeMode.NONE
2 AcknowledgeMode.AUTO
3AcknowledgeMode.MANUAL
总结 前言
前期讲了RabbitMQ的概念和应⽤,RabbitMQ实现了AMQP0-9-1规范的许多扩展,在RabbitMQ官⽹上,也给⼤家介绍了RabbitMQ的⼀些特性,我们挑⼀些重要的且常⽤的给⼤家讲⼀下
Rabbitmq官网 一.消息确认机制 ⽣产者发送消息之后,到达消费端之后,可能会有以下情况: a. 消息处理成功 b. 消息处理异常 RabbitMQ向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失. 那么如何确保消费端已经成功接收了,并正确处理了呢 为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制messageacknowledgement)。 消费者在订阅队列时可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种: • ⾃动确认 当autoAck等于true时, RabbitMQ 会⾃动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,⽽不管消费者是否真正地消费到了这些消息.⾃动确认模式适合对于消息可靠性要求不⾼的场景. • ⼿动确认 当autoAck等于false时RabbitMQ会等待消费者显式地调⽤Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移去消息.这种模式适合对消息可靠性要求⽐较⾼的场景. 手动确认方法又分为三种 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple) RabbitMQ已知道该消息并且成功的处理消息可以将其丢弃了.否定确认: Channel.basicReject(long deliveryTag, boolean requeue) 消费者客⼾端可以调⽤channel.basicReject⽅法来告诉RabbitMQ拒绝这个消息.否定批量确认: Channel.basicNack(long deliveryTag, boolean multiple,boolean requeue)Basic.Reject命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤Basic.Nack这个命令.消费者客⼾端可以调⽤channel.basicNack⽅法来实现. 参数说明 1deliveryTag 消息的唯⼀标识它是⼀个单调递增的64位的⻓整型值. deliveryTag 是每个通道 Channel独⽴维护的,所以在每个通道上都是唯⼀的.当消费者确认(ack)⼀条消息时,必须使⽤对应的通道上进⾏确认. 2multiple 是否批量确认.在某些情况下,为了减少⽹络流量,可以对⼀系列连续的 deliveryTag 进 ⾏批量确认.值为true则会⼀次性ack所有⼩于或等于指定deliveryTag的消息.值为false,则只确认当前指定deliveryTag的消息. 3requeue 表⽰拒绝后,这条消息如何处理.如果requeue参数设置为true,则RabbitMQ会重新将这条 消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,⽽不会把它发送给新的消费者. 二. 代码实现spring环境 1.可以直接使用RabbitMQ Java Client 库 2.使用spring集成的amqp 主要介绍第二种在spring环境下实现
Spring-AMQP 对消息确认机制提供了三种策略.
public enum AcknowledgeMode { NONE //确认,MANUAL//手动 ,AUTO //默认;
}
配置相关信息
基本信息以及确认机制 队列交换机以及它们之间的绑定关系
package com.bite.extensions.config;import com.bite.extensions.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig {Bean(ackQueue)public Queue ackQueue() {return QueueBuilder.durable(Constants.ACK_QUEUE).build();}Bean(directExchange)public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}Bean(ackBinding)public Binding ackBinding(Qualifier(directExchange) DirectExchange directExchange,Qualifier(ackQueue) Queue queue) {return BindingBuilder.bind(queue).to(directExchange).with(ack);}
}
生产者
主要解释消费者在不同确认机制的状态
package com.bite.extensions.controller;import com.bite.extensions.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;RequestMapping(/producer)
RestController
public class ProducerController {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/ack)public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,ack,consumer ack mode test...);return 消息发送成功!;}
}
1AcknowledgeMode.NONE 这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会⾃动确认 消息,从RabbitMQ队列中移除消息.如果消费者处理消息失败,消息可能会丢失. 1消费者 正常消费情况下
package com.bite.extensions.listener;import com.bite.extensions.constant.Constants;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;Component
public class AckListener {RabbitListener(queues Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag message.getMessageProperties().getDeliveryTag();System.out.printf(接收到信息: %s, deliveryTag: %d\n,new String(message.getBody(),UTF-8),deliverTag);//业务逻辑处理System.out.println(业务逻辑处理);System.out.println(业务逻辑完成);}
} 消费者正确处理,MQ删除相应信息
2消费者 异常消费情况下
Component
public class AckListener {RabbitListener(queues Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag message.getMessageProperties().getDeliveryTag();System.out.printf(接收到信息: %s, deliveryTag: %d\n,new String(message.getBody(),UTF-8),deliverTag);//业务逻辑处理System.out.println(业务逻辑处理);int num 3/0; //异常System.out.println(业务逻辑完成);}
} 可以看到,消费者处理失败,但是消息已经从RabbitMQ中移除.
2 AcknowledgeMode.AUTO 这种模式下,消费者在消息处理成功时会⾃动确认消息,但如果处理过程中抛出了异常,则不会确认消息. listener:simple:acknowledge-mode: auto #消息接收确认
1消费者 正常消费情况下
Component
public class AckListener {RabbitListener(queues Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag message.getMessageProperties().getDeliveryTag();System.out.printf(接收到信息: %s, deliveryTag: %d\n,new String(message.getBody(),UTF-8),deliverTag);//业务逻辑处理System.out.println(业务逻辑处理);//int num 3/0;System.out.println(业务逻辑完成);}
} 2消费者 异常消费情况下
Component
public class AckListener {RabbitListener(queues Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws UnsupportedEncodingException {//消费者逻辑long deliverTag message.getMessageProperties().getDeliveryTag();System.out.printf(接收到信息: %s, deliveryTag: %d\n,new String(message.getBody(),UTF-8),deliverTag);//业务逻辑处理System.out.println(业务逻辑处理);int num 3/0;System.out.println(业务逻辑完成);}
}
..........
接收到信息: consumer ack mode test..., deliveryTag: 88
业务逻辑处理
2024-11-17T15:19:11.42008:00 WARN 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
接收到信息: consumer ack mode test..., deliveryTag: 89
业务逻辑处理
2024-11-17T15:19:11.47708:00 INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2024-11-17T15:19:11.47708:00 INFO 22936 --- [rabbitmq-extensions-demo] [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish. 消费者处理异常会一直重试发送所有仍然保留在mq中 3AcknowledgeMode.MANUAL ⼿动确认模式下,消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息.如果消 息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可⽤时重新投递该消息,这 种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,⽽是可以被重新处理. listener:simple:acknowledge-mode: manual#消息接收确认
1消费者 正常消费情况下
Component
public class AckListener {RabbitListener(queues Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑long deliverTag message.getMessageProperties().getDeliveryTag();try {System.out.printf(接收到信息: %s, deliveryTag: %d\n,new String(message.getBody(),UTF-8),deliverTag);//业务逻辑处理System.out.println(业务逻辑处理);//int num 3/0;System.out.println(业务逻辑完成);//肯定确认channel.basicAck(deliverTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliverTag,false,true);}}
}如果不进行确认 又会发送什么 当我们使用手动确认manual的时候一定要手动添加上肯定确认不然即使消费者处理成功也不会进行确认 2消费者 异常消费情况下
Component
public class AckListener {RabbitListener(queues Constants.ACK_QUEUE)public void handleMessage(Message message, Channel channel) throws Exception {//消费者逻辑long deliverTag message.getMessageProperties().getDeliveryTag();try {System.out.printf(接收到信息: %s, deliveryTag: %d\n,new String(message.getBody(),UTF-8),deliverTag);//业务逻辑处理System.out.println(业务逻辑处理);int num 3/0;System.out.println(业务逻辑完成);//肯定确认channel.basicAck(deliverTag,false);} catch (Exception e) {//否定确认channel.basicNack(deliverTag,false,true);}}
} 否定确认完又会进行重新入队会变成Ready状态 此时修改为false不让它入队会发生什么 消费者处理异常会不停的重试 使用manual一定要进行手动确认 总结
模式确认方式可靠性性能使用场景None无确认低可能丢失消息高不关心消息是否成功消费丢失消息可容忍的场景Auto自动确认较低可能丢失消息较高对丢失消息容忍度较高的场景Manual手动确认高消息只有成功处理才会确认较低需要确保每条消息被成功消费的场景 None 适用于性能要求高但对消息丢失不敏感的场景。Auto 适合那些不需要太高消息可靠性的应用但仍然需要自动化确认机制。Manual 最适合那些对消息处理的可靠性要求较高尤其是在出现异常时需要精细控制消息是否重新入队或丢弃的场景。 选择哪种模式取决于你的具体需求尤其是对于消息可靠性的要求以及系统的性能考虑。 结语 写博客不仅仅是为了分享学习经历同时这也有利于我巩固知识点总结该知识点由于作者水平有限对文章有任何问题的还请指出接受大家的批评让我改进。同时也希望读者们不吝啬你们的点赞收藏关注你们的鼓励是我创作的最大动力