关于网站备案,wordpress竖状导航,网站空间 jsp,wordpress添加导航首页在项目中#xff0c;我们经常需要使用消息队列来实现延迟任务#xff0c;本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送#xff0c;由于是实战篇#xff0c;所以不会讲太多理论的知识#xff0c;还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章…在项目中我们经常需要使用消息队列来实现延迟任务本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送由于是实战篇所以不会讲太多理论的知识还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章跟着练手哦~
需求背景
我这儿的一个需求背景大概是干部添加完活动后由管理员进行审批审批通过后该活动id连同设置的过期时间会被放入消息队列中等到活动结束时间到的时候自动将活动的状态设置为已完成这里华丽一个活动图各位参考一下。 需求了解完之后我们就可以开始的写代码啦~手动微笑
相关知识点拓展
这里还是简单提一下MQ实现延迟队列的一个方法一种是用插件还有一种是使用死信队列当然本文我们使用的就是通过死信队列来实现的。
当我们的一个正常消息因为设置了过期时间或者被消费者拒绝消费的时候这条消息就会被放入死信队列中然后死信队列再进行消费。
然后啰嗦一下说一下MQ的交换机类型以及死信交换机一般选用哪种 1. Direct Exchange直连交换机 特点 根据消息的 Routing Key 精确匹配队列的 Binding Key。完全匹配时消息才会被路由到对应的队列。适用场景 点对点消息传递消息需要精确路由到特定队列。示例 消息的 Routing Key 为 order.created队列的 Binding Key 也为 order.created则消息会被路由到该队列。 2. Fanout Exchange扇出交换机 特点 将消息广播到所有绑定到该交换机的队列忽略 Routing Key。适用场景 广播消息消息需要发送到多个队列。示例 消息发送到 Fanout Exchange所有绑定到该交换机的队列都会收到消息。 3. Topic Exchange主题交换机 特点 根据消息的 Routing Key 和队列的 Binding Key 进行模式匹配。Binding Key 支持通配符 *匹配一个单词。#匹配零个或多个单词。适用场景 消息需要根据模式路由到多个队列。示例 消息的 Routing Key 为 order.created.us队列的 Binding Key 为 order.created.*则消息会被路由到该队列。 4. Headers Exchange头交换机 特点 根据消息的 Headers键值对匹配队列的 Binding Arguments。忽略 Routing Key。适用场景 消息需要根据复杂的条件路由到队列。示例 消息的 Headers 包含 typeorder 和 regionus队列的 Binding Arguments 要求 x-matchall 且 typeorder则消息会被路由到该队列。 5. Default Exchange默认交换机 特点 RabbitMQ 默认创建的交换机类型为 Direct Exchange。每个队列都会自动绑定到默认交换机Binding Key 为队列名称。适用场景 默认情况下消息可以直接发送到队列。 死信交换机适合使用哪种类型 死信交换机DLX, Dead Letter Exchange的类型选择取决于你的业务需求。以下是常见的选择 1. Direct Exchange 适用场景 死信消息需要精确路由到特定的死信队列。示例 将死信消息路由到 dlx-queue用于统一处理所有死信消息。 2. Topic Exchange 适用场景 死信消息需要根据不同的 Routing Key 路由到不同的死信队列。示例 将死信消息根据业务类型如 order.dead、payment.dead路由到不同的死信队列。 3. Fanout Exchange 适用场景 死信消息需要广播到多个死信队列。示例 将死信消息同时发送到日志队列和报警队列。 推荐选择 大多数情况下死信交换机使用 Direct Exchange因为死信消息通常需要精确路由到一个死信队列用于统一处理。如果死信消息需要根据不同的条件路由到多个队列可以使用 Topic Exchange。 代码部分
首先我们需要定义一个死信交换机和死信队列用来接收来自普通队列的消息。
// 创建死信交换机处理延迟消息通知Bean(dead_letter_exchange)public DirectExchange delayExchange(){return new DirectExchange(dead_letter_exchange,true,false);}
// 创建死信队列public Queue deadLetterQueue(){Queue queue new Queue(dead_letter_queue, true);rabbitAdmin.declareQueue(queue);log.info(死信队列声明成功 queue.getName());return queue; }
然后我们需要配置一个普通的消息队列和一个普通的交换机这个消息队列需要设置对应的死信交换机和死信路由同时我们这个普通队列需要接收一个过期时间保证一到过期时间消息就会被发送到死信队列当中。
// 创建一个普通队列,接受一个过期时间出列活动结束后发送到死信队列public Queue normalQueue(Long expireTime){MapString,Object args new HashMap();if (expireTime ! null expireTime 0) { // 确保 TTL 是正数args.put(x-message-ttl, expireTime);}// 设置死信交换机args.put(x-dead-letter-exchange,deadLetterExchange);// 设置死信路由键args.put(x-dead-letter-routing-key,dead_letter_routing_key);Queue queue new Queue(normal_queue, true, false, false, args);log.info(普通队列声明成功 queue.getName());return queue; }
// 创建一个普通交换机处理活动结束自动设置活动状态为结束Bean(activity_end_exchange)public DirectExchange activityEndExchange(){return new DirectExchange(activity_end_exchange);}
然后我们需要分别将死信交换机和死信队列普通交换机和普通队列分别进行绑定。
// 将死信队列和死信交换机进行绑定public void bindDeadLetterRouting(){Queue queuequeueDeclareConfig.deadLetterQueue();Binding binding BindingBuilder.bind(queue).to(deadLetterExchange).with(dead_letter_routing_key);rabbitAdmin.declareBinding(binding);log.info(死信队列绑定成功死信队列名称----》 queue.getName() 死信交换机名称----》 deadLetterExchange.getName());}// 绑定活动结束交换机和普通队列public void bindActivityEndRouting(Long expireTime) {Queue queue queueDeclareConfig.normalQueue(expireTime);Binding binding BindingBuilder.bind(queue).to(activityEndExchange).with(activity_end_routing_key);rabbitAdmin.declareBinding(binding);}
当然我们还需要配置生产者来发送消息到交换机里面
//活动结束后发送消息到死信队列自动设置活动结束状态public void sendActivityEndMessage(Long expireTime, Integer activityId) {rabbitMQBindRoutingConfig.bindDeadLetterRouting();rabbitMQBindRoutingConfig.bindActivityEndRouting(expireTime);try {// 将消息发送到普通队列等待消息过期发送到死信交换机rabbitTemplate.convertAndSend(activity_end_exchange, activity_end_routing_key, activityId, msg - {msg.getMessageProperties().setExpiration(expireTime.toString());return msg;});} catch (Exception e) {log.error(发送消息失败------- activityId);throw new RuntimeException(发送消息失败---- activityId);}}
这里生产者的代码可以根据你的业务逻辑具体进行更改~
消费者逻辑也需要进行编写一下
// 使用MQ延迟队列活动结束修改活动状态RabbitListener(queues dead_letter_queue)public void updatePlaceOccupyStatus(Message message, Channel channel){try {String messageBody new String(message.getBody(), StandardCharsets.UTF_8);Integer activityId Integer.parseInt(messageBody);ActivityInfo activityInfo baseMapper.selectById(activityId);LambdaUpdateWrapperActivityInfo wrapper new LambdaUpdateWrapper();wrapper.eq(ActivityInfo::getActivityId,activityId).set(ActivityInfo::getProgress,StatusConstant.FINISH);if(baseMapper.update(activityInfo,wrapper)0){channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} catch (Exception e) {log.error(处理消息时发生错误 e.getMessage());try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} catch (IOException ioException) {ioException.printStackTrace();}}
消费者这边需要注意的是如果你选择的提交类型不是自动提交的话在处理完消息之后需要手动ack一下消息不然消费的消息不会被认为已经消费从而导致消息积压也会在之后的消费中重复进行消费因此你需要告诉生产者这条消息已经被消费了。
当然如果在消费的过程中出现了什么问题可以设置以下这行代码 basicNack方法接收三个参数 deliveryTag: 消息的标识符。 multiple: 是否对多个消息进行否定确认。 requeue: 是否将消息重新放入队列。
可以根据你的需求进行设定~
然后的然后我们需要再application.yml当中进行配置相关信息
rabbitmq:host: localhostport: 5672username: guestpassword: guest
# 确认消息发送到交换机上publisher-confirm-type: correlated# 消息发送到队列确认,失败回调publisher-returns: truelistener:direct:acknowledge-mode: manualretry:enabled: true
# 重试的时间间隔为1sinitial-interval: 1000ms
# 最大重试3次max-attempts: 3
# 最大的重试时间间隔为2smax-interval: 2000ms
# 每次重试时间间隔为1s每次重试时间间隔倍数multiplier: 1.0#重试次数超过上面的设置之后是否丢弃false不丢弃时需要写相应代码将该消息加入死信队列default-requeue-rejected: falsesimple:default-requeue-rejected: falseacknowledge-mode: manual
# 最小消费者数量concurrency: 1
# 最大消费者数量max-concurrency: 10retry:enabled: trueinitial-interval: 1000msmax-attempts: 3max-interval: 2000msmultiplier: 1.0
上面给出了一个比较全的配置你可以根据你的需求进行选择但是需要注意的是default-requeue-rejected: false这一行配置一定要先配置不然你的消息在普通队列中过期了是不会发送到死信队列当中进行消费的~
到这儿基本上所有的代码都写的差不多了当然我们还需要再rabbitmq控制平台上分别建一个普通交换机和一个死信交换机一个普通队列和一个私信队列然后分别绑定就可以了。
注意的是普通交换机也需要在平台上配置一次死信队列和死信路由 到这儿如果没有什么问题的话基本上已经可以直接运行了所以我的这篇文章到这儿基本上也已经结束了如果你有什么问题可以评论区留言我们相互学习~