四川移动网站建设,网站建设wang.cd,手机图片生成网页链接,管理系统软件有哪些一、延迟队列概念延时队列中#xff0c;队列内部是有序的#xff0c;最重要的特性就体现在它的延时属性上#xff0c;延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。简单来说#xff0c;延时队列就是用来存放需要在指定时间内被处理的 元素的队列。其实延迟…一、延迟队列概念延时队列中队列内部是有序的最重要的特性就体现在它的延时属性上延时队列中的元素是希望 在指定时间到了以后或之前取出和处理。简单来说延时队列就是用来存放需要在指定时间内被处理的 元素的队列。其实延迟队列就是死信队列的一种。二、延迟队列使用场景 订单在十分钟之内未支付则自动取消 新创建的店铺如果在十天内都没有上传过商品则自动发送消息提醒用户注册成功后如果三天内没有登陆则进行短信提醒用户发起退款如果三天内没有得到处理则通知相关运营人员预定会议后需要在预定的时间点前十分钟通知各个与会人员参加会议这些场景都有一个特点需要在某个事件发生之后或者之前的指定时间点完成某一项任务。如 发生订单生成事件在十分钟之后检查该订单支付状态然后将未支付的订单进行关闭。看起来似乎只需要 使用定时任务一直轮询数据每秒查一次取出需要被处理的数据然后处理就行。如果 数据量比较少确实可以这样做比如对于“如果账单一周内未支付则进行自动结算”这样的需求 如果对于时间不是严格限制而是宽松意义上的一周那么每天晚上跑个定时任务检查一下所有未支 付的账单确实也是一个可行的方案。但对于数据量比较大并且时效性较强的场景如“订单十 分钟内未支付则关闭“短期内未支付的订单数据可能会有很多活动期间甚至会达到百万甚至千万 级别对这么庞大的数据量仍旧使用轮询的方式显然是不可取的很可能在一秒内无法完成所有订单 的检查同时会给数据库带来很大压力无法满足业务要求而且性能低下。三、RabbitMQ 中的 TTL TTL 是什么呢TTL 是 RabbitMQ 中一个消息或者队列的属性表明一条消息或者该队列中的所有 消息的最大存活时间 单位是毫秒。换句话说如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列那么这 条消息如果在 TTL 设置的时间内没有被消费则会成为死信。如果同时配置了队列的TTL 和消息的 TTL那么较小的那个值将会被使用有两种方式设置 TTL。四、队列TTL4.1 代码架构图创建两个队列 QA 和 QB两者队列 TTL 分别设置为 10S 和 40S然后在创建一个交换机 X 和死信交换机 Y它们的类型都是 direct创建一个死信队列 QD它们的绑定关系如下4.2 配置类这期我们整合了springboot将交换机和队列的声明放到了配置类中。Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE X;public static final String QUEUE_A QA;public static final String QUEUE_B QB;public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String DEAD_LETTER_QUEUE QD;// 声明 xExchangeBean(xExchange)public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}// 声明 yExchangeBean(yExchange)public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 A ttl 为 10s 并绑定到对应的死信交换机Bean(queueA)public Queue queueA(){MapString, Object args new HashMap(3);//声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put(x-dead-letter-routing-key, YD);//声明队列的 TTLargs.put(x-message-ttl, 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 声明队列 A 绑定 X 交换机Beanpublic Binding queueaBindingX(Qualifier(queueA) Queue queueA,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with(XA);}//声明队列 B ttl 为 40s 并绑定到对应的死信交换机Bean(queueB)public Queue queueB(){MapString, Object args new HashMap(3);//声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put(x-dead-letter-routing-key, YD);//声明队列的 TTLargs.put(x-message-ttl, 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//声明队列 B 绑定 X 交换机Beanpublic Binding queuebBindingX(Qualifier(queueB) Queue queue1B,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queue1B).to(xExchange).with(XB);}//声明死信队列 QDBean(queueD)public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);}//声明死信队列 QD 绑定关系Beanpublic Binding deadLetterBindingQAD(Qualifier(queueD) Queue queueD,Qualifier(yExchange) DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with(YD);}
}4.3 生产者Slf4j
RequestMapping(ttl)
RestController
public class SendMsgController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(sendMsg/{message})public void sendMsg(PathVariable String message){log.info(当前时间{},发送一条信息给两个 TTL 队列:{}, new Date(), message);rabbitTemplate.convertAndSend(X, XA, 消息来自 ttl 为 10S 的队列: message);rabbitTemplate.convertAndSend(X, XB, 消息来自 ttl 为 40S 的队列: message);}
}4.4 消费者Slf4j
Component
public class DeadLetterQueueConsumer {RabbitListener(queues QD)public void receiveD(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(当前时间{},收到死信队列信息{}, new Date().toString(), msg);}
}4.5 总结发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻第一条消息在 10S 后变成了死信消息然后被消费者消费掉第二条消息在 40S 之后变成了死信消息 然后被消费掉这样一个延时队列就打造完成了。不过如果这样使用的话岂不是每增加一个新的时间需求就要新增一个队列这里只有 10S 和 40S 两个时间选项如果需要一个小时后处理那么就需要增加 TTL 为一个小时的队列如果是预定会议室然 后提前通知这样的场景岂不是要增加无数个队列才能满足需求五、延时队列优化 5.1 代码架构图 在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间而是通过生产者生产消息时里指定延迟时间。5.2 配置类Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String QUEUE_C QC;//声明队列 C 死信交换机Bean(queueC)public Queue queueB(){MapString, Object args new HashMap(3);//声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put(x-dead-letter-routing-key, YD);//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机Beanpublic Binding queuecBindingX(Qualifier(queueC) Queue queueC,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with(XC);}
}
5.3 生产者GetMapping(sendExpirationMsg/{message}/{ttlTime})
public void sendMsg(PathVariable String message,PathVariable String ttlTime) {rabbitTemplate.convertAndSend(X, XC, message, correlationData -{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info(当前时间{},发送一条时长{}毫秒TTL信息给队列 C:{}, new Date(),ttlTime, message);
}
发起请求 http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/20005.4 总结看起来似乎没什么问题但是在最开始的时候就介绍过如果使用在消息属性上设置 TTL 的方式消 息可能并不会按时“死亡“因为 RabbitMQ 只会检查第一个消息是否过期如果过期则丢到死信队列 如果第一个消息的延时时长很长而第二个消息的延时时长很短第二个消息并不会优先得到执行。六、Rabbitmq 插件实现延迟队列 上文中提到的问题确实是一个问题如果不能实现在消息粒度上的 TTL并使其在设置的 TTL 时间 及时死亡就无法设计成一个通用的延时队列。那如何解决呢接下来我们就去解决该问题。6.1 安装延时队列插件 在官网上下载 https://www.rabbitmq.com/community-plugins.html下载 rabbitmq_delayed_message_exchange 插件然后解压放置到 RabbitMQ 的插件目录。6.2 代码架构图在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange绑定关系如下:6.3 配置类在我们自定义的交换机中这是一种新的交换类型该类型消息支持延迟投递机制消息传递后并 不会立即投递到目标队列中而是存储在 mnesia(一个分布式数据系统)表中当达到投递时间时才 投递到目标队列中。Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME delayed.queue;public static final String DELAYED_EXCHANGE_NAME delayed.exchange;public static final String DELAYED_ROUTING_KEY delayed.routingkey;Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}//自定义交换机 我们在这里定义的是一个延迟交换机Beanpublic CustomExchange delayedExchange() {MapString, Object args new HashMap();//自定义交换机的类型args.put(x-delayed-type, direct);return new CustomExchange(DELAYED_EXCHANGE_NAME, x-delayed-message, true, false,args);}Beanpublic Binding bindingDelayedQueue(Qualifier(delayedQueue) Queue queue,Qualifier(delayedExchange) CustomExchange delayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
6.4 生产者public static final String DELAYED_EXCHANGE_NAME delayed.exchange;
public static final String DELAYED_ROUTING_KEY delayed.routingkey;
GetMapping(sendDelayMsg/{message}/{delayTime})
public void sendMsg(PathVariable String message,PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData -{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});log.info(当前时间{}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}, new Date(),delayTime, message);
}
6.5 消费者public static final String DELAYED_QUEUE_NAME delayed.queue;
RabbitListener(queues DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message){String msg new String(message.getBody());log.info(当前时间{},收到延时队列的消息{}, new Date().toString(), msg);
}发起请求 http://localhost:8080/ttl/sendDelayMsg/comeon baby1/20000 http://localhost:8080/ttl/sendDelayMsg/comeon baby2/2000 6.6 总结 延时队列在需要延时处理的场景下非常有用使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外通过 RabbitMQ 集群的特性可以很好的解决单点故障问题不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。 当然延时队列还有很多其它选择比如利用 Java 的 DelayQueue利用 Redis 的 zset利用 Quartz 或者利用 kafka 的时间轮这些方式各有特点,看需要适用的场景。