网站页面统计代码,网页开发和网站开发一样吗,萝岗免费网站建设,做虚假网站判多少年Rabbitmq发送者可靠性 发送者重连发送者确认1.开启确认机制2.ReturnCallback3.ConfirmCallback MQ的可靠性数据持久化交换机持久化队列持久化消息持久化 Lazy Queue 总结其他文章 Rabbitmq提供了两种发送来保证发送者的可靠性#xff0c;第一种叫发送者重连#xff0c;第二种… Rabbitmq发送者可靠性 发送者重连发送者确认1.开启确认机制2.ReturnCallback3.ConfirmCallback MQ的可靠性数据持久化交换机持久化队列持久化消息持久化 Lazy Queue 总结其他文章 Rabbitmq提供了两种发送来保证发送者的可靠性第一种叫发送者重连第二种叫发送者确认。
发送者重连
有时候由于网络波动可能会出现发送者连接MQ失败的情况通过配置可以开启连接失败后的重连机制
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms # 失败后的初次等待时间multiplier: 1 #失败后下次等待时长倍数下次等待时长 initial-interval * multipliermax-attempts: 3 #最大重试次数注 当网络不稳定的时候利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试也就是说多次重试等待的过程中当前线程是被阻塞的会影响业务性能。 如果对于业务性能有要求建议禁用重试机制。如果一定要使用请合理配置等待时长和重试次数当然也可以考虑使用异步线程来执行发送消息的代码
发送者确认
SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后当发送者发送消息给MQ后MQ会返回确认结果给发送者。返回的结果有以下几种情况
消息投递到了MQ但是路由失败。此时会通过PublisherReturn返回路由异常原因然后返回ACK告知投递成功临时消息投递到了MQ并且入队成功返回ACK告知投递成功持久消息投递到了MQ并且入队完成持久化返回ACK 告知投递成功其它情况都会返回NACK告知投递失败 1.开启确认机制
在publisher这个微服务的application.yml中添加配置
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制并设置confirm类型publisher-returns: true # 开启publisher return机制注 这里publisher-confirm-type有三种模式可选
none关闭confirm机制simple同步阻塞等待MQ的回执消息correlatedMQ异步回调方式返回回执消息
2.ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目启动过程中配置 Slf4j
Configuration
RequiredArgsConstructor
public class MqConfig {private final RabbitTemplate rabbitTemplate;PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(returned - {//业务处理log.error(触发return callback,);log.debug(exchange: {}, returned.getExchange());log.debug(routingKey: {}, returned.getRoutingKey());log.debug(message: {}, returned.getMessage());log.debug(replyCode: {}, returned.getReplyCode());log.debug(replyText: {}, returned.getReplyText());});}
}
3.ConfirmCallback
发送消息指定消息ID、消息ConfirmCallback Testpublic void SendConfirmCallBack() throws InterruptedException {// 1.创建CorrelationDataCorrelationData cd new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑基本不会触发log.error(handle message ack fail, ex);}Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑参数中的result就是回执内容if (result.isAck()){// result.isAck()boolean类型true代表ack回执false 代表 nack回执log.error(发送消息成功收到 ack!);}else {// result.getReason()String类型返回nack时的异常描述log.error(发送消息失败收到 nack, reason : {}, result.getReason());}}});String exchangeName amq.topic;String message Hello topic china;this.rabbitTemplate.convertAndSend(exchangeName,.11news11, message,cd);Thread.sleep(3000); // 测试方法用于回显日志}注 发送者确认机制需要跟MQ进行通讯和确认会影响发送的效率开启根据实际需求考虑开以后要注意重试次数不要无限重试注意重试次数否则对性能影像严重。
MQ的可靠性
在默认情况下RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题
一旦MQ宕机内存中的消息会丢失内存空间有限当消费者故障或处理过慢时会导致消息积压引发MQ阻塞 数据持久化
RabbitMQ实现数据持久化包括3个方面
交换机持久化
交换机默认持久化 Durable 持久 Transient 临时
队列持久化
队列默认持久化 Durable 持久 Transient 临时
消息持久化
消息持久化需要手动指定 Non-persistent 非持久化 Persistent 持久化 代码实现
MessageBuilder.withBody(案例.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();注: 由于Mq默认持久化,所以一般不需要修改
Lazy Queue
从RabbitMQ的3.6.0版本开始就增加了Lazy Queue的概念也就是惰性队列。 惰性队列的特征如下
接收到消息后直接存入磁盘不再存储到内存消费者要消费消息时才会从磁盘中读取并加载到内存可以提前缓存部分消息到内存最多2048条 在3.12版本后所有队列都是Lazy Queue模式无法更改。
要设置一个队列为惰性队列只需要在声明队列时指定x-queue-mode属性为lazy即可 代码实现 注解式 RabbitListener(queuesToDeclare Queue( name lazy.queue,durable true,arguments Argument(name x-queue-mode, value lazy) )) public void listenLazyQueue(String msg){ log.info(接收到 lazy.queue的消息{}, msg); }声明式 Beanpublic Queue lazyQueue(){return QueueBuilder.durable(lazy.queue).lazy() // 开启Lazy模式 .build();}总结
首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘MQ重启消息依然存在。RabbitMQ在3.6版本引入了LazyQueue并且在3.12版本后会称为队列的默认模式。LazyQueue会将所有消息都持久化开启持久化和发送者确认时 RabbitMQ只有在消息持久化完成后才会给发送者返回ACK回执
其他文章
发送者可靠性 消费者可靠性 延迟信息