乌当区城乡建设局网站,ui网页设计课程总结,商城开发网站建设,做网站上传信息软件rabbitMQ安装插件rabbitmq-delayed-message-exchange
交换机由此type 表示组件安装成功 生产者发送消息时设置延迟值 消息在交换机滞纳至指定延迟后#xff0c;进入队列#xff0c;被消费者消费。 组件注解类#xff1a;
package com.esint.configs;import org.springfra…rabbitMQ安装插件rabbitmq-delayed-message-exchange
交换机由此type 表示组件安装成功 生产者发送消息时设置延迟值 消息在交换机滞纳至指定延迟后进入队列被消费者消费。 组件注解类
package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class DelayedQueueConfig {//交换机public static final String DELAYED_EXCHANGE_NAME delayed.exchange;//队列public static final String DELAYED_QUEUE_NAME delayed.queue;//routingKeypublic static final String DELAYED_ROUTING_KEY delayed.routingkey;/*** 基于插件声明一个自定义交换机* return*/Beanpublic CustomExchange delayedExchange(){//String name, String type, boolean durable, boolean autoDelete, MapString, Object arguments) {MapString, Object arguments new HashMap();arguments.put(x-delayed-type,direct);return new CustomExchange(DELAYED_EXCHANGE_NAME,x-delayed-message,true, false,arguments);}Beanpublic Queue delayedQueue(){return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();}Beanpublic Binding delayedQueueBindingDelayedExchange(Qualifier(delayedQueue) Queue delayedQueue,Qualifier(delayedExchange) CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}
生产者代码实现
package com.esint.controller;//发送延迟消息import com.esint.configs.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RestController
RequestMapping(/ttl)
public class SendMesController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendDelayMsg/{message}/{delayTime})public void sendMsg(PathVariable String message,PathVariable Integer delayTime){log.info(当前时间{},发送一条ttl为{}ms的消息给延迟交换机转队列{},new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, mes-{mes.getMessageProperties().setDelay(delayTime);return mes;});}}
消费者实现
package com.esint.consumer;import com.esint.configs.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 基于插件的延时消息*/
Slf4j
Component
public class DelayQueueConsumer {//监听消息队列RabbitListener(queues DelayedQueueConfig.DELAYED_QUEUE_NAME)public void receiveDelayQueue(Message message){String msg new String(message.getBody());log.info(当前时间{} 收到延迟消息{},new Date().toString(),msg);}
}
测试
http://127.0.0.1:19092/ttl/sendDelayMsg/helloDelay1/30000 http://127.0.0.1:19092/ttl/sendDelayMsg/helloDelay2/3000 发送第一条消息helloDelay1 延迟30s 发送第二条消息helloDelay2 延迟3s 满足条件。 总结 阻塞层在交换机。 发送消息灵活设置时间现达到时间先被消费。 需要安装延时插件。