做外商备案的网站,建立网站专业公司吗,用html制作淘宝网页,网站目录管理模板下载生产者者的可靠性
为了保证我们生产者在发送消息的时候消息不丢失#xff0c;我们需要保证发送者的可靠性
1.生产者重试
假如发送消息的时候消息丢失 #xff0c;我们可以使用发送者 重试机制#xff0c;尝试重新发送消息
实现该机制非常简单#xff0c;只需要在yml文…生产者者的可靠性
为了保证我们生产者在发送消息的时候消息不丢失我们需要保证发送者的可靠性
1.生产者重试
假如发送消息的时候消息丢失 我们可以使用发送者 重试机制尝试重新发送消息
实现该机制非常简单只需要在yml文件中 配置 rabbitmq的配置信息就可以了 2.生产者确认机制
在我们 生产者发送消息到交换机的时候假如 我们发送到交换机 但是 队列没有收到消息会返回ack发送到交换机然后发送到队列消费者没有接收到消息返回ack但是发送到交换机失败会返回nack
也就是说 我们 只要消息成功发送到 mq里面去无论消息是否成功路由被消费或者没被消费他都会 返回ack
下面我们看具体实现代码
生产者代码 Testpublic void publisherConfirm(){//获得 相关组件CorrelationData correlationData new CorrelationData();correlationData.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {log.error(消息发送错误{},ex.getMessage());}Overridepublic void onSuccess(CorrelationData.Confirm result) {if(result.isAck()){log.info(发送成功收到{}但是消息可能未路由,result.getReason());}else{log.error(发送失败收到{},result.getReason());}}});//我们像一个不的交换机中发送一条消息 验证我们的情况rabbitTemplate.convertAndSend(myExchange.direct,my,aaa,correlationData);}
消费者代码
Component
Slf4j
RequiredArgsConstructor
public class Consumer2 {private final RabbitTemplate rabbitTemplate;
RabbitListener(bindings QueueBinding(value Queue(value mySimple.queue,durable true,arguments Argument(namex-queue-mode,valuelazy)),exchange Exchange(value myExchange.direct,durable true),key my
))
public void consumerMessage(String message){log.info(收到消息{},message);
}}
mq消息可靠性
1.交换机 队列持久化
我们在设置的时候可以设置交换机队列持久化这样即使我们rabbitmq重启的话也不用担心交换机队列丢失
2.消息持久化
我们在设置的时候可以手动设置 一个持久化的 message 对象当作消息传递然后 持久化之后消息会直接存储到磁盘中去防止内存中的消息积压同时也可以为消息设置优先级
3.lazyqueue
我们使用lazyqueue 形式发送消息到队列的时候消息会直接存储到磁盘中去一直到被需要消费的时候才会被加载
设置lazyqueue也很简单我们只需要在代码中简单配置即可 消费者可靠性
1.消费者确认机制
消费者确认消息之后会返回ack并且删除消息
消费者对消息处理失败返回nackmq需要重新发送消息
reject 消费者处理消息失败并且拒绝该消息并且删除消息
而怎么确认消息也有三种机制 none不处理。即消息投递给消费者后立刻ack消息会立刻从MQ删除。非常不安全不建议使用 manual手动模式。需要自己在业务代码中调用api发送ack或reject存在业务入侵但更灵活 auto自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强当业务正常执行时则自动返回ack. 当业务出现异常时根据异常判断返回不同结果 如果是业务异常会自动返回nack 如果是消息处理或校验异常自动返回reject
我们在yml文件中配置即可
2.消息重试策略
当一条消息发送失败的时候消费者重新尝试消费消息达到我们重试的次数之后消费者返回rejectmq直接删除消息
3.处理失败消息
当消息彻底处理失败我们消费者设置一个新的交换机队列 重新存储这些失败的消息后续再做处理
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;Configuration
ConditionalOnProperty(name spring.rabbitmq.listener.simple.retry.enabled, havingValue true)
public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}
死信交换机和延迟消息
死信交换机 都是假如一个定时消息过期了或者发送延迟消息我们直接把该消息传递到我们绑定的死信交换机中跟上文 消息发送失败了返回rejct之后消息发送到err交换机是两种不同的策略 我们用死信交换机发送延迟消息的策略可以如下图 而代码也很简单只需要再声明队列a的时候绑定死信交换机就可以了
dlx就是死信交换机