当前位置: 首页 > news >正文

2017山亭区建设局网站北京WordPress爱好者

2017山亭区建设局网站,北京WordPress爱好者,找代做海报的网站,网页设计课RabbitMQ-保证消息可靠性 1.消息可靠性1.1.生产者消息确认1.1.1.修改配置1.1.2.定义Return回调1.1.3.定义ConfirmCallback 1.2.消息持久化1.2.1.交换机持久化1.2.2.队列持久化1.2.3.消息持久化 1.3.消费者消息确认1.3.1.演示none模式1.3.2.演示auto模式 1.4.消费失败重试机制1.… RabbitMQ-保证消息可靠性 1.消息可靠性1.1.生产者消息确认1.1.1.修改配置1.1.2.定义Return回调1.1.3.定义ConfirmCallback 1.2.消息持久化1.2.1.交换机持久化1.2.2.队列持久化1.2.3.消息持久化 1.3.消费者消息确认1.3.1.演示none模式1.3.2.演示auto模式 1.4.消费失败重试机制1.4.1.本地重试1.4.2.失败策略 1.5.总结 1.消息可靠性 消息从发送到消费者接收会经理多个过程 其中的每一步都可能导致消息丢失常见的丢失原因包括 发送时丢失 生产者发送的消息未送达exchange消息到达exchange后未到达queue MQ宕机queue将消息丢失consumer接收到消息后未消费就宕机 针对这些问题RabbitMQ分别给出了解决方案 生产者确认机制mq持久化消费者确认机制失败重试机制 下面操作通过案例来演示每一个步骤。 项目结构如下 1.1.生产者消息确认 RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。 返回结果有两种方式 publisher-confirm发送者确认 消息成功投递到交换机返回ack消息未投递到交换机返回nack publisher-return发送者回执 消息投递到交换机了但是没有路由到队列。返回ACK及路由失败原因。 注意 1.1.1.修改配置 首先在publisher服务中的application.yml文件添加下面的内容 spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true 说明 publish-confirm-type开启publisher-confirm这里支持两种类型 simple同步等待confirm结果直到超时correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback publish-returns开启publish-return功能同样是基于callback机制不过是定义ReturnCallbacktemplate.mandatory定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息 1.1.2.定义Return回调 每个RabbitTemplate只能配置一个R eturnCallback因此需要在项目加载时配置 修改publisher服务添加一个 package cn.zqd.mq.config;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration;Slf4j Configuration public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 投递失败记录日志log.info(消息发送失败应答码{}原因{}交换机{}路由键{},消息{},replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要可以重发消息});} }1.1.3.定义ConfirmCallback ConfirmCallback可以在发送消息时指定因为每个业务处理confirm成功或失败的逻辑不一定相同。 在publisher服务的cn.zqd.mq.spring.SpringAmqpTest类中定义一个单元测试方法 public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message hello, spring amqp!;// 2.全局唯一的消息ID需要封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result - {if(result.isAck()){// 3.1.ack消息成功log.debug(消息发送成功, ID:{}, correlationData.getId());}else{// 3.2.nack消息失败log.error(消息发送失败, ID:{}, 原因{},correlationData.getId(), result.getReason());}},ex - log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend(task.direct, task, message, correlationData);// 休眠一会儿等待ack回执Thread.sleep(2000); }1.2.消息持久化 生产者确认可以确保消息投递到RabbitMQ的队列中但是消息发送到RabbitMQ以后如果突然宕机也可能导致消息丢失。 要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。 交换机持久化队列持久化消息持久化 1.2.1.交换机持久化 RabbitMQ中交换机默认是非持久化的mq重启后就丢失。 SpringAMQP中可以通过代码指定交换机持久化 Bean public DirectExchange simpleExchange(){// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange(simple.direct, true, false); }事实上默认情况下由SpringAMQP声明的交换机都是持久化的。 可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示 1.2.2.队列持久化 RabbitMQ中队列默认是非持久化的mq重启后就丢失。 SpringAMQP中可以通过代码指定交换机持久化 Bean public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build(); }事实上默认情况下由SpringAMQP声明的队列都是持久化的。 可以在RabbitMQ控制台看到持久化的队列都会带上D的标示 1.2.3.消息持久化 利用SpringAMQP发送消息时可以设置消息的属性MessageProperties指定delivery-mode 1非持久化2持久化 用java代码指定 默认情况下SpringAMQP发出的任何消息都是持久化的不用特意指定。 1.3.消费者消息确认 RabbitMQ是阅后即焚机制RabbitMQ确认消息被消费者消费后会立刻删除。 而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的消费者获取消息后应该向RabbitMQ发送ACK回执表明自己已经处理消息。 设想这样的场景 1RabbitMQ投递消息给消费者2消费者获取消息后返回ACK给RabbitMQ3RabbitMQ删除消息4消费者宕机消息尚未处理 这样消息就丢失了。因此消费者返回ACK的时机非常重要。 而SpringAMQP则允许配置三种确认模式 •manual手动ack需要在业务代码结束后调用api发送ack。 •auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack •none关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除 由此可知 none模式下消息投递是不可靠的可能丢失auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ackmanual自己根据业务情况判断什么时候该ack 一般我们都是使用默认的auto即可。 1.3.1.演示none模式 修改consumer服务的application.yml文件添加下面内容 spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack修改consumer服务的SpringRabbitListener类中的方法模拟一个消息处理异常 RabbitListener(queues simple.queue) public void listenSimpleQueue(String msg) {log.info(消费者接收到simple.queue的消息【{}】, msg);// 模拟异常System.out.println(1 / 0);log.debug(消息处理完成); }测试可以发现当消息处理抛异常时消息依然被RabbitMQ删除了。 1.3.2.演示auto模式 再次把确认机制修改为auto: spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack在异常位置打断点再次发送消息程序卡在断点时可以发现此时消息状态为unack未确定状态 抛出异常后因为Spring会自动返回nack所以消息恢复至Ready状态并且没有被RabbitMQ删除 1.4.消费失败重试机制 当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者然后再次异常再次requeue无限循环导致mq的消息处理飙升带来不必要的压力 怎么办呢 1.4.1.本地重试 我们可以利用Spring的retry机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列。 修改consumer服务的application.yml文件添加内容 spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态false有状态。如果业务中包含事务这里改为false重启consumer服务重复之前的测试。可以发现 在重试3次后SpringAMQP会抛出异常AmqpRejectAndDontRequeueException说明本地重试触发了查看RabbitMQ控制台发现消息被删除了说明最后SpringAMQP返回的是ackmq删除消息了 结论 开启本地重试时消息处理过程中抛出异常不会requeue到队列而是在消费者本地重试重试达到最大次数后Spring会返回ack消息会被丢弃 1.4.2.失败策略 在之前的测试中达到最大重试次数后消息会被丢弃这是由Spring内部机制决定的。 在开启重试模式后重试次数耗尽如果消息依然失败则需要有MessageRecovery接口来处理它包含三种不同的实现 RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式 ImmediateRequeueMessageRecoverer重试耗尽后返回nack消息重新入队 RepublishMessageRecoverer重试耗尽后将失败消息投递到指定的交换机 比较优雅的一种处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列后续由人工集中处理。 1在consumer服务中定义处理失败消息的交换机和队列 Bean public DirectExchange errorMessageExchange(){return new DirectExchange(error.direct); } Bean public Queue errorQueue(){return new Queue(error.queue, true); } Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error); }2定义一个RepublishMessageRecoverer关联队列和交换机 Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error); }完整代码 package cn.zqd.mq.config;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.context.annotation.Bean;Configuration public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);} }1.5.总结 如何确保RabbitMQ消息的可靠性(面试题) 开启生产者确认机制确保生产者的消息能到达队列开启持久化功能确保消息未消费前在队列中不会丢失开启消费者确认机制为auto由spring确认消息处理成功后完成ack开启消费者失败重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机交由人工处理
http://www.dnsts.com.cn/news/53590.html

相关文章:

  • 太原市做网站石家庄软件外包
  • 如何做高端网站昆明app制作
  • 手机上怎么自己做网站二次开发收费需要高点
  • 东莞网站建设网站排名优化济南源码网站建设
  • 游戏平台网站做公司网站的专业公司深圳
  • 网站建设费入何科目江西中慧城乡建设开发公司网站
  • 做建材的网站好名字wordpress页面添加
  • 网站怎样做平面设计图郑中设计事务所
  • 微网站建设正规公司高端个性化网站开发
  • 深圳网站建设叶林高端网站建设南宁
  • 新建茶叶网站文章内容建设wordpress网站很慢
  • 江门网站建设推广西山区城市建设局网站
  • 做一直播网站要多少钱营销网站制作企业
  • 网站注销怎么做消咸鱼网站交易付款怎么做
  • 什么网站发布建设标准公众号如何做微网站
  • 豆芽网站建设 优帮云网站建成后应该如何推广
  • 正规的南昌网站建设网页设计软件介绍
  • 网站底部浮动代码国家工商注册网
  • 南充做网站多少钱简单网站建设规划方案
  • 四川华泰建设集团网站百度收录不了网站吗
  • html5网站链接标签wordpress 点击排行
  • 端子东莞网站建设智能营销客户管理系统
  • 做视频网站推广挣钱吗广东专业网站优化公司
  • 网站设计简单网页百度搜索网站优化
  • 建设信用卡中心网站百度推送 wordpress
  • 做酒的网站托者设计吧官网
  • 永久免费建站空间推广普通话主题班会记录
  • 台州网站专业制作公司流程管理系统
  • 做网站策划遇到的问题软件开放和网站开发
  • 怎么做返利网之类的网站北京网络推广平台