哈尔滨网站设计多少钱,各大门户网站用什么做的,网站在哪里设置关键字,深圳建网前言
上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》#xff0c;看过这一篇就基本上可以在SpringBoot中使用消息队列了#xff0c;但是消息队列他归根结底是一个客户端服务器模式的中间件#xff0c;面对复杂的网络环境和分布式使用环境#xff0c;难免会出现各…前言
上一篇介绍了如何在 《SpringBoot 中集成和使用消息队列》看过这一篇就基本上可以在SpringBoot中使用消息队列了但是消息队列他归根结底是一个客户端服务器模式的中间件面对复杂的网络环境和分布式使用环境难免会出现各种问题。出现问题不可怕重点在于如何预防和处理本章就重点介绍一下如何预防和处理使用SpringAMQP时可能出现的问题。
一、消息堆积
1、什么是消息堆积
消息堆积指的是消费者这边的处理能力低于生产者这边生产消息的能力导致大量的消息积压在MQ的一种现象。消息堆积可能导致短时间内队列达到最大容量导致使新消息无法进入队列对于时间敏感的消息可能成为死信。
2、使用 work 模式同时开启prefetch
work模式简单来说就是让多个消息队列绑定到一个队列共同消费队列中的消息。
默认情况下消息队列是通过轮询的方式将消息推送给消费者的完全不考虑消费者的消费能力。举个例子假设生产者生产了50条消息消费者1的处理能力是1秒50条消费者2的消费能力是1秒5条实际这五十条消息会通过轮询各分配给两个消费者25条如果消费者还没处理完就会阻塞等待处理完之后再继续推送。
所以默认情况并没有考虑到消费者是否已经处理完消息可能也会造成消息堆积。怎么解决呢可以通过修改配置文件将prefetch设置为1即每次给消费者投递一条消息处理完了再投递下一条这样可以尽可能发挥每个消费者的最大处理能力。
spring:rabbitmq:listener:simple:prefetch: 1 #每次投递一条消息消费完在投递下一条3、对消息进行异步处理
再者就是可以在代码上进行优化比如在消息处理的时候使用线程池进行异步消费这样可以缩短每个消息的处理时间降低消息堆积的可能性。
二、发送者可靠性
1、发送者重连机制
有时候可能因为网络波动可能会出现客户端连接MQ失败的情况。这里可以通过重试机制来提高消息发送的成功率。
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后等初始待时间multiplier: 1 # 失败后下次等待时长倍数max-attempts: 3 # 最大重试次数
2、发送者确认机制
在生产者这边是可以开启确认机制的就是MQ他在接收到消息成功后会返回一个ack给生产者接收失败就返回nack生产者这边就可以根据返回的结果如果失败了就可以进行重发。RabbitMQ这边提供了两种确认机制 Publisher Confirm当生产者向消息队列发送消息时如果有设备或网络故障导致消息丢失或其他错误AMQP 协议会自动触发 Confirm 机制将消息发送失败的信息返回给生产者。生产者可以根据返回的信息进行相应的处理例如重发、记录日志等。Publisher Return消息路由失败时触发一般不开启因为路由失败是自己业务的问题 spring:rabbitmq:publisher-confirm-type: CORRELATED # none: 关闭confirm机制 # simple: 同步阻塞等待MQ回执消息 # correlated: MQ异步回调方式返回回执消息publisher-returns: true 对于ReturnCallback整个项目中配置一次即可
Slf4j
Configuration
public class MqCommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallback(路由失败时触发)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug(收到消息的 return callbackexchange:{}, key:{}, msg:{}, code:{}, text:{},returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}
ConfirmCallback 每次发送消息都需要编写
Test
void testConfirmCallback() throws InterruptedException {// 1.创建CorrelationData并指定消息IDCorrelationData cd new CorrelationData(UUID.randomUUID().toString());// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallbackCorrelationData.Confirm() {Overridepublic void onFailure(Throwable ex) {// 基本不会触发log.error(消息回调失败,ex);}Overridepublic void onSuccess(CorrelationData.Confirm result) {// 这里执行回调if (result.isAck()) {// 消息发送成功log.debug(消息发送成功收到 ack);} else {// 消息发送失败log.debug(消息发送失败收到 nack);// 重传等业务逻辑...}}});rabbitTemplate.convertAndSend(forum.direct,red,hello,cd);
}虽然上述确认机制可以基本保证生产者发送消息的可靠性但是会增加系统额外的负担和资源开销因为生产者确认也需要通过MQ来执行回调如果需要使用不需要开启publisher return自己代码写的有问题对于nack也可以有限次重试失败多了直接记录异常即可。
三、MQ可靠性
对于MQ本身是提供了持久化的功能的可以给保证MQ重启数据不丢失。并且在持久化情况下开启生产者确认时RabbitMQ只有在消息持久化完成之后才会给生产者返回ACK回执。 四、消费者可靠性
1、消息者确认机制
为了确认消费者是否成功处理消息RabbitMQ提供了消费者确认机制当消费者处理结束消息之后向RabbitMQ发送一个回执告知RabbitMQ自己消息的处理状态
ack成功处理消息RabbitMQ从队列中删除消息nack消息处理失败RabbitMQ需要再次投递消息reject消息处理失败并拒绝消息RabbitMQ从队列中删除该消息
spring:rabbitmq:listener:simple:acknowledge-mode: AUTO# none不处理。消息投递给消费者后立即ack消息立即从MQ中删除不安全# manual手动模式。需要自己在业务代码中调用api发送ack或reject# auto自动模式。SpringAMQP使用AOP对我们的消息处理逻辑进行了环绕增强当业务正常执行时自动返回ack异常时如果是业务异常会返回nack如果是消息处理或校验异常会返回reject2、失败重试机制
当消费者处理消息出现异常后MQ这边会再次将消息投递给消费者如果无限失败就会无限重试对于MQ和消费者来讲压力就比较大可以利用SpringAMQP的retry进制当消费者出现异常时限制重试次数
spring:rabbitmq:listener:simple:acknowledge-mode: AUTOretry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时间为1smultiplier: 1 # 下次失败的等待时长的倍数max-attempts: 3 # 最大重试次数stateless: true # true 无状态false 有状态开启重试机制后如果重试次数耗尽消息依然失败就需要被MessageRecoverer接口来处理 RejectAndDontRequeueRecoverer重试耗尽后直接reject默认的方式ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队RepublishMessageRecoverer重试耗尽后将失败消息投递到指定交换机建议 下面演示第三中策略的接口配置实现
Configuration
public class ErrorConfiguration {Beanpublic DirectExchange errorExchange() {return new DirectExchange(error.exchange);}Beanpublic Queue errorQueue() {return new Queue(error.queue);}Beanpublic Binding errorBinding(Queue errorQueue,DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with(error);}Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate,error.exchange,error);}
}这样即使重试次数耗尽消息也不会丢失而是投递到了 error.queue 的队列里面。
五、保证幂等性
1、什么是幂等性
幂等性就是重复执行相同的操作系统的状态不会发送变化。比如查询和删除这些操作本身就是幂等的它们多次操作不会给系统造成状态不一致的影响。
上述机制可以保证消息“至少”被消费1次但是由于网络的复杂性可能生产者收不到ack导致消息的重发或者MQ这边没有收到消费者的ack导致消息的重复投递这些都可能造成消息的重复消费所以这个时候就要考虑幂等性问题了。
2、使用唯一 ID
生产者这边在给RabbitMQ投递消息的时候附带一个唯一消息的IDRabbitMQ这边它是自带去重功能的就是相同ID的消息它是只存储一份的.
消费者这里他就可以消费完一条消息后先将消息ID存起来然后后面的消息根据ID进行判断是否是重复消息如果重复直接丢弃就行了.
给消息设置ID的方法
Configuration
public class Config {Beanpublic MessageConverter messageConverter() {Jackson2JsonMessageConverter jjmc new Jackson2JsonMessageConverter();jjmc.setCreateMessageIds(true);return jjmc;}
}3、针对业务进行判断
以支付扣减余额和修改订单状态为例 首先支付服务会在余额扣减成功后利用MQ将消息通知给修改订单状态的服务.修改订单状态之前先查询订单状态只有已支付的订单才做修改这样就可以在业务上保证幂等. 六、实现延迟消息
1、借助死信对列
因为对于那些超时为处理的消息MQ会投递到死信对列我们就可以借助这个特性先将消息投递到到一个普通的对列中然后如果超时就直接投到了死信对列然后就让消费者监听死信对列就可以实现延迟消息了。PS对列通过dead-letter-exchange属性绑定的交换机就称为 死信交换机。
发送延迟消息
void testSendTTLMessage() {Message message MessageBuilder.withBody(hello.getBytes(StandardCharsets.UTF_8)).setExpiration(5000).build(); // 5秒钟的延迟消息rabbitTemplate.convertAndSend(simple.direct1,testKey,message);
}2、使用RabbitMQ官方提供的插件
在RabbitMQ中官方是推出了一种原生支持延迟消息的插件的。原理是设计了一种支持延迟消息功能的交换机当消息投递到交换机后暂存一段时间到期后投递到队列。下面讲解插件使用
消费者声明 RabbitListener(bindings QueueBinding(value Queue(name delay.queue,durable true),exchange Exchange(name delay.direct,delayed true,type ExchangeTypes.DIRECT),key delay))public void listenDelayMessage(String msg) {log.info(接收到delay.queue的延迟消息 {}msg);}生产者发送 Testvoid testSendDelayMessage() throws InterruptedException {rabbitTemplate.convertAndSend(delay.direct, delay, hello,delay!, new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 延迟10秒message.getMessageProperties().setDelay(5000);return message;}});log.info(延迟消息发送成功!);}如果感觉这一大串太麻烦可以将 new MessagePostProcessor() 分离出来
// 封装专门用来发送延迟消息的处理器
RequiredArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final int delay;Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}Testvoid testSendDelayMessage2() throws InterruptedException {rabbitTemplate.convertAndSend(delay.direct, delay, hello,delay!, new DelayMessageProcessor(5000));log.info(延迟消息发送成功!);Thread.sleep(1000);}