pc 网站建设,简阳seo排名优化培训,wordpress微信个人支付,行知网站建设一、RabbitMQ是什么#xff1f;
1.RabbitMQ简介
RabbitMQ是有erlang语言开发#xff0c;基于AMQP#xff08;Advanced Message Queue 高级消息队列协议#xff09;协议实现的消息队列。 常见的消息队列有#xff1a;RabbitMQ、Kafka 和 ActiveMQ
2.RabbitMQ的优点
Rab…一、RabbitMQ是什么
1.RabbitMQ简介
RabbitMQ是有erlang语言开发基于AMQPAdvanced Message Queue 高级消息队列协议协议实现的消息队列。 常见的消息队列有RabbitMQ、Kafka 和 ActiveMQ
2.RabbitMQ的优点
RabbitMQ最初起源于金融系统用于不同模块之间的消息通讯。
优点
可靠性可持久化消息传输和发布确认。 灵活性通过交换机将消息路由到对应的队列。 集群多台mq可组成集群对外提供整体服务 支持多语言支持多种语言 可界面操作提供简易的用户操作界面 等等。
3.常用组件
1.生产者(Producer)消息的制造者 2.消费者(Consumer)消息的消费者 3.消息Message消息对象包括业务参数和mq的参数 4.队列Queue缓存或暂存储消息的容器 5.连接Connection应用服务和mq进行交互的tcp连接 6.信道ChannelAMQP命令都是通过信道来完成的它是一个虚拟的通道来复用ctp连接。 7.交换机Exchange负责从生产者接收消息根据路由规则发送到指定的队列里面。 8.路由键Routing Key交换机根据路由键将消息发往指定的队列。 9.虚拟主机Virtual Host就像是一个RabbitMQ 服务。
4.RabbitMQ的结构图
一张图介绍RabbitMQ组成以及各个组件之间的关系参考网上画的。
5.交换机的类型
交换机是用来发送消息到指定的队列里面的它使用哪种路由算法是由交换机类型和绑定的规则所决定的。
A-直连交换机 直连交换机是根据交换机和队列之间绑定的路由键来将消息发往指定的队列里面。如果交换机与多个队列绑定则在发送携带路由键的消息时只发给此路由键的队列每个队列都是相同副本比较适合一对一。
例如我用直连交换机test-direct-exchange根据路由键test-direct发送一条消息然后去队列里面看消息如图所示 B-扇形交换机 扇形交换机是将消息发往与它绑定的队列而不去理会绑定的路由键是否一致。如果交换机与多个队列绑定每个队列都是相同副本起到广播的作用。
例如我用扇形交换机test-fanout-exchange根据路由键test-fanout发送一条消息然后去队列里面看消息如图所示
但是三个队列都收到了消息可见扇形交换机会忽略其路由键
C-主题交换机 主题交换机是通过消息的路由键跟交换机和队列之间的绑定路由键进行匹配将消息发给匹配上的队列跟直连交换机的一对多相似但是他的路由键可以支持模糊匹配。
例如我用主题交换机test-topic-exchange根据路由键test.topic2发送一条消息然后去队列里面看消息如图所示 根据消息路由键和绑定的路由键进行模糊匹配推送消息。
D-头交换机 头交换机是主题交换机有点相似主题交换机是基于路由键而头交换机是基于消息的headers数据所以在发送消息给头交换机时指定Routing key是不起作用的。头交换机在绑定队列时需要指定参数Arguments发送消息时需要指定headers和Arguments相匹配消息才能被推到相应的队列。
例如我用头交换机test-headers-exchange根据路由键test-headers1发送一条消息然后去队列里面看消息如图所示
如果前两个队列能收到消息证明路由键不生效。
二、定时推送思路实现
rabbitmq实现延时消息主要有两种方式
死信消息队列ttl死信exchange 延时插件 (rabbitmq-delayed-message-exchange)
rabbitmq 实现方式一队列ttl死信exchange
简述使用两个队列一个队列接收消息不消费等待指定时间后消息死亡再由该队列绑定的死信exchange再次将其路由到另一个队列提供业务消费。
ttl 和 死信exchange 相关知识 ttl 先贴两个个rabbitmq官方文档
Time-To-Live and Expirationhttps://www.rabbitmq.com/ttl.html Dead Letter Exchangeshttpshttps://www.rabbitmq.com/dlx.html 我这里也简单介绍下 rabbitmq 可以给 消息 和 队列 设置 ttl生存时间 队列设置x-message-ttl60000 队列中所有消息都只有60s存活时间 指定消息设置expire60000 指定消息只有60s存活时间 如果队列和消息同时设置了ttl则取较小的那个作为ttl。消息死亡后不会被消费者消费。
死信exchange 死信死亡的消息
消费者使用 basic.reject 或 basic.nack 并将requeue参数设置为 false 来否定的消息 ttl到期的消息 队列超过长度限制被丢弃的消息 当一个队列设置了死信exchange 后这个队列的死信都会被投递到死信exchange中然后可以再次路由到其他队列中如果指定了死信routing key 则死信消息routing key 变为设置的routing key未设置则为原始 routing key。
使用介绍 先声明一个消费队列 queue_dlx用来接收死信消息并提供消费 然后声明一个死信exchange_dlx, 绑定 queue_dlx接收消息后路由至queue_dlx 声明一个延迟队列queue_delay, 用来接收业务消息但不提供消费等待消息死亡后转至死信exchange。即延迟 声明一个exchange由业务发送消息到exchange然后转至queue_delay. 一个消息的流程大概是
简单分析 缺点 1.只能支持固定延迟等级的消息 2.使用较复杂得声明一堆队列exchange 3.一个致命的问题就是消息顺序不会按照延迟时间的先后顺序输出而是按照queue本身先进先出的规则。即10秒延迟的消息如果是在20秒延迟消息后扔入的那么也要等20秒延迟的消息输出后才能输出。除非消息的延迟时间是一致的否则无法满足业务要求
优点 1.支持镜像队列复制实现高可用 2.支持大量消息成千上万 3.适用场景 使用固定延迟时间的场景。
备注对于高版本3.6及以上的rabbitmq建议使用lazy-mode作为延迟队列防止大量延时消息堆积而占用大量内存从而触发rabbitmq换页阻塞队列。 如果使用spring的话即使低版本rabbitmq也不用太担心spring-amqp默认发送持久化消息即使触发换页也只是把消息从内存中逐出而已。
rabbitmq 实现方式二rabbitmq延时插件
简述延时消息不直接投递到队列中而是先转储到本地Mnesia数据库中然后定时器在消息到期后再将其投递到队列中。
延时插件使用 关于用法可以直接看这个文档或者网上搜一搜这里就不介绍了。 github地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
其大概原理就是指定了延时的消息会被先保存在 Mnesia erlang编写的数据库管理系统中然后有一个定时器去查询最近需要被投递的消息将其投递到目标队列中。
简单分析 优点 基本支持任意延迟时间不能超过1个月
缺点 延时不可靠存在消息数量较大或使用很久后延迟不准确会推迟 无备份机制延时消息存在单个节点磁盘中,不支持ram类型的节点 数据得存磁盘里面增加大量内存的占用 经测试发现发送大量延时消息后rabbitmq内存占用明显增高比普通消息还要高很多那种。
安装插件需要重启 适用场景如果不是无关紧要的小业务不建议使用。
3.基于队列ttl死信exchange代码实现
实现消息延迟发送的具体思路是首先创建一个交换机来当做死信交换机再创建一个队列与这个死信交换机进行绑定就称作死信队列。其次创建一个交换机来当做正常交换机在创建一个队列与这个正常交换机进行绑定同时将死信交换机和死信路由键配置到这个正常队列里面。这样当一条带有存活时间的消息通过正常交换机发送过来时首先进入正常队列里面然后到了存活时间就会通过死信交换机根据路由键发送到死信队列里面然后消费者消费死信队列里的消息就达到了延迟消费的目的。
java代码实现。
1.首先创建maven项目导入pom文件
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build
2.配置配置文件如果是yml就用对应的书写规则
spring.rabbitmq.hostlocalhost
spring.rabbitmq.usernameguest
spring.rabbitmq.passwordguest
spring.rabbitmq.port5672
spring.rabbitmq.virtual-host/3.配置mq的相关组件
package com.wps.cn.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** author wangps* date 2022年11月22日 14:44*/Configuration
public class QueueConfig {public static final String NORMAL_QUEUE_NAME normal_queue_name;public static final String NORMAL_EXCHANGE_NAME normal_exchange_name;public static final String NORMAL_ROUTING_KEY normal_routing_key;public static final String DLX_QUEUE_NAME dlx_queue_name;public static final String DLX_EXCHANGE_NAME dlx_exchange_name;public static final String DLX_ROUTING_KEY dlx_routing_key;/*** 死信队列* return*/BeanQueue dlxQueue() {return new Queue(DLX_QUEUE_NAME, true);}/*** 死信交换机* return*/BeanDirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE_NAME);}/*** 绑定死信队列和死信交换机* return*/BeanBinding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/*** 普通消息队列* return*/BeanQueue normalQueue() {MapString, Object args new HashMap();//设置消息过期时间此方法是在队列的颗粒度设置比较局限所以在消息上设置过期时间
// args.put(x-message-ttl, 1000*5);//设置死信交换机args.put(x-dead-letter-exchange, DLX_EXCHANGE_NAME);//设置死信 routing_keyargs.put(x-dead-letter-routing-key, DLX_ROUTING_KEY);return new Queue(NORMAL_QUEUE_NAME, true, false, false, args);}/*** 普通交换机* return*/BeanDirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE_NAME);}/*** 绑定普通队列和与之对应的交换机* return*/BeanBinding nomalBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(NORMAL_ROUTING_KEY);}
}
4.创建消费者
package com.wps.cn.consumer;import com.rabbitmq.client.Channel;
import com.wps.cn.config.QueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.util.Map;/*** author wangps* date 2022年11月22日 14:55*/
Component
public class DlxConsumer {private static final Logger logger LoggerFactory.getLogger(DlxConsumer.class);RabbitListener(queues QueueConfig.DLX_QUEUE_NAME)public void process(String order, Message message, Headers MapString, Object headers, Channel channel) {logger.info(订单号消息, order);System.out.println(执行结束....message);}
}
5.创建controller当做生产者
package com.wps.cn.controller;import com.wps.cn.config.QueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;/*** author wangps* date 2022年11月22日 15:50*/
RestController
RequestMapping(/producer)
public class TestProducer {private static final Logger logger LoggerFactory.getLogger(TestProducer.class);Autowiredprivate AmqpTemplate rabbitTemplate;GetMapping(/sendMessage)public Object submit(){String orderId UUID.randomUUID().toString();logger.info(提交订单消息,orderId);rabbitTemplate.convertAndSend(QueueConfig.NORMAL_EXCHANGE_NAME,QueueConfig.NORMAL_ROUTING_KEY,orderId,message - {message.getMessageProperties().setExpiration(1000*5);return message;});return {orderId:orderId};}}
6.创建启动类
package com.wps.cn;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class DxlRabbitmqTestApplication {public static void main(String[] args) {SpringApplication.run(DxlRabbitmqTestApplication.class, args);}}
7.运行启动类然后在页面访问模拟推送消息
http://localhost:8080/producer/sendMessage 观察日志可以看出消息发出后在5s后消费者收到消息从而达到延迟消费的情况。 在这里插入图片描述 4.基于rabbitmq延时插件代码实现
环境准备
1.安装有 RabbitMQ 的服务器。
2.下载延时消息插件rabbitmq_delayed_message_exchange 下载地址https://www.rabbitmq.com/community-plugins.html
RabbitMQ延时队列插件下载页面
点击下载之后下载 rabbitmq_delayed_message_exchange-3.8.0.ez 这个文件。
3.将下载的文件复制到 RabbitMQ 的插件目录下一般是/opt/rabbitmq/plugins。
4.启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange代码实现
1、导入 MAVEN 依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2、定义交换机、队列、路由KEY
/*** RabbitMQ常量** author ZhengNC* date 2020/9/21 11:40*/
public interface RabbitConstant {/*** 交换机*/interface Exchanges{/*** 延时交换机通过延时插件实现 rabbitmq_delayed_message_exchange*/String delayedExchange spring.boot.delayed.exchange;}/*** 队列*/interface Queues{/*** 延时队列通过延时插件实现*/String delayedQueue spring.boot.delayed.queue;}/*** 路由key*/interface RouterKey{/*** 延时路由key通过延时插件实现*/String delayedRouteKey delayed.route.key;}
}3、配置 RABBITMQ 绑定关系
package com.qixi.mq.delay.config;import com.qixi.mq.delay.common.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ配置** author ZhengNC* date 2020/9/14 10:40*/
Configuration
public class RabbitConfig {/*** 延时队列通过延时插件实现** return*/Bean(delayedQueue)public Queue delayedQueue(){return new Queue(RabbitConstant.Queues.delayedQueue);}/*** 延时交换机Direct交换机 起名通过延时插件实现*定义了一个x-delayed-message类型的交换机由于Spring AMQP中没有这个类型的交换机* 所以我们使用一个CustomExchange来定义这个插件构建的交换机* 它和其它交换机相同实现了AbstructExchange。* 唯一的区别是没有指定type类型。type类型可以自定义* 这样我们就可以通过构造方法自定义交换机的类型。* 在使用到延迟交换机插件的时候我们使用插件新添加了一个x-delayed-message类型的交换机。* return*/Bean(delayedExchange)public CustomExchange delayedExchange(){MapString, Object map new HashMap();map.put(x-delayed-type, direct);return new CustomExchange(RabbitConstant.Exchanges.delayedExchange,x-delayed-message, true, false, map);}/*** 绑定延时队列和延时交换机延时插件实现方式** param delayedQueue* param delayedExchange* return*/Beanpublic Binding delayedQueue_delayedExchange(Qualifier(delayedQueue) Queue delayedQueue,Qualifier(delayedExchange)CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitConstant.RouterKey.delayedRouteKey).noargs();}
}4、延时消息生产者
package com.qixi.mq.delay.producer;import com.qixi.mq.delay.common.constant.RabbitConstant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** 延时消息生产者** author ZhengNC* date 2020/9/21 14:15*/
Service
public class TTLProducer {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送一条延时消息延时插件的实现方式** param message*/public void sendDelayedMessage(String message){rabbitTemplate.convertAndSend(RabbitConstant.Exchanges.delayedExchange,RabbitConstant.RouterKey.delayedRouteKey,message,msg - {//设置此消息延时十秒msg.getMessageProperties().setHeader(x-delay, 10000);return msg;});}
}5、延时消息的消费者
package com.qixi.mq.delay.consumer;import com.qixi.mq.delay.common.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.time.format.DateTimeFormatter;/*** 延时消息的消费者** author ZhengNC* date 2020/9/21 14:08*/
Component
public class TTLConsumer {/*** 消费延时消息延时插件实现** param message*/RabbitListener(queues RabbitConstant.Queues.delayedQueue)public void delayedConsumer(String message){System.out.println(消费了一条消息消费时间 DateTimeFormatter.ofPattern(HH:mm:ss).format(LocalTime.now()));System.out.println(message);}
}6、编写接口测试发送消息
package com.qixi.mq.delay.controller;import com.qixi.mq.delay.common.dto.ResponseEntity;
import com.qixi.mq.delay.producer.TTLProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalTime;
import java.time.format.DateTimeFormatter;/**** author ZhengNC* date 2020/9/21 14:27*/
RestController
RequestMapping(ttl)
public class TTLProducerController {Autowiredprivate TTLProducer producer;/*** 发送延时消息延时插件实现方式** return*/GetMapping(sendDelayedMsg)public ResponseEntityString sendDelayedMsg(){DateTimeFormatter timeFormatter DateTimeFormatter.ofPattern(HH:mm:ss);StringBuilder message new StringBuilder(这是一条延时消息消息的发送时间为);message.append(timeFormatter.format(LocalTime.now()));producer.sendDelayedMessage(message.toString());return ResponseEntity.success();}
}7、测试结果
消费了一条消息消费时间10:00:11 这是一条延时消息消息的发送时间为10:00:01