网站流量高iis如何做负载均衡,wordpress黑色名片主题,卓博招聘人才网,wordpress 允许评论1. 实现方式
1. 设置队列过期时间#xff1a;延迟队列消息过期 死信队列#xff0c;所有消息过期时间一致
2. 设置消息的过期时间#xff1a;此种方式下有缺陷#xff0c;MQ只会判断队列第一条消息是否过期#xff0c;会导致消息的阻塞需要额外安装 rabbitmq_delayed_me…1. 实现方式
1. 设置队列过期时间延迟队列消息过期 死信队列所有消息过期时间一致
2. 设置消息的过期时间此种方式下有缺陷MQ只会判断队列第一条消息是否过期会导致消息的阻塞需要额外安装 rabbitmq_delayed_message_exchange 插件才能解决此问题导入Spring 集成RabbitMQ MAEVN
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.2.5.RELEASE/version
/dependency2. 设置队列过期时间延迟队列消息过期 死信队列 推送消息至延迟队列 - 消息过期自动推送到死信队列 - 消费死信队列 2.1. MQ配置信息
2.1.1. 自定义队列配置
…/bootstrap.yml
# rabbitmq自定义配置
rabbitmq:ttlExchange: medical_dev_ttl_topic_changettlKey: dev_ttlttlQueue: medical.dev.ttl.topic.queuedelayExpireTime: 600ttlQueueSize: 10000deadExchange: medical_dev_dead_topic_changedeadKey: dev_deaddeadQueue: medical.dev.dead.topic.queue2.1.2. 读取自定义MQ配置信息
/*** amqp配置文件*/
Data
Component
ConfigurationProperties(prefix rabbitmq)
public class MyConfigProperties {/*** 延迟队列*/public String ttlExchange;public String ttlKey;public String ttlQueue;private Integer delayExpireTime;public Integer ttlQueueSize;/*** 死信队列*/public String deadExchange;public String deadKey;public String deadQueue;}2.2. 配置文件自动生成队列
2.2.1. 延迟队列
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;/*** 延迟队列配置文件* * author mingAn.xie*/
Configuration
public class RabbitMQConfigTTL {ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机Beanpublic TopicExchange ttlTopicExchange(){return new TopicExchange(myConfigProperties.getTtlExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。Beanpublic Queue ttlTopicduanxinQueue(){HashMapString, Object args new HashMap();// 给队列设置消息过期时间毫秒值args.put(x-message-ttl, mqConfigProperties.getDelayExpireTime() * 1000);// 设置队列最大长度args.put(x-max-length, myConfigProperties.getTtlQueueSize());// 设置死信队列交换机名称// 当消息在一个队列中变成死信后,它能就发送到另一个交换机中这个交换机就是DLX绑定DLX的队列被称之为死信队列// 编程死信队列的原因消息被拒绝消息过期队列达到最大长度args.put(x-dead-letter-exchange, myConfigProperties.getDeadExchange());// 设置死信队列路由keyargs.put(x-dead-letter-routing-key, myConfigProperties.getDeadKey());return new Queue(myConfigProperties.getTtlQueue(), true, false, false, args);}// 3: 绑定对用关系Beanpublic Binding ttlTopicsmsBinding(){return BindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());}}2.2.2. 死信队列 import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;/*** 死信队列配置文件* * author mingAn.xie*/
Configuration
public class RabbitMQConfigDead {ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机Beanpublic TopicExchange deadTopicExchange(){return new TopicExchange(myConfigProperties.getDeadExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。Beanpublic Queue deadTopicduanxinQueue(){return new Queue(myConfigProperties.getDeadQueue(), true);}// 3: 绑定对用关系Beanpublic Binding deadTopicsmsBinding(){return BindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());}}2.3. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** RabbitMQ生产者推送消息类* * author xiemingan*/
Component
Slf4j
public class RabbitmqProducer {Resourceprivate RabbitTemplate rabbitTemplate;Resourceprivate MyConfigProperties myConfigProperties;/*** param pushMessage 推送消息体*/public void pushTtlMessage(String pushMessage) {// 推送消息至交换机并指定路由keyrabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);log.info(MQ消息推送队列, exchange: {}, key: {}, message: {}, myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);}}2.4. 消费者处理消息
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** author mingAn.xie*/
Log4j2
Component
public class RabbitmqConsumer {/*** 消费死信队列* param message 消息体*/RabbitListener(queues ${rabbitmq.deadQueue})public void pushMessages(Message message) {String body new String(message.getBody()).trim();if (StringUtils.isEmpty(body)){return;}log.info(MQ消息消费, RabbitmqConsumer.pushMessages() : {}, body);}}3. 设置消息的过期时间 设置交换机类型为 x-delayed-type推送消息至交换机直连队列消费 3.1. 安装插件 rabbitmq_delayed_message_exchange 前言这里默认使用环境为 Liunx 系统 Docker 安装 RabbitMQ 具体可以参考这篇文章Docker 安装 RabbitMQ 挂载配置文件 安装插件版本需要与RabbitMQ版本一致否则可能会导致安装失败可先进入RabbitMQ容器中查看其他插件版本 插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 这里以最新版本 v3.13.0 举例
# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez# 将插件复制进容器中: rabbitmq_xxxxxx
docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins# 进入容器: rabbitmq_xxxxxx
docker exec -it rabbitmq_xxxxxx bash
cd plugins# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange交换机类型中出现 x-delayed-type 表示安装成功 3.2. MQ配置信息
3.2.1. 自定义队列配置
…/bootstrap.yml
#mq队列自定义配置
rabbitmq:saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchangesaveTaskTtlKey: ey240001_pro_save_task_ttlsaveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queuesaveTaskTtlQueueSize: 100003.2.2. 读取自定义MQ配置信息
/*** amqp配置文件** author mingAn.xie*/
Data
Component
ConfigurationProperties(prefix rabbitmq)
public class MyConfigProperties {/*** 任务待办生成延时队列*/public String saveTaskTtlExchange;public String saveTaskTtlKey;public String saveTaskTtlQueue;public Integer saveTaskTtlQueueSize;}3.3. 配置文件生成 x-delayed-type 交换机
import com.awsa.site.mq.MyConfigProperties;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** x-delayed-type 交换机延迟队列配置* * author mingAn.xie*/
Configuration
public class RabbitMQConfigSaveTaskTtl {ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机Beanpublic CustomExchange saveTaskTopicExchange() {MapString, Object args new HashMap();// 设置延迟队列插件类型按过期时间消费args.put(x-delayed-type, direct);// 参数name 交换机名称type 交换机类型durable 是否持久化autoDelete 是否自动删除arguments 参数return new CustomExchange(myConfigProperties.getSaveTaskTtlExchange(), x-delayed-message, true, false, args);}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列会被存储在磁盘上当消息代理重启时仍然存在暂存队列当前连接有效// exclusive:默认也是false只能被当前创建的连接使用而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除当没有生产者或者消费者使用此队列该队列会自动删除。Beanpublic Queue saveTaskTopicduanxinQueue() {return new Queue(myConfigProperties.getSaveTaskTtlQueue(), true, false, false);}// 3: 绑定对用关系Beanpublic Binding saveTaskTopicsmsBinding() {return BindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();}}3.4. 生产者推送消息
import com.awsa.site.mq.MyConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 生产者推送消息类* * author xiemingan*/
Component
Slf4j
public class RabbitmqProducer {Resourceprivate RabbitTemplate rabbitTemplate;Resourceprivate MyConfigProperties myConfigProperties;/*** param pushMessage 推送消息体* param ttlTime 延时时间毫秒值*/public void pushTtlMessage(String pushMessage, long ttlTime) {ttlTime ttlTime 0 ? 1000 : ttlTime;// 3.1.推送MQ延迟消息队列long finalTtlTime ttlTime;MessagePostProcessor messagePostProcessor message - {// 设置延迟时间message.getMessageProperties().setDelay((int) finalTtlTime);return message;};rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);log.info(MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}, myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);}}
3.5. 消费者处理消息
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** author mingAn.xie*/
Log4j2
Component
public class RabbitmqConsumer {/*** 消费延时消息* param message 消息体*/RabbitListener(queues ${rabbitmq.saveTaskTtlQueue})public void pushMessages(Message message) {String body new String(message.getBody()).trim();if (StringUtils.isEmpty(body)) {return;}log.info(MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}, body);}}