深圳市多语言网站建设公司,百度网站建设费用多少知乎,优化站点,成都网站建设开发价业务场景#xff1a;1.生成订单30分钟未支付#xff0c;则自动取消#xff0c;我们该怎么实现呢#xff1f;2.生成订单60秒后,给用户发短信1 安装rabbitMqwindows安装ubuntu中安装2 添加maven依赖!-- https://mvnrepository.com/artifact/org.springframework.boot/spr…业务场景1.生成订单30分钟未支付则自动取消我们该怎么实现呢2.生成订单60秒后,给用户发短信1 安装rabbitMqwindows安装ubuntu中安装2 添加maven依赖!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency3 在application.properties配置spring.application.namerabbitmq-hello
# 配置rabbbitMq
spring.rabbitmq.hostlocalhost
spring.rabbitmq.port5672
spring.rabbitmq.usernamespringCloud
spring.rabbitmq.password1234564 具体的实现rabbitmq本身是没有延迟发送的功能但是我们通过消息的TTLTime To Live来实现所谓TTL就是指消息的存活时间RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间也可以对每一个单独的消息做单独的设置。超过了这个时间我们认为这个消息就死了称之为死信。如果队列设置了消息也设置了那么会取小的。所以一个消息如果被路由到不同的队列中这个消息死亡的时间有可能不一样不同的队列设置。这里单讲单个消息的TTL因为它才是实现延迟任务的关键。我们可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间两者是一样的效果。只是expiration字段是字符串参数所以要写个int类型的字符串byte[] messageBodyBytes Hello, world!.getBytes();AMQP.BasicProperties properties newAMQP.BasicProperties();
properties.setExpiration(60*1000);
channel.basicPublish(my-exchange,routing-key, properties, messageBodyBytes);当上面的消息扔到队列中后过了60秒如果没有被消费它就死了。不会被消费者消费到。这个消息后面的没有“死掉”的消息对顶上来被消费者消费。死信在队列中并不会被删除和释放它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务还要靠Dead Letter Exchange。下面我大致解释一下Dead Letter Exchanges4.1 Dead Letter Exchanges一个消息在满足如下条件下会进死信路由记住这里是路由而不是队列一个路由可以对应很多队列。1.一个消息被Consumer拒收了并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里被其他消费者使用。2.上面的消息的TTL到了消息过期了。3.队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。Dead Letter Exchange其实就是一种普通的exchange和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了会自动触发消息的转发发送到Dead Letter Exchange中去。4.2 实现延迟队列延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列一个用于发送消息一个用于消息过期后的转发目标队列大致原理如下图所示。生产者输出消息到Queue1并且这个消息是设置有有效时间的比如60s。消息会在Queue1中等待60s如果没有消费者收掉的话它就是被转发到Queue2Queue2有消费者收到处理延迟任务。接下来正式进入代码阶段代码声明交换机、队列以及他们的绑定关系importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;ConfigurationpublicclassRabbitMQConfig{publicstaticfinalString DELAY_EXCHANGE_NAME delay.queue.demo.business.exchange;//普通的交换机publicstaticfinalString DELAY_QUEUEA_NAME delay.queue.demo.business.queuea;//声明两个队列 A BpublicstaticfinalString DELAY_QUEUEB_NAME delay.queue.demo.business.queueb;publicstaticfinalString DELAY_QUEUEA_ROUTING_KEY delay.queue.demo.business.queuea.routingkey;publicstaticfinalString DELAY_QUEUEB_ROUTING_KEY delay.queue.demo.business.queueb.routingkey;publicstaticfinalString DEAD_LETTER_EXCHANGE delay.queue.demo.deadletter.exchange;//Dead Letter ExchangespublicstaticfinalString DEAD_LETTER_QUEUEA_ROUTING_KEY delay.queue.demo.deadletter.delay_10s.routingkey;//死信交换机publicstaticfinalString DEAD_LETTER_QUEUEB_ROUTING_KEY delay.queue.demo.deadletter.delay_60s.routingkey;publicstaticfinalString DEAD_LETTER_QUEUEA_NAME delay.queue.demo.deadletter.queuea;publicstaticfinalString DEAD_LETTER_QUEUEB_NAME delay.queue.demo.deadletter.queueb;// 声明延时ExchangeBean(delayExchange)publicDirectExchangedelayExchange(){returnnewDirectExchange(DELAY_EXCHANGE_NAME);}// 声明死信ExchangeBean(deadLetterExchange)publicDirectExchangedeadLetterExchange(){returnnewDirectExchange(DEAD_LETTER_EXCHANGE);}// 声明延时队列A 延时10s// 并绑定到对应的死信交换机Bean(delayQueueA)publicQueuedelayQueueA(){MapString,Object args newHashMap(2);// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 这里声明当前队列的死信路由keyargs.put(x-dead-letter-routing-key, DEAD_LETTER_QUEUEA_ROUTING_KEY);// x-message-ttl 声明队列的TTLargs.put(x-message-ttl,1000*10);returnQueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();}// 声明延时队列B 延时 60s// 并绑定到对应的死信交换机Bean(delayQueueB)publicQueuedelayQueueB(){MapString,Object args newHashMap(2);// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 这里声明当前队列的死信路由keyargs.put(x-dead-letter-routing-key, DEAD_LETTER_QUEUEB_ROUTING_KEY);// x-message-ttl 声明队列的TTLargs.put(x-message-ttl,60000);returnQueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();}// 声明死信队列A 用于接收延时10s处理的消息Bean(deadLetterQueueA)publicQueuedeadLetterQueueA(){returnnewQueue(DEAD_LETTER_QUEUEA_NAME);}// 声明死信队列B 用于接收延时60s处理的消息Bean(deadLetterQueueB)publicQueuedeadLetterQueueB(){returnnewQueue(DEAD_LETTER_QUEUEB_NAME);}// 声明延时队列A绑定关系BeanpublicBindingdelayBindingA(Qualifier(delayQueueA)Queue queue,Qualifier(delayExchange)DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);}// 声明业务队列B绑定关系BeanpublicBindingdelayBindingB(Qualifier(delayQueueB)Queue queue,Qualifier(delayExchange)DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);}// 声明死信队列A绑定关系BeanpublicBindingdeadLetterBindingA(Qualifier(deadLetterQueueA)Queue queue,Qualifier(deadLetterExchange)DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}// 声明死信队列B绑定关系BeanpublicBindingdeadLetterBindingB(Qualifier(deadLetterQueueB)Queue queue,Qualifier(deadLetterExchange)DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);}}消息的生产者importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importorg.springframework.stereotype.Service;importstaticcom.talent.infocenter.rabbitmq.RabbitMQConfig.*;ServicepublicclassDelayMessageSender{AutowiredprivateRabbitTemplate rabbitTemplate;publicenumDelayTypeEnum{DELAY_10s,DELAY_60s;}publicstaticDelayTypeEnumgetByIntValue(int value){switch(value){case10:returnDelayTypeEnum.DELAY_10s;case60:returnDelayTypeEnum.DELAY_60s;default:returnnull;}}publicvoidsendMsg(String msg,DelayTypeEnum type){switch(type){caseDELAY_10s:rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);break;caseDELAY_60s:rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);break;}}}消费者我们创建两个消费者分别消费10s和60s的订单importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.Date;importstaticcom.talent.infocenter.rabbitmq.RabbitMQConfig.DEAD_LETTER_QUEUEA_NAME;importstaticcom.talent.infocenter.rabbitmq.RabbitMQConfig.DEAD_LETTER_QUEUEB_NAME;Slf4jComponentpublicclassDeadLetterQueueConsumer{RabbitListener(queues DEAD_LETTER_QUEUEA_NAME)publicvoidreceiveA(Message message,Channel channel)throwsIOException{String msg newString(message.getBody());log.info(当前时间{},死信队列A收到消息{},newDate().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}RabbitListener(queues DEAD_LETTER_QUEUEB_NAME)publicvoidreceiveB(Message message,Channel channel)throwsIOException{String msg newString(message.getBody());log.info(当前时间{},死信队列B收到消息{},newDate().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}创建一个接口进行测试importcom.talent.infocenter.rabbitmq.DelayMessageSender;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestMethod;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;importjava.util.Date;importjava.util.Objects;Slf4jRestControllerpublicclassRabbitMQMsgController{AutowiredprivateDelayMessageSender sender;RequestMapping(value sendmsg, method RequestMethod.GET)publicvoidsendMsg(RequestParam(value msg)String msg,RequestParam(value delayType)Integer delayType){log.info(当前时间{},收到请求msg:{},delayType:{},newDate(), msg, delayType);sender.sendMsg(msg,Objects.requireNonNull(DelayMessageSender.getByIntValue(delayType)));}}接下来开始测试我用的是swagger大家可以用postman等其他方法自行测试打开我们的rabbitmq后台就可以看到我们交换机和队列信息同样的方法我们创建一个60s之后才能消费的订单上面的实现仅能设置两个指定的时间10s和60s接下来我们设置任意时间的延迟队列4.3 RabbitMq的优化我们需要一种更通用的方案才能满足需求那么就只能将TTL设置在消息属性里了只有如此我们才能更加灵活的实现延迟队列的具体业务开发方法也很简单我们只需要增加一个延时队列用于接收设置为任意延时时长的消息同时增加一个相应的死信队列和routingkey但是该方法有个极大的弊端就是如果使用在消息属性上设置TTL的方式消息可能并不会按时“死亡“因为RabbitMQ只会检查第一个消息是否过期如果过期则丢到死信队列所以如果第一个消息的延时时长很长而第二个消息的延时时长很短则第二个消息并不会优先得到执行此处则不再进行编写代码但是为了解决这个问题我们将利用rabbitMq插件实现延迟队列。4.4 利用插件实现延迟队列4.4.1 下载插件下载完成之后进行解压此处推荐bandzip进行解压并且将解压之后的文件夹放到rabbitmq的安装目录下的plugins目录下进入到sbin目录下使用cmd执行以下指令来启用插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange执行以上步骤之后开始重启我们的rabbitmq服务1.进入到服务2.进入到sbin目录双击rabbitmq-server.bat验证是否重启成功访问http://localhost:15672如果能够访问成功说明重启成功4.4.2 编写代码重新创建一个配置类importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.CustomExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;ConfigurationpublicclassDelayedRabbitMQConfig{publicstaticfinalString DELAYED_QUEUE_NAME delay.queue.demo.delay.queue;publicstaticfinalString DELAYED_EXCHANGE_NAME delay.queue.demo.delay.exchange;publicstaticfinalString DELAYED_ROUTING_KEY delay.queue.demo.delay.routingkey;BeanpublicQueueimmediateQueue(){returnnewQueue(DELAYED_QUEUE_NAME);}BeanpublicCustomExchangecustomExchange(){MapString,Object args newHashMap();args.put(x-delayed-type,direct);returnnewCustomExchange(DELAYED_EXCHANGE_NAME,x-delayed-message,true,false, args);}BeanpublicBindingbindingNotify(Qualifier(immediateQueue)Queue queue,Qualifier(customExchange)CustomExchange customExchange){returnBindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();}}新建一个消息的发送者importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importstaticcom.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_EXCHANGE_NAME;importstaticcom.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_ROUTING_KEY;ServicepublicclassProvider{AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendDelayMsg(String msg,Integer delayTime){rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a -{a.getMessageProperties().setDelay(delayTime*1000);return a;});}}新建一个消息的消费者importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.Date;importstaticcom.talent.infocenter.rabbitMq.DelayedRabbitMQConfig.DELAYED_QUEUE_NAME;Slf4jComponentpublicclassConsumer{RabbitListener(queues DELAYED_QUEUE_NAME)publicvoidreceiveD(Message message,Channel channel)throwsIOException{String msg newString(message.getBody());log.info(当前时间{},延时队列收到消息{},newDate().toString(), msg);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}修改我们之前的接口importcom.talent.api.utils.RedisUtils;importcom.talent.infocenter.rabbitMq.Provider;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestMethod;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;importjava.util.Date;importjava.util.Objects;Slf4jRestControllerpublicclassRabbitMQMsgController{AutowiredprivateProvider provider;RequestMapping(value sendmsg, method RequestMethod.GET)publicvoidsendMsg(RequestParam(value msg)String msg,RequestParam(value delayTime)Integer delayTime){log.info(当前时间{},收到请求msg:{},delayTime:{},newDate(), msg, delayTime);provider.sendDelayMsg(msg, delayTime);}}接下来开始测试再接着测试一下我们不同顺序的订单是否是按照时间顺序进行消费的我们将订单0002设置为60s,订单0003设置为15s,看看订单0003能否在0002之前消费结果显而易见订单0003确实在第15s的时候被消费掉5.总结延时队列在需要延时处理的场景下非常有用而且十分稳定使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外通过RabbitMQ集群的特性可以很好的解决单点故障问题不会因为单个节点挂掉导致延时队列不可用或者消息丢失。当然延时队列还有很多其它选择比如利用Java的DelayQueu利用Redis的zset利用Quartz或者利用kafka的时间轮这些方式各有特点。