网站建设项目分工,用dw如何做网站链接,安阳百度贴吧,易企秀怎么做网站链接重试机制
在消息从Broker到消费者的传递过程中#xff0c;可能会遇到各种问题#xff0c;如网络故障、服务不可用、资源不足等#xff0c;这些问题都可能导致消息处理失败。为了解决这些问题#xff0c;RabbitMQ提供了重试机制#xff0c;允许消息在处理失败之后重新发送…重试机制
在消息从Broker到消费者的传递过程中可能会遇到各种问题如网络故障、服务不可用、资源不足等这些问题都可能导致消息处理失败。为了解决这些问题RabbitMQ提供了重试机制允许消息在处理失败之后重新发送。
但如果是程序逻辑引起的错误那么多次重试也是不起作用的因此设置了重试次数。
消费者确认机制为AUTO时
当消费者确认机制是AUTO时如果程序逻辑错误那么就会不断重试造成消息积压。因此我们就需要设置重试次数当多次重试还是失败消息就会被自动确认自然消息就会丢失。
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长max-attempts: 5 # 最大重试次数
Configuration
public class RetryConfig {Bean(retryQueue)public Queue retryQueue() {return QueueBuilder.durable(Constants.RETRY_QUEUE).build();}Bean(retryExchange)public Exchange retryExchange() {return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();}Bean(retryQueueBind)public Binding retryQueueBind(Qualifier(retryExchange) Exchange exchange,Qualifier(retryQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(retry).noargs();}}
RestController
RequestMapping(/retry)
public class RetryController {Resourcepublic RabbitTemplate rabbitTemplate;RequestMappingpublic void retryQueue() {this.rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE, retry, hello 重试机制);System.out.println(重试机制生产者发送消息成功);}}
Configuration
public class RetryListener {RabbitListener(queues Constants.RETRY_QUEUE)public void retryListener(String msg) {System.out.println(获取到消息 msg);int n 3 / 0;}}
上述代码和可靠性传输一文的消费者确认机制中策略为AUTO的代码类似只不过在此配置文件中加了一个重试机制。当启动程序之后可以看到如下结果 重试时 重试结束之后 从测试结果可以看出当消费者确认机制的策略为AUTO时遇到异常就会进行重试当重试结束之后依然没有接收时就会自动确认消息。
消费者确认机制为MANAUL时
当消费者确认机制是MANUL时修改消费者代码并启动程序查看结果
spring:rabbitmq:host: 43.138.108.125port: 5672username: adminpassword: adminvirtual-host: mq-springboot-testlistener:simple:acknowledge-mode: manual # 消息确认机制手动确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长max-attempts: 5 # 最大重试次数
Configuration
public class RetryListener {RabbitListener(queues Constants.RETRY_QUEUE)public void retryListener(Message msg, Channel channel) throws IOException {try {System.out.println(接收到消息 msg);int num 3 / 0; // 模拟处理失败channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);} catch (Exception e) {channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);}}} 从测试结果可以得出消费者确认机制为手动确认时并不会依据配置文件中的重试次数等内容来做而是依据消息者自身的代码实现来做实现机制。原因是因为手动确认模式下消费者需要显示地对消息进行确认如果消费者在消息处理过程中遇到异常可以选择确认不确定消息也可以选择重新入队。所以重试的控制权并不在应用程序本身而在于代码逻辑本身。 1. 消费者确认机制为AUTO时如果程序逻辑异常多次重试还是失败。那么消息就会自动确认进而消息就会丢失。 2. 消费者确认机制为MANAUL时如果程序逻辑异常多次重试依然处理失败无法被确认消息就会积压。 3. 消费者确认机制为NONE时不管发生什么情况当消息从Broker内部发出时就会自动确认因此它不存在任何内容。 TTL
TTL过期时间。当消息到达过期时间之后还没有被消息消息就会被自动清除。
RabbitMQ可以对队列和消息设置过期时间。如果两种方法同时使用那么就以两者较小的值为准。
设置消息的TTL
Configuration
public class TllConfig {Bean(ttlQueue)public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).build();}Bean(ttlExchange)public Exchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}Bean(ttlQueueBind)public Binding ttlQueueBind(Qualifier(ttlExchange) Exchange exchange,Qualifier(ttlQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ttl).noargs();}}
RestController
RequestMapping(/ttl)
public class TtlController {Resourceprivate RabbitTemplate rabbitTemplate;RequestMappingpublic void ttlQueue() {MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration(50000);return message;}};this.rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, ttl, hello ttl, messagePostProcessor);}} 在TTL的测试中不需要消费者的存在否则看不到消息在队列中的自动丢失。
设置队列的TTL
注意设置队列的TTL并不是过期之后删除整个队列也是关于消息设置的。只不过投递到该消息队列的所有消息都有一个共同的过期时间而已。
Configuration
public class TllConfig {Bean(ttlQueue)public Queue ttlQueue() {return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000).build();}Bean(ttlExchange)public Exchange ttlExchange() {return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).build();}Bean(ttlQueueBind)public Binding ttlQueueBind(Qualifier(ttlExchange) Exchange exchange,Qualifier(ttlQueue) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ttl).noargs();}}
设置队列的TTL只需要在声明队列时给出过期时间即可。在测试的过程中如果是先测试了消息的过期时间那么在测试队列时需要先将持久化的队列给删除再启动程序。
当启动程序之后可以看到队列上加了一个TTL的标识表明队列的过期时间设置成功 区别
设置队列的过期时间一旦消息过期就会从队列中删除。
设置消息的过期时间即使消息过期如果消息不在队首还得等到消息到达队首之后才会进行判定是否过期。如果过期那就删除反之就投递到相应的消费者中。 为什么这两种方法处理的方式不一样 因为设置队列的过期时间那么队列中过期的消息一定在队首RabbitMQ只需要定期从队首扫描消息是否有过期的消息即可。 而设置消息的过期时间每条消息的过期时间都不一致如果要删除队列的所有过期消息那么就要扫描整个队列所以不如等到消息要进行投递时再判断消息是否过期这样可以减少一定的资源消耗。