怎么在濮阳网站做宣传,wordpress人体时钟,南阳企业网站seo,html怎么做商品页面知识小科普
在此之前#xff0c;简单说明下基于RabbitMQ实现延时队列的相关知识及说明下延时队列的使用场景。
延时队列使用场景
在很多的业务场景中#xff0c;延时队列可以实现很多功能#xff0c;此类业务中#xff0c;一般上是非实时的#xff0c;需要延迟处理的简单说明下基于RabbitMQ实现延时队列的相关知识及说明下延时队列的使用场景。
延时队列使用场景
在很多的业务场景中延时队列可以实现很多功能此类业务中一般上是非实时的需要延迟处理的需要进行重试补偿的。
订单超时关闭在支付场景中一般上订单在创建后30分钟或1小时内未支付的会自动取消订单。短信或者邮件通知在一些注册或者下单业务时需要在1分钟或者特定时间后进行短信或者邮件发送相关资料的。本身此类业务于主业务是无关联性的一般上的做法是进行异步发送。重试场景比如消息通知在第一次通知出现异常时会在隔几分钟之后进行再次重试发送。
RabbitMQ实现延时队列
本身在RabbitMQ中是未直接提供延时队列功能的但可以使用 TTL(Time-To-Live存活时间) 和 DLX(Dead-Letter-Exchange 死信队列交换机)的特性实现延时队列的功能。
存活时间Time-To-Live 简称 TTL
RabbitMQ中可以对队列和消息分别设置TTLTTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时这条消息会在TTL时间后死亡成为Dead Letter。如果既配置了消息的TTL又配置了队列的TTL那么较小的那个值会被取用。
死信交换Dead Letter Exchanges 简称 DLX
上个知识点也提到了设置了 TTL 的消息或队列最终会成为 Dead Letter 当消息在一个队列中变成死信之后它能被重新发送到另一个交换机中这个交换机就是DLX绑定此DLX的队列就是死信队列。
一个消息变成死信一般上是由于以下几种情况;
消息被拒绝
消息过期
队列达到了最大长度。所以通过 TTL 和 DLX 的特性可以模拟实现延时队列的功能。当队列中的消息超时成为死信后会把消息死信重新发送到配置好的交换机中然后分发到真实的消费队列。故简单来说我们可以创建2个队列一个队列用于发送消息一个队列用于消息过期后的转发的目标队列。
SpringBoot集成RabbitMQ实现延时队列实战
以下使用 SpringBoot 集成 RabbitMQ 进行实战说明在进行 http 消息通知时若通知失败地址不可用或者连接超时时将此消息转入延时队列中待特定时间后进行重新发送。
0.引入pom依赖 !-- rabbit --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency!-- 简化http操作 --dependencygroupIdcn.hutool/groupIdartifactIdhutool-http/artifactIdversion4.5.16/version/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-json/artifactIdversion4.5.16/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency1.编写rabbitmq配置文件关键配置RabbitConfig.java
/**
*
* ClassName 类名RabbitConfig
* Description 功能说明
* p
* TODO
*/p
************************************************************************
* date 创建日期2019年7月17日
* author 创建人oKong
* version 版本号V1.0
*p
***************************修订记录*************************************
*
* 2019年7月17日 oKong 创建该类功能。
*
***********************************************************************
*/p
*/
Configuration
public class RabbitConfig {AutowiredConnectionFactory connectionFactory;/*** 消费者线程数 设置大点 大概率是能通知到的*/Value(${http.notify.concurrency:50})int concurrency;/*** 延迟队列的消费者线程数 可设置小点*/Value(${http.notify.delay.concurrency:20})int delayConcurrency;Beanpublic RabbitAdmin rabbitAdmin() {return new RabbitAdmin(connectionFactory);}Beanpublic DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) {//durable 是否持久化//autoDelete 是否自动删除即服务端或者客服端下线后 交换机自动删除DirectExchange directExchange new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,true,false);directExchange.setAdminsThatShouldDeclare(rabbitAdmin);return directExchange;}//设置消息队列Beanpublic Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) {/*创建接收队列4个参数name - 队列名称durable - false不进行持有化exclusive - true独占性autoDelete - true自动删除*/Queue queue new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false);queue.setAdminsThatShouldDeclare(rabbitAdmin);return queue;}//队列绑定交换机Beanpublic Binding bindingStartQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) {Binding binding BindingBuilder.bind(httpMessageStartQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_START_RK);binding.setAdminsThatShouldDeclare(rabbitAdmin);return binding;}Beanpublic Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) {Queue queue new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false);queue.setAdminsThatShouldDeclare(rabbitAdmin);return queue;}Beanpublic Binding bindingOneQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) {Binding binding BindingBuilder.bind(httpMessageOneQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_ONE_RK);binding.setAdminsThatShouldDeclare(rabbitAdmin);return binding;}//-------------设置延迟队列--开始--------------------Beanpublic Queue httpDelayOneQueue() {//name - 队列名称//durable - true//exclusive - false//autoDelete - falsereturn QueueBuilder.durable(http.message.dlx.one)//以下是重点当变成死信队列时会转发至 路由为x-dead-letter-exchange及x-dead-letter-routing-key的队列中.withArgument(x-dead-letter-exchange, ApplicationConstant.HTTP_MESSAGE_EXCHANGE).withArgument(x-dead-letter-routing-key, ApplicationConstant.HTTP_MESSAGE_ONE_RK).withArgument(x-message-ttl, 1*60*1000)//1分钟 过期时间单位毫秒当过期后 会变成死信队列之后进行转发.build();}//绑定到交换机上Beanpublic Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin, DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) {Binding binding BindingBuilder.bind(httpDelayOneQueue).to(httpMessageNotifyDirectExchange).with(delay.one);binding.setAdminsThatShouldDeclare(rabbitAdmin);return binding;}//-------------设置延迟队列--结束--------------------//建议将正常的队列和延迟处理的队列分开//设置监听容器Bean(notifyListenerContainer)public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ackfactory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(1);factory.setConcurrentConsumers(concurrency);return factory;}// 设置监听容器Bean(delayNotifyListenerContainer)public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ackfactory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(1);factory.setConcurrentConsumers(delayConcurrency);return factory;}
}ApplicationConstant.java
public class ApplicationConstant {/*** 发送http通知的 exchange 队列*/public static final String HTTP_MESSAGE_EXCHANGE http.message.exchange;/*** 配置消息队列和路由key值*/public static final String HTTP_MESSAGE_START_QUEUE_NAME http.message.start;public static final String HTTP_MESSAGE_START_RK rk.start;public static final String HTTP_MESSAGE_ONE_QUEUE_NAME http.message.one;public static final String HTTP_MESSAGE_ONE_RK rk.one;/*** 通知队列对应的延迟队列关系即过期队列之后发送到下一个的队列信息,可以根据实际情况添加当然也可以根据一定规则自动生成*/public static final MapString,String delayRefMap new HashMapString, String() {/*** */private static final long serialVersionUID -779823216035682493L;{put(HTTP_MESSAGE_START_QUEUE_NAME, delay.one);}};
}简单来说就是创建一个正常消息发送队列用于接收http消息请求的参数同时进行http请求。同时创建一个延时队列设置其 x-dead-letter-exchange 、x-dead-letter-routing-key 和 x-message-ttl 值将其转发到正常的队列中。使用一个map对象维护一个关系当正常消息异常时需要发送的延时队列的队列名称当然时间场景汇总根据需要可以进行动态配置或者根据一定规则进行动态映射。
2.创建监听类用于消息的消费操作此处使用RabbitListener来消费消息当然也可以使用SimpleMessageListenerContainer进行消息配置的创建了一个正常消息监听和延时队列监听由于一般上异常通知是低概率事件可根据不同的监听容器进行差异化配置。
/**
*
* ClassName 类名HttpMessagerLister
* Description 功能说明http通知消费监听接口
* p
* TODO
*/p
************************************************************************
* date 创建日期2019年7月17日
* author 创建人oKong
* version 版本号V1.0
*p
***************************修订记录*************************************
*
* 2019年7月17日 oKong 创建该类功能。
*
***********************************************************************
*/p
*/
Component
Slf4j
public class HttpMessagerLister {AutowiredHttpMessagerService messagerService;RabbitListener(id httpMessageNotifyConsumer, queues {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME}, containerFactory notifyListenerContainer)public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception {doHandler(message, channel);}RabbitListener(id httpDelayMessageNotifyConsumer, queues {ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME,}, containerFactory delayNotifyListenerContainer)public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception {doHandler(message, channel);}private void doHandler(Message message, Channel channel) throws Exception {String body new String(message.getBody(),utf-8);String queue message.getMessageProperties().getConsumerQueue();log.info(接收到通知请求{}队列名{},body, queue);//消息对象转换try {HttpEntity httpNotifyDto JSONUtil.toBean(body, HttpEntity.class);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//发送通知messagerService.notify(queue, httpNotifyDto);} catch(Exception e) {log.error(e.getMessage());//ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
}HttpMessagerService.java 消息真正处理的类此类是关键这里未进行日志记录真实场景中强烈建议进行消息通知的日志存储防止日后信息的查看同时也能通过发送状态在重试次数都失败后进行定时再次发送功能同时也有据可查。
Component
Slf4j
public class HttpMessagerService {AutowiredAmqpTemplate mqTemplate; public void notify(String queue,HttpEntity httpEntity) {//发起请求log.info(开始发起http请求:{}, httpEntity);try {switch(httpEntity.getMethod().toLowerCase()) {case POST:HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams());break;case GET:default:HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams());}} catch (Exception e) {//发生异常放入延迟队列中String nextRk ApplicationConstant.delayRefMap.get(queue);if(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) {//若已经是最后一个延迟队列的消息队列了则后续可直接放入数据库中 待后续定时策略进行再次发送log.warn(http通知已经通知N次失败进入定时进行发起通知,url{}, httpEntity.getUrl());} else {log.warn(http重新发送通知{}, 通知队列rk为{}, 原队列{}, httpEntity.getUrl(), nextRk, queue);mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));}}}
}3.创建控制层服务真实场景中如SpringCloud微服务中一般上是创建个api接口供其他服务进行调用
Slf4j
RestController
Api(tags http测试接口)
public class HttpDemoController {AutowiredAmqpTemplate mqTemplate;PostMapping(/send)ApiOperation(valuesend,notes 发送http测试)public String sendHttp(RequestBody HttpEntity httpEntity) {//发送http请求log.info(开始发起http请求发布异步消息{}, httpEntity);mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, ApplicationConstant.HTTP_MESSAGE_START_RK, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));return 发送成功url httpEntity.getUrl(); }
}4.配置文件添加RabbitMQ相关配置信息
spring.rabbitmq.host127.0.0.1
spring.rabbitmq.port5672
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest
spring.rabbitmq.virtual-host/# 通知-消费者线程数 设置大点 大概率是能通知到的
http.notify.concurrency150
# 延迟队列的消费者线程数 可设置小点
http.notify.delay.concurrency105.编写启动类。
SpringBootApplication
Slf4j
public class DelayQueueApplication {public static void main(String[] args) throws Exception {SpringApplication.run(DelayQueueApplication.class, args);log.info(spring-boot-rabbitmq-delay-queue-chapter38服务启动!);}
}6.启动服务。使用swagger进行简单调用测试。
正常通知 2019-07-20 23:52:23.792 INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController : 开始发起http请求发布异步消息HttpEntity(urlwww.baidu.com, params{a1}, methodget)
2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求{method:get,params:{a:1},url:www.baidu.com}队列名http.message.start
2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(urlwww.baidu.com, params{a1}, methodget)异常通知访问一个不存在的地址
2019-07-20 23:53:14.699 INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController : 开始发起http请求发布异步消息HttpEntity(urlwww.baidu.com1, params{a1}, methodget)
2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求{method:get,params:{a:1},url:www.baidu.com1}队列名http.message.start
2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(urlwww.baidu.com1, params{a1}, methodget)
2019-07-20 23:53:14.706 WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : http重新发送通知www.baidu.com1, 通知队列rk为delay.one, 原队列http.message.start在 RabbitMQ 后台中可以看见 http.message.dlx.one 队列中存在这需要延时处理的消息在一分钟后会转发至 http.message.one 队列中。 在一分钟后可以看见消息本再次消费了。
2019-07-20 23:54:14.722 INFO 65216 --- [TaskExecutor-16] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求{method:get,params:{a:1},url:www.baidu.com1}队列名http.message.one
2019-07-20 23:54:14.723 INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(urlwww.baidu.com1, params{a1}, methodget)
2019-07-20 23:54:14.723 WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : http通知已经通知N次失败进入定时进行发起通知,urlwww.baidu.com1一些最佳实践
在正式场景中一般上补偿或者重试机制大概率是不会发送的倘若发生时一般上是第三方业务系统出现了问题故一般上在进行补充时应该在非高峰期进行操作故应该对延时监听器应该在高峰期时停止消费在非高峰期时进行消费。同时还可以根据不同的通知类型放入不一样的延时队列中保障业务的正常。这里简单说明下动态停止或者启动演示监听器的方式。一般上是使用RabbitListenerEndpointRegistry 对象获取延时监听器之后进行动态停止或者启用。可设置 RabbitListener 的id属性直接进行获取当然也可以直接获取所有的监听器进行自定义判断了。 AutowiredRabbitListenerEndpointRegistry registry;GetMapping(/set)ApiOperation(value set, notes 设置消息监听器的状态)public String setSimpleMessageListenerContainer(String status) {if(1.equals(status)) {registry.getListenerContainer(httpDelayMessageNotifyConsumer).start();} else {registry.getListenerContainer(httpDelayMessageNotifyConsumer).stop();}return status;}这里只是简单进行演示说明在真实场景下可以使用定时器判断当前是否为高峰期进而进行动态设置监听器的状态。
参考资料
https://www.rabbitmq.com/admin-guide.htmlhttps://www.rabbitmq.com/ttl.html