当前位置: 首页 > news >正文

深圳住房和建设局网站首页seog

深圳住房和建设局网站首页,seog,电脑版浏览器在线使用,网站如何建设移动端RabbitMQ消息确认 SpringBoot与RabbitMQ整合后#xff0c;对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致#xff1b; 消息发布确认 生产者给交换机发送消息后、若是不管了#xff0c;则会出现消息丢失#xff1b; 解决方案1#xff1a; 交换机接受…RabbitMQ消息确认 SpringBoot与RabbitMQ整合后对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致 消息发布确认 生产者给交换机发送消息后、若是不管了则会出现消息丢失 解决方案1 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失因此重新发送消息 解决方案1隐藏问题若是交换机发送了ack, 出现网络延迟则生产者没有收到ack, 就会出现消息重复发送问题 进而衍生幂等性问题 隐藏问题解决方案1在数据库中增加一张去重表设置唯一索引 生产者在消息内容中翻入唯一ID消费者消费时、先从数据库查询是否存在存在则不处理该消息 适用于并发低、业务严谨的场景 隐藏问题解决方案2利用Redis的String的setnx,若key存在则不处理、若key存在则执行业务 适用于短时间处理大量消息且 key不会重复 -这就是大名鼎鼎的幂等性问题贼讨厌这些专有名词 业务开发中的幂等性 前端保存数据时、点击多次保存按钮插入多条数据 解决方案 前端限制按钮点击、 数据库设置业务唯一索引 消息推送中可能出现多条内容一样的消息又不可以重复处理 需要幂等性处理 上家公司中后台给app客户端推送系统消息时、配置给所有用户推送消息 其他服务给我的应用消息服务推送 RabbitMQ消息 正常来说 每次推送的消息 设备ID和用户ID合起来唯一的结果其他服务业务数据存在问题有些旧数据没有清除 导致相通的设备ID用户ID 一次给设备用户推送了十几条安卓客户端当当当的响 直接惊动了产品经理 经过排查上是上游数据有问题代码又很老其他服务负责人排查了好几天 把问题数据清楚了 结果后面又产生了问题数据 解决方案由于会一次性处理几万条推送消息因此对业务要求速度高因此利用Redis的String的setNx, 以taskId mobileDevId userId tenantId 组成了唯一key,若是存在则不处理 key有限时间为60分钟 就成功处理了该问题吐槽上游的业务问题让下游服务做业务保证属实离谱 { “taskId” “xxxx”; “mobileDevId” : “xxxx”; “userId”:“xxx”; “tenantId” : “xxx”; “其他字段”: “…” } RabbitMQ发布确认与返回 SpringBoot发布确认与返回 配置 第二个参数因为过时所以要配置第三个参数为correlated表示用来确认消息 #生产者 spring.rabbitmq.publisher-returnstrue spring.rabbitmq.publisher-confirmstrue spring.rabbitmq.publisher-confirm-typecorrelated生产者 通过RabbitTempldate#setConfirmCallback设置确认回调 即交换器发送给ack给生产者生产者调用ConfirmCallback回调 若出现异常cause则可重新推送 通过RabbitTempldate#setReturnCallback设置返回回调通过template#waitForConfirms(xxx)表示等待xxx毫秒后确认超时返回false; 若返回false, 则进行业务补救处理 public class ConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{Autowiredprivate RabbitTemplate template;Autowiredprivate DirectExchange confirmExchange;AtomicInteger index new AtomicInteger(0);AtomicInteger count new AtomicInteger(0);private final String[] keys {sms, mail};Scheduled(fixedDelay 1000, initialDelay 500)public void send() throws IOException {//短信String sms {userName: xxx; phone:xxx};HashMapString, Object map new HashMap();map.put(userName, hanxin);map.put(phone, index.getAndIncrement());template.setMandatory(true);template.setConfirmCallback(this);template.setReturnCallback(this);template.convertAndSend(confirmExchange.getName(), confirm, map);System.out.println(send sms confirm);}Scheduled(fixedDelay 1000, initialDelay 500)public void send2() throws IOException {template.invoke((operations) - {//短信String sms {userName: xxx; phone:xxx};HashMapString, Object map new HashMap();map.put(userName, hanxin);map.put(phone, index.getAndIncrement());//必须设置template.setMandatory(true);template.convertAndSend(confirmExchange.getName(), confirm, map);System.out.println(send sms confirm);return template.waitForConfirms(1000);});}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(receive confirm callback, ack ack);}Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(receive return call : message: message.getBody());System.out.println(receive return call : replyCode: replyCode);System.out.println(receive return call : replyText: replyText);System.out.println(receive return call : exchange: exchange);System.out.println(receive return call : routingKey: routingKey);} }业务开发中非严谨追求性能高业务建议使用send,这个过程是异步确认的 严谨业务建议使用send2, 同步等待相应出现问题好确认 交换机: Configuration public class ConfirmConfig {public final static String CONFIRM_QUEUE_NAME confirmQueue;public final static String CONFIRM_EXCHANGE_NAME confirmExchange;public final static String CONFIRM_ROUTING_NAME confirm;Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}Beanpublic ConfirmProducer confirmProducer() {return new ConfirmProducer();} }运行结果 receive status : true RabbitMQ发布确认 若是使用原生Rabbit MQ客户端API则有三种方式 声明channel是需要交换机确认的 channel.confirmSelect();发布单条消息 for (int i 0; i MESSAGE_COUNT; i) {String body String.valueOf(i);channel.basicPublish(, queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000); }channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意这个方法会同步阻塞channel在等待确认期间channel将不能再继续发送消息也就是说会明显降低集群的发送速度即吞吐量。 官方说明了其实channel底层是异步工作的会将channel阻塞住然后异步等待服务端发送一个确认消息才解除阻塞。但是我们在使用时可以把他当作一个同步工具来看待。利用一个异步转同步功能可利用JUC实现 然后如果到了超时时间还没有收到服务端的确认机制那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息但是尝试重发时一定要注意不要使程序陷入死循环。 发送批量消息 条确认的机制会对系统的吞吐量造成很大的影响所以稍微中和一点的方式就是发送一批消息后再一起确认 int batchSize 100;int outstandingMessageCount 0;long start System.nanoTime();for (int i 0; i MESSAGE_COUNT; i) {String body String.valueOf(i);ch.basicPublish(, queue, null, body.getBytes());outstandingMessageCount;if (outstandingMessageCount batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount 0;}}if (outstandingMessageCount 0) {ch.waitForConfirmsOrDie(5_000);}存在隐藏问题若是500条消息处理太久超时了则响应失败消息重新入队、出现重新消费问题 异步确认消息 实现的方式也比较简单Producer在channel中注册监听器来对消息进行确认。核心代码就是一个 channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);这三种确认机制都能够提升Producer发送消息的安全性。通常情况下第三种异步确认机制的性能是最好的。第一种安全性最高。 消费者确认 当交换机接受消息后就要转发给消费者如何保证消息不丢失重复消费 SpringBoot消费确认 自动确认 若是业务中一些消息发送给消费者若是消息出现异常消费者返回通知交换机消息出现了异常交换机会将消息重新入队 若是没有确认消息交换机没有收到消息会将消息会重新放入队列中每次消费者启动都会把以前消费的消息重新消费 SpringBoot整合RabbitMQ后 设置参数max-attempts为最大重试次数、retry.enabled为开启重试机制 spring.rabbitmq.listener.direct.retry.max-attempts5 spring.rabbitmq.listener.direct.retry.enabledtrue 为什么要开启重试 不开启重试、消费者处理消息发生异常后 RabbitMQ会丢弃该消息 通常业务开发中是不允许的。 为什么设置重试次数 不开启重试次数则消息会一直重新入队占用内存若是错误消息过多RabbitMQ内存爆了 手动确认 在手动确认的模式下不管是消费成功还是消费失败一定要记得确认消息不然消息会一直处于unack状态直到消费者进程重启或者停止。 设置参数 spring.rabbitmq.listener.direct.acknowledge-modemanual spring.rabbitmq.listener.simple.acknowledge-modemanual消费者 通过使用channel#basicAck, basicNack, basicReject完成 RabbitListener(queues ConfirmConfig.CONFIRM_QUEUE_NAME) public class ConfirmConsumer {// RabbitHandler : 标记的方法只能有一个参数类型为String ,若是传Map参数、则需要传入map参数// RabbitListener:标记的方法可以传入Channel Message参数RabbitListener(queues ConfirmConfig.CONFIRM_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, MapString, Object msg) throws IOException {System.out.println(接收到object.queue的消息 msg);System.out.println(消息ID message.getMessageProperties().getDeliveryTag());try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒绝确认消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒绝消息 // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}RabbitHandlerpublic void listenObjectQueue2(MapString, Object msg) throws IOException {System.out.println(接收到object.queue的消息 msg);}} 确认收到一个或多个消息 void basicAck(long deliveryTag, boolean multiple) throws IOException;deliveryTag 消息传递标识multiple是否批量确认为true则确认后其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认(慎重) 拒绝一个或多个消息 void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;deliveryTag消息的传递标识。multiple 如果为true则拒绝所有consumer获得的小于deliveryTag的消息。(慎重)requeue 设置为true 会把消费失败的消息从新添加到队列的尾端设置为false不会重新回到队列。 拒绝一个消息 void basicReject(long deliveryTag, boolean requeue) throws IOException;deliveryTag消息的传递标识。requeue 设置为false 表示不再重新入队如果配置了死信队列则进入死信队列。 channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息而basicReject一次只能拒绝一条消息。 个人还是推荐手动确认可控性更高 SpringBoot消费者手动确认调用的API与RabbitMQClient原生API一致都是通过这三个方法完成确认操作 业务开发中RabbitHandler注解用的少因为注解标记的方法只能传入 消息内容参数 无法传Channel, Message, 获取到的消息有限, 而RabbitListener则相反
http://www.dnsts.com.cn/news/170255.html

相关文章:

  • 长沙市网站推广哪家专业wordpress分类二级域名
  • 网站结构优点做网站配置
  • 免费单页网站建设集团网站开发
  • 什么网站可以找人做系统广州金融网站建设
  • 安庆市大观区城乡建设局网站emlog 迁移Wordpress
  • 珠海市建设工程造价协会网站做书网站
  • 企业自己做网站的成本网站的更新与维护
  • 徐州 商城网站建设江北区网络推广技巧
  • 网页qq登录首页乐清seo公司
  • 高明网站设计服务全国知名网站排名
  • 上海智能网站建设公司旅游网站模板库
  • wordpress电源模板青岛做优化网站哪家好
  • 玉环网站制作素材网视频
  • 用地方名字做网站品牌建设典型案例材料
  • 易语言网站建设购买建立网站费怎么做会计凭证
  • 网站开发的开发语言详情页设计模板网站
  • 素材下载网站源码网店运营与管理
  • 俄语网站看摄影作品的网站
  • 物流网站模板友链网站
  • 电商网站设计培训建设工程协会网站查询系统
  • 做外贸网站怎么样网站流量如何增加
  • 天津市北辰区建设与管理局网站企业形象设计论文2000字
  • 给公众号做头像的网站做ppt的网站叫什么
  • 华为云网站定制seo服务如何收费
  • 怎么做相册的网站搭建个网站多少钱
  • 课程资源网站开发解决方案清远最新闻
  • 当当网的网站怎么做的建设一个企业网站
  • 正规网站建设团队是什么dedecms 子网站
  • 城乡建设查询网站手机网站营销方法
  • 做网站前端需要懂得移动电子商务的概念