网站qq联系代码,福州哪家企业网站建设设计最高端,网站建设及管理制度,wordpress萨隆设置解决RabbitMQ设置TTL过期后不进入死信队列 问题发现问题解决方法一#xff1a;只监听死信队列#xff0c;在死信队列里面处理业务逻辑方法二#xff1a;改为自动确认模式 问题发现
最近再学习RabbitMQ过程中#xff0c;看到关于死信队列内容#xff1a; 来自队列的消息可… 解决RabbitMQ设置TTL过期后不进入死信队列 问题发现问题解决方法一只监听死信队列在死信队列里面处理业务逻辑方法二改为自动确认模式 问题发现
最近再学习RabbitMQ过程中看到关于死信队列内容 来自队列的消息可以是 “死信”这意味着当以下四个事件中的任何一个发生时这些消息将被重新发布到 Exchange。 使用 basic.reject 或 basic.nack 且 requeue 参数设置为 false 的使用者否定该消息消息由于每条消息的 TTL 而过期队列超出了长度限制消息返回到 quorum 队列的次数超过了 delivery-limit 的次数。 再模拟TTL过期时遇到的疑惑特此记录下来示例代码如下 先设置为手动应答模式
#手动应答
spring.rabbitmq.listener.simple.acknowledge-mode manual绑定队列示例代码如下
Configuration
public class MQConfig {/*** 死信队列* return*/Beanpublic Queue deadQueue(){return new Queue(dead_queue);}/*** 死信队列交换机* return*/Beanpublic DirectExchange deadExchange(){return new DirectExchange(dead.exchange);}/*** 死信队列和死信交换机绑定* return*/Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(dead);}/*** 普通队列* return*/Beanpublic Queue queue(){
// 方法一
// Queue normalQueue new Queue(normal_queue);
// normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列
// normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey
// normalQueue.addArgument(x-message-ttl, 10000); // 死信队列routingKey
// 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).ttl(10000).build();}/*** 普通交换机* return*/Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定* return*/Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);}
}监听普通队列消费方示例代码如下
Component
RabbitListener(queues normal_queue)
public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg, Message message, Channel channel) throws IOException, InterruptedException {log.info(收到消息msg);}
}监听死信队列消费方示例代码如下
Component
RabbitListener(queues dead_queue)
public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg, Message message, Channel channel) throws IOException {log.info(死信队列收到消息{},msg);// 参数一当前消息标签参数二true该条消息已经之前所有未消费设置为已消费false只确认当前消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}发送方向普通队列发送消息示例代码如下
Component
public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {String msg hello world;log.info(发送消息msg);template.convertAndSend(normal.exchange, normal, msg);}
}执行结果如图 时间到了后死信队列长时间未收到消息消息一直在普通队列中如图所示 然后开始百度网上很多都说什么配置不对啥的还有说队列的预取值太大导致的问题扯犊子呢反正就是没有找到一个合理的解释。
然后吃了个饭回来发现RabbitMQ报了一个长时间未收到消息确认的错误大概意思就是说ACK消息确认超时时间为18000毫秒也就是30分钟原来RabbitMQ一直在等待消息确认所以一直被持有当普通队列挂了重启后被释放进入死信队列。 PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more 这下知道为什么不进入死信队列的原因了。新的问题又来了如果我手动确认或者拒绝了那不就达不到TTL过期的效果了吗
问题解决
方法一只监听死信队列在死信队列里面处理业务逻辑
这个方法是参考众多文章比较常见的一个做法但是个人感觉与我理解的TTL有偏差应该是在普通队列中处理超时的一种补偿机制如果只监听死信队列那就完全不需要在配置时普通队列里面定义死信队列虽然这种做法可以解决业务问题另一方面官方也有提到 消息可以在写入套接字之后过期但在到达消费者之前过期。 示例代码如下
Configuration
public class MQConfig {/*** 死信队列* return*/Beanpublic Queue deadQueue(){return new Queue(dead_queue);}/*** 死信队列交换机* return*/Beanpublic DirectExchange deadExchange(){return new DirectExchange(dead.exchange);}/*** 死信队列和死信交换机绑定* return*/Beanpublic Binding deadBinding(){return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(dead);}/*** 普通队列* return*/Beanpublic Queue queue(){
// 方法一
// Queue normalQueue new Queue(normal_queue);
// normalQueue.addArgument(x-dead-letter-exchange, dead.exchange); // 死信队列
// normalQueue.addArgument(x-dead-letter-routing-key, dead); // 死信队列routingKey
// normalQueue.addArgument(x-message-ttl, 10000); // 死信队列routingKey
// 方法二return QueueBuilder.durable(normal_queue).deadLetterExchange(dead.exchange).deadLetterRoutingKey(dead).ttl(10000).build();}/*** 普通交换机* return*/Beanpublic DirectExchange normalExchange(){return new DirectExchange(normal.exchange);}/*** 普通队列和普通交换机绑定* return*/Beanpublic Binding binding(){return BindingBuilder.bind(queue()).to(normalExchange()).with(normal);}}消费方只监听死信队列
Component
RabbitListener(queues dead_queue)
public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg, Message message, Channel channel) throws IOException {log.info(死信队列收到消息{},msg);// 伪代码判断订单状态1支付成功2支付超时
// if(order.state 1){
// // 参数一当前消息标签参数二true该条消息已经之前所有未消费设置为已消费false只确认当前消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// }else{
// // todo 修改订单状态
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// }}
}发送方代码如下
Component
public class MQSender {private static final Logger log LoggerFactory.getLogger(MQSender.class);Autowiredprivate RabbitTemplate template;public void send() throws UnsupportedEncodingException {String msg hello world;log.info(发送消息msg);template.convertAndSend(normal.exchange, normal, msg);}
}调用send()方法执行结果如图 可以看到从发送时间到进入死信队列时间正好间隔10s。
方法二改为自动确认模式
经过思考后既然手动确认走不通那不如试一试自动模式我们在普通队列里面模拟业务出现异常情况如果只是单纯模拟业务超时不会进入死信队列直接就确认消费了。
我们先把手动确认的配置删除或者修改为自动确认示例代码如下
#spring.rabbitmq.listener.simple.acknowledge-mode auto发送方代码和配置的代码就不重复展示了参考之前示例消费方示例代码如下
Component
RabbitListener(queues normal_queue)
public class MQReceiver {private static final Logger log LoggerFactory.getLogger(MQReceiver.class);RabbitHandlerpublic void receive(String msg) throws IOException, InterruptedException {log.info(收到消息msg);throw new RuntimeException();}
}
Component
RabbitListener(queues dead_queue)
public class MQReceiver2 {private static final Logger log LoggerFactory.getLogger(MQReceiver2.class);RabbitHandlerpublic void receive(String msg) throws IOException {log.info(死信队列收到消息{},msg);// 伪代码判断订单状态1支付成功2支付超时
// if(order.state 1){
// // 参数一当前消息标签参数二true该条消息已经之前所有未消费设置为已消费false只确认当前消息
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// }else{
// // todo 修改订单状态
// channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
// }}
}调用send()方法执行结果如图 我们可以看到第一次进入普通队列时间和最后一次报错进入死信队列的时间正好间隔10s。但是这中间会重复发起N次不合理是时长可能会导致资源消耗过高但这又属于另外一个问题了。