京东网站开发多少钱,注册网站的流程,北京网站推广怎么做,wdcp网站搬家RabbitMQ入门与进阶 基础篇1. 为什么需要消息队列?2. 什么是消息队列?3. RabbitMQ体系结构介绍4. RabbitMQ安装5. HelloWorld6. RabbitMQ经典用法(工作模式)7. Work Queues8. Publish/Subscribe9. Routing10. Topics 进阶篇1. RabbitMQ整合SpringBoot2. 消息可靠性投递故障情… RabbitMQ入门与进阶 基础篇1. 为什么需要消息队列?2. 什么是消息队列?3. RabbitMQ体系结构介绍4. RabbitMQ安装5. HelloWorld6. RabbitMQ经典用法(工作模式)7. Work Queues8. Publish/Subscribe9. Routing10. Topics 进阶篇1. RabbitMQ整合SpringBoot2. 消息可靠性投递故障情况1:消息没有发送到消息队列解决思路A:在生产者端进行确认具体操作中我们会分别针对交换机和队列来确认如果没有成功发送到消息队列服务器上那就可以尝试重新发送解决思路B:为目标交换机指定备份交换机当目标交换机投递失败时把消息投递至备份交换机 故障情况2:消息队列服务器宕机导致内存中消息丢失解决思路:消息持久化到硬盘上哪怕服务器重启也不会导致消息丢失 故障情况3:消费端宕机或抛异常导致消息没有成功被消费消费端消费消息成功给服务器返回ACK信息然后消息队列删除该消息消费端消费消息失败给服务器端返回ACK信息同时把消息恢复为待消费的状态这样就可以再次取回消息重试一次(当然这就需要消费端接口支持幂等性) 3. 消费端限流4. 消息超时5. 死信和死信队列6. 延迟队列7. 事务消息8. 惰性队列9. 优先级队列 集群篇1. 工作机制2. 集群搭建3. 负载均衡4. 仲裁队列5. 流式队列6. 异地容灾 基础篇
1. 为什么需要消息队列? 2. 什么是消息队列? 3. RabbitMQ体系结构介绍 4. RabbitMQ安装
操作001RabbitMQ安装
一、安装 拉取镜像
docker pull rabbitmq:3.13-management-d 参数后台运行 Docker 容器--name 参数设置容器名称-p 参数映射端口号格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问15672供后台管理界面访问-v 参数卷映射目录-e 参数设置容器内的环境变量这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USERguest \
-e RABBITMQ_DEFAULT_PASS123456 \
rabbitmq:3.13-management二、验证
访问后台管理界面http://192.168.200.100:15672 使用上面创建Docker容器时指定的默认用户名、密码登录 三、可能的问题
1、问题现象
在使用Docker拉取RabbitMQ镜像的时候如果遇到提示missing signature key那就说明Docker版本太低了需要升级
比如我目前的Docker版本如下图所示 2、解决办法 基于CentOS7 ①卸载当前Docker更好的办法是安装Docker前曾经给服务器拍摄了快照此时恢复快照
如果不曾拍摄快照那只能执行卸载操作了
yum erase -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-selinux \docker-engine-selinux \docker-engine \docker-ce②升级yum库yum update -y③安装Docker最新版yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin如果这一步看到提示没有可用软件包 docker-ce那就添加Docker的yum源
yum install -y yum-utils
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo④设置Docker服务systemctl start docker
systemctl enable docker3、验证
上述操作执行完成后再次查看Docker版本 5. HelloWorld
操作002HelloWorld
一、目标
生产者发送消息消费者接收消息用最简单的方式实现
官网说明参见下面超链接
RabbitMQ tutorial - “Hello World!” — RabbitMQ 二、具体操作
1、创建Java工程
①消息发送端生产者 ②消息接收端消费者 ③添加依赖
dependenciesdependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/version/dependency
/dependencies2、发送消息
①Java代码
不用客气整个代码全部复制——当然连接信息改成你自己的
package com.atguigu.rabbitmq.simple; import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory connectionFactory new ConnectionFactory(); // 设置主机地址 connectionFactory.setHost(192.168.200.100); // 设置连接端口号默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称默认为 /connectionFactory.setVirtualHost(/);// 设置连接用户名默认为guest connectionFactory.setUsername(guest);// 设置连接密码默认为guest connectionFactory.setPassword(123456);// 创建连接 Connection connection connectionFactory.newConnection(); // 创建频道 Channel channel connection.createChannel(); // 声明创建队列 // queue 参数1队列名称 // durable 参数2是否定义持久化队列当 MQ 重启之后还在 // exclusive 参数3是否独占本次连接。若独占只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列 // autoDelete 参数4是否在不使用的时候自动删除队列也就是在没有Consumer时自动删除 // arguments 参数5队列其它参数 channel.queueDeclare(simple_queue, true, false, false, null); // 要发送的信息 String message 你好小兔子; // 参数1交换机名称,如果没有指定则使用默认Default Exchange // 参数2路由key,简单模式可以传递队列名称 // 参数3配置信息 // 参数4消息内容 channel.basicPublish(, simple_queue, null, message.getBytes()); System.out.println(已发送消息 message); // 关闭资源 channel.close(); connection.close(); } }②查看效果 3、接收消息
①Java代码
不用客气整个代码全部复制——当然连接信息改成你自己的
package com.atguigu.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception { // 1.创建连接工厂 ConnectionFactory factory new ConnectionFactory(); // 2. 设置参数 factory.setHost(192.168.200.100); factory.setPort(5672); factory.setVirtualHost(/); factory.setUsername(guest);factory.setPassword(123456); // 3. 创建连接 Connection Connection connection factory.newConnection(); // 4. 创建Channel Channel channel connection.createChannel(); // 5. 创建队列 // 如果没有一个名字叫simple_queue的队列则会创建该队列如果有则不会创建 // 参数1. queue队列名称 // 参数2. durable是否持久化。如果持久化则当MQ重启之后还在 // 参数3. exclusive是否独占。 // 参数4. autoDelete是否自动删除。当没有Consumer时自动删除掉 // 参数5. arguments其它参数。 channel.queueDeclare(simple_queue,true,false,false,null); // 接收消息 DefaultConsumer consumer new DefaultConsumer(channel){ // 回调方法,当收到消息后会自动执行该方法 // 参数1. consumerTag标识 // 参数2. envelope获取一些信息交换机路由key... // 参数3. properties配置信息 // 参数4. body数据 Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(consumerTagconsumerTag); System.out.println(Exchangeenvelope.getExchange()); System.out.println(RoutingKeyenvelope.getRoutingKey()); System.out.println(propertiesproperties); System.out.println(bodynew String(body)); } }; // 参数1. queue队列名称 // 参数2. autoAck是否自动确认类似咱们发短信发送成功会收到一个确认消息 // 参数3. callback回调对象 // 消费者类似一个监听程序主要是用来监听消息 channel.basicConsume(simple_queue,true,consumer); } }②控制台打印 consumerTagamq.ctag-8EB87GaZFP52LKSMcj98UA Exchange RoutingKeysimple_queue propertiescontentHeader(content-typenull, content-encodingnull, headersnull, delivery-modenull, prioritynull, correlation-idnull, reply-tonull, expirationnull, message-idnull, timestampnull, typenull, user-idnull, app-idnull, cluster-idnull) body你好小兔子 ③查看后台管理界面
因为消息被消费掉了所以RabbitMQ服务器上没有了 6. RabbitMQ经典用法(工作模式) 7. Work Queues 操作003工作队列模式
一、生产者代码 1、封装工具类
package com.atguigu.rabbitmq.util; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static final String HOST_ADDRESS 192.168.200.100; public static Connection getConnection() throws Exception { // 定义连接工厂 ConnectionFactory factory new ConnectionFactory(); // 设置服务地址 factory.setHost(HOST_ADDRESS); // 端口 factory.setPort(5672); //设置账号信息用户名、密码、vhost factory.setVirtualHost(/); factory.setUsername(guest); factory.setPassword(123456); // 通过工程获取连接 Connection connection factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection con ConnectionUtil.getConnection(); // amqp://guest192.168.200.100:5672/ System.out.println(con); con.close(); } }2、编写代码
package com.atguigu.rabbitmq.work; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection; public class Producer { public static final String QUEUE_NAME work_queue; public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i 1; i 10; i) { String body ihello rabbitmq~~~; channel.basicPublish(,QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }3、发送消息效果 二、消费者代码
1、编写代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同代码完全一样。
package com.atguigu.rabbitmq.work; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { static final String QUEUE_NAME work_queue; public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(Consumer1 bodynew String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }注意运行的时候先启动两个消费端程序然后再启动生产者端程序。 如果已经运行过生产者程序则手动把work_queue队列删掉。
2、运行效果
最终两个消费端程序竞争结果如下 8. Publish/Subscribe 操作004发布订阅模式
一、生产者代码
package com.atguigu.rabbitmq.fanout; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection ConnectionUtil.getConnection(); // 2、创建频道 Channel channel connection.createChannel(); // 参数1. exchange交换机名称 // 参数2. type交换机类型 // DIRECT(direct)定向 // FANOUT(fanout)扇形广播发送消息到每一个与之绑定队列。 // TOPIC(topic)通配符的方式 // HEADERS(headers)参数匹配 // 参数3. durable是否持久化 // 参数4. autoDelete自动删除 // 参数5. internal内部使用。一般false // 参数6. arguments其它参数 String exchangeName test_fanout; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name test_fanout_queue1; String queue2Name test_fanout_queue2; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数1. queue队列名称 // 参数2. exchange交换机名称 // 参数3. routingKey路由键绑定规则 // 如果交换机的类型为fanoutroutingKey设置为 channel.queueBind(queue1Name,exchangeName,); channel.queueBind(queue2Name,exchangeName,); String body 日志信息张三调用了findAll方法...日志级别info...; // 6、发送消息 channel.basicPublish(exchangeName,,null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); } }二、消费者代码
1、消费者1号
package com.atguigu.rabbitmq.fanout; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String queue1Name test_fanout_queue1; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(bodynew String(body)); System.out.println(队列 1 消费者 1 将日志信息打印到控制台.....); } }; channel.basicConsume(queue1Name,true,consumer); } }2、消费者2号
package com.atguigu.rabbitmq.fanout; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String queue2Name test_fanout_queue2; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(bodynew String(body)); System.out.println(队列 2 消费者 2 将日志信息打印到控制台.....); } }; channel.basicConsume(queue2Name,true,consumer); } }三、运行效果
还是先启动消费者然后再运行生产者程序发送消息 四、小结
交换机和队列的绑定关系如下图所示 交换机需要与队列进行绑定绑定之后一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别
工作队列模式本质上是绑定默认交换机发布订阅模式绑定指定交换机监听同一个队列的消费端程序彼此之间是竞争关系绑定同一个交换机的多个队列在发布订阅模式下消息是广播的每个队列都能接收到消息
9. Routing 操作006-路由模式
一、生产者代码
package com.atguigu.rabbitmq.routing; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String exchangeName test_direct; // 创建交换机 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null); // 创建队列 String queue1Name test_direct_queue1; String queue2Name test_direct_queue2; // 声明创建队列 channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name,exchangeName,error); // 队列2绑定info error warning channel.queueBind(queue2Name,exchangeName,info); channel.queueBind(queue2Name,exchangeName,error); channel.queueBind(queue2Name,exchangeName,warning); String message 日志信息张三调用了delete方法.错误了,日志级别warning; // 发送消息 channel.basicPublish(exchangeName,warning,null,message.getBytes()); System.out.println(message); // 释放资源 channel.close(); connection.close(); } }二、消费者代码
1、消费者1号
package com.atguigu.rabbitmq.routing; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String queue1Name test_direct_queue1; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(bodynew String(body)); System.out.println(Consumer1 将日志信息打印到控制台.....); } }; channel.basicConsume(queue1Name,true,consumer); } }2、消费者2号
package com.atguigu.rabbitmq.routing; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String queue2Name test_direct_queue2; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(bodynew String(body)); System.out.println(Consumer2 将日志信息存储到数据库.....); } }; channel.basicConsume(queue2Name,true,consumer); } }三、运行结果
1、绑定关系 2、消费消息 10. Topics 操作006主题模式
一、生产者代码
package com.atguigu.rabbitmq.topic; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String exchangeName test_topic; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name test_topic_queue1; String queue2Name test_topic_queue2; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 绑定队列和交换机 // 参数1. queue队列名称 // 参数2. exchange交换机名称 // 参数3. routingKey路由键,绑定规则 // 如果交换机的类型为fanout ,routingKey设置为 // routing key 常用格式系统的名称.日志的级别。 // 需求 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,.error); channel.queueBind(queue1Name,exchangeName,order.*); channel.queueBind(queue2Name,exchangeName,*.*); // 分别发送消息到队列order.info、goods.info、goods.error String body [所在系统order][日志级别info][日志内容订单生成保存成功]; channel.basicPublish(exchangeName,order.info,null,body.getBytes()); body [所在系统goods][日志级别info][日志内容商品发布成功]; channel.basicPublish(exchangeName,goods.info,null,body.getBytes()); body [所在系统goods][日志级别error][日志内容商品发布失败]; channel.basicPublish(exchangeName,goods.error,null,body.getBytes()); channel.close(); connection.close(); } }二、消费者代码
1、消费者1号
消费者1监听队列1
package com.atguigu.rabbitmq.topic; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String QUEUE_NAME test_topic_queue1; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(bodynew String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }2、消费者2号
消费者2监听队列2
package com.atguigu.rabbitmq.topic; import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException; public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection ConnectionUtil.getConnection(); Channel channel connection.createChannel(); String QUEUE_NAME test_topic_queue2; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer new DefaultConsumer(channel){ Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(bodynew String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }三、运行效果
队列1 队列2 进阶篇
1. RabbitMQ整合SpringBoot 操作007整合SpringBoot
1、消费者工程
①创建module ②配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies③YAML
增加日志打印的配置
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
logging:level:com.atguigu.mq.listener.MyMessageListener: info④主启动类
仿照生产者工程的主启动类改一下类名即可
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQConsumerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQConsumerMainType.class, args);}}⑤监听器
package com.atguigu.mq.listener;import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT exchange.direct.order; public static final String ROUTING_KEY order; public static final String QUEUE_NAME queue.order; RabbitListener(bindings QueueBinding(value Queue(value QUEUE_NAME, durable true),exchange Exchange(value EXCHANGE_DIRECT),key {ROUTING_KEY}))public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);}}2、RabbitListener注解属性对比
①bindings属性
表面作用 指定交换机和队列之间的绑定关系指定当前方法要监听的队列 隐藏效果如果RabbitMQ服务器上没有这里指定的交换机和队列那么框架底层的代码会创建它们
②queues属性
RabbitListener(queues {QUEUE_ATGUIGU})作用指定当前方法要监听的队列注意此时框架不会创建相关交换机和队列必须提前创建好
3、生产者工程
①创建module ②配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency
/dependencies③YAML
spring: rabbitmq: host: 192.168.200.100port: 5672 username: guest password: 123456 virtual-host: /④主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args); }}⑤测试程序
package com.atguigu.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RabbitMQTest { public static final String EXCHANGE_DIRECT exchange.direct.order; public static final String ROUTING_KEY order;Autowired private RabbitTemplate rabbitTemplate;Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, Hello atguigu); } }2. 消息可靠性投递 故障情况1:消息没有发送到消息队列
解决思路A:在生产者端进行确认具体操作中我们会分别针对交换机和队列来确认如果没有成功发送到消息队列服务器上那就可以尝试重新发送
操作008-01-A生产者端消息确认机制
一、创建module 二、搭建环境
1、配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies2、主启动类
没有特殊设定
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args); }}3、YAML
注意publisher-confirm-type和publisher-returns是两个必须要增加的配置如果没有则本节功能不生效
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED 交换机的确认publisher-returns: true 队列的确认
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info三、创建配置类
1、目标
在这里我们为什么要创建这个配置类呢首先我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息
方法名方法功能所属接口接口所属类confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplatereturnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate
然后就是对RabbitTemplate的功能进行增强因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。
原本RabbitTemplate对象并没有生产者端消息确认的功能要给它设置对应的组件才可以。
而设置对应的组件需要调用RabbitTemplate对象下面两个方法
设置组件调用的方法所需对象类型setConfirmCallback()ConfirmCallback接口类型setReturnCallback()ReturnCallback接口类型
2、API说明
①ConfirmCallback接口
这是RabbitTemplate内部的一个接口源代码如下 /*** A callback for publisher confirmations.**/FunctionalInterfacepublic interface ConfirmCallback {/*** Confirmation callback.* param correlationData correlation data for the callback.* param ack true for ack, false for nack* param cause An optional cause, for nack, when available, otherwise null.*/void confirm(Nullable CorrelationData correlationData, boolean ack, Nullable String cause);}生产者端发送消息之后回调confirm()方法
ack参数值为true表示消息成功发送到了交换机ack参数值为false表示消息没有发送到交换机
②ReturnCallback接口
同样也RabbitTemplate内部的一个接口源代码如下 /*** A callback for returned messages.** since 2.3*/FunctionalInterfacepublic interface ReturnsCallback {/*** Returned message callback.* param returned the returned message and metadata.*/void returnedMessage(ReturnedMessage returned);}注意接口中的returnedMessage()方法仅在消息没有发送到队列时调用
ReturnedMessage类中主要属性含义如下
属性名类型含义messageorg.springframework.amqp.core.Message消息以及消息相关数据replyCodeint应答码类似于HTTP响应状态码replyTextString应答码说明exchangeString交换机名称routingKeyString路由键名称
3、配置类代码
①要点1
加Component注解加入IOC容器
②要点2
配置类自身实现ConfirmCallback、ReturnCallback这两个接口然后通过this指针把配置类的对象设置到RabbitTemplate对象中。
操作封装到了一个专门的void init()方法中。
为了保证这个void init()方法在应用启动时被调用我们使用PostConstruct注解来修饰这个方法。
关于PostConstruct注解大家可以参照以下说明 PostConstruct注解是Java中的一个标准注解它用于指定在对象创建之后立即执行的方法。当使用依赖注入如Spring框架或者其他方式创建对象时PostConstruct注解可以确保在对象完全初始化之后执行相应的方法。 使用PostConstruct注解的方法必须满足以下条件 方法不能有任何参数。方法必须是非静态的。方法不能返回任何值。 当容器实例化一个带有PostConstruct注解的Bean时它会在调用构造函数之后并在依赖注入完成之前调用被PostConstruct注解标记的方法。这样我们可以在该方法中进行一些初始化操作比如读取配置文件、建立数据库连接等。 ③代码
有了以上说明下面我们就可以展示配置类的整体代码
package com.atguigu.mq.config;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;Component
Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(消息发送到交换机成功数据 correlationData);} else {log.info(消息发送到交换机失败数据 correlationData 原因 cause);}}Overridepublic void returnedMessage(ReturnedMessage returned) {log.info(消息主体: new String(returned.getMessage().getBody()));log.info(应答码: returned.getReplyCode());log.info(描述 returned.getReplyText());log.info(消息使用的交换器 exchange : returned.getExchange());log.info(消息使用的路由键 routing : returned.getRoutingKey());}
}四、发送消息
package com.atguigu.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RabbitMQTest { public static final String EXCHANGE_DIRECT exchange.direct.order;public static final String ROUTING_KEY order;Autowired private RabbitTemplate rabbitTemplate;Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, Hello atguigu); } }通过调整代码测试如下三种情况
交换机正确、路由键正确交换机正确、路由键不正确无法发送到队列交换机不正确无法发送到交换机
解决思路B:为目标交换机指定备份交换机当目标交换机投递失败时把消息投递至备份交换机 操作008-01-B备份交换机
一、创建备份交换机
1、创建备份交换机
注意备份交换机一定要选择fanout类型因为原交换机转入备份交换机时并不会指定路由键 2、创建备份交换机要绑定的队列
①创建队列 ②绑定交换机
注意这里是要和备份交换机绑定 3、针对备份队列创建消费端监听器 public static final String EXCHANGE_DIRECT_BACKUP exchange.direct.order.backup;public static final String QUEUE_NAME_BACKUP queue.order.backup;RabbitListener(bindings QueueBinding(value Queue(value QUEUE_NAME_BACKUP, durable true),exchange Exchange(value EXCHANGE_DIRECT_BACKUP),key {}))public void processMessageBackup(String dateString,Message message,Channel channel) {log.info(BackUp: dateString);}二、设定备份关系
1、原交换机删除
· 2、重新创建原交换机 3、原交换机重新绑定原队列 三、测试
启动消费者端发送消息但是路由键不对于是转入备份交换机
故障情况2:消息队列服务器宕机导致内存中消息丢失
解决思路:消息持久化到硬盘上哪怕服务器重启也不会导致消息丢失
操作008-02交换机和队列持久化
一、测试非持久化交换机和队列
1、创建非持久化交换机 创建之后可以在列表中看到 2、创建非持久化队列 创建之后可以在列表中看到 3、绑定 4、发送消息 public static final String EXCHANGE_TRANSIENT exchange.transient.user;public static final String ROUTING_KEY_TRANSIENT user;Testpublic void testSendMessageTransient() {rabbitTemplate.convertAndSend(EXCHANGE_TRANSIENT,ROUTING_KEY_TRANSIENT,Hello atguigu user~~~);}5、查看已发送消息 结论临时性的交换机和队列也能够接收消息但如果RabbitMQ服务器重启之后会怎么样呢
6、重启RabbitMQ服务器
docker restart rabbitmq重启之后刚才临时性的交换机和队列都没了。在交换机和队列这二者中队列是消息存储的容器队列没了消息就也跟着没了。
二、持久化的交换机和队列
我们其实不必专门创建持久化的交换机和队列因为它们默认就是持久化的。接下来我们只需要确认一下存放到队列中尚未被消费端取走的消息是否会随着RabbitMQ服务器重启而丢失
1、发送消息
运行以前的发送消息方法即可不过要关掉消费端程序
2、在管理界面查看消息 3、重启RabbitMQ服务器
docker restart rabbitmq4、再次查看消息
仍然还在 三、结论
在后台管理界面创建交换机和队列时默认就是持久化的模式。
此时消息也是持久化的不需要额外设置。
故障情况3:消费端宕机或抛异常导致消息没有成功被消费
消费端消费消息成功给服务器返回ACK信息然后消息队列删除该消息
消费端消费消息失败给服务器端返回ACK信息同时把消息恢复为待消费的状态这样就可以再次取回消息重试一次(当然这就需要消费端接口支持幂等性)
操作008-03消费端消息确认
一、ACK
ACK是acknowledge的缩写表示已确认
二、默认情况
默认情况下消费端取回消息后默认会自动返回ACK确认消息所以在前面的测试中消息被消费端消费之后RabbitMQ得到ACK确认信息就会删除消息
但实际开发中消费端根据消息队列投递的消息执行对应的业务未必都能执行成功如果希望能够多次重试那么默认设定就不满足要求了
所以还是要修改成手动确认
三、创建消费端module
1、配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies2、YAML
增加针对监听器的设置
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual 把消息确认模式改为手动确认3、主启动类
没有特殊设定
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQConsumerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQConsumerMainType.class, args);}}四、消费端监听器
1、创建监听器类
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;Component
public class MyMessageListener {public static final String EXCHANGE_DIRECT exchange.direct.order;public static final String ROUTING_KEY order;public static final String QUEUE_NAME queue.order;public void processMessage(String dataString, Message message, Channel channel) {}}2、在接收消息的方法上应用注解
// 修饰监听方法
RabbitListener(// 设置绑定关系bindings QueueBinding(// 配置队列信息durable 设置为 true 表示队列持久化autoDelete 设置为 false 表示关闭自动删除value Queue(value QUEUE_NAME, durable true, autoDelete false),// 配置交换机信息durable 设置为 true 表示队列持久化autoDelete 设置为 false 表示关闭自动删除exchange Exchange(value EXCHANGE_DIRECT, durable true, autoDelete false),// 配置路由键信息key {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {}3、接收消息方法内部逻辑
业务处理成功手动返回ACK信息表示消息成功消费业务处理失败手动返回NACK信息表示消息消费失败。此时有两种后续操作供选择 把消息重新放回消息队列RabbitMQ会重新投递这条消息那么消费端将重新消费这条消息——从而让业务代码再执行一遍不把消息放回消息队列返回reject信息表示拒绝那么这条消息的处理就到此为止
4、相关API
先回到PPT理解“deliveryTag交付标签机制”
下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口
①basicAck()方法
方法功能给Broker返回ACK确认信息表示消息已经在消费端成功消费这样Broker就可以把消息删除了参数列表
参数名称含义long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识boolean multiple取值为true为小于、等于deliveryTag的消息批量返回ACK信息取值为false仅为指定的deliveryTag返回ACK信息
②basicNack()方法
方法功能给Broker返回NACK信息表示消息在消费端消费失败此时Broker的后续操作取决于参数requeue的值参数列表
参数名称含义long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识boolean multiple取值为true为小于、等于deliveryTag的消息批量返回ACK信息取值为false仅为指定的deliveryTag返回ACK信息boolean requeue取值为trueBroker将消息重新放回队列接下来会重新投递给消费端取值为falseBroker将消息标记为已消费不会放回队列
③basicReject()方法
方法功能根据指定的deliveryTag对该消息表示拒绝参数列表
参数名称含义long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识boolean requeue取值为trueBroker将消息重新放回队列接下来会重新投递给消费端取值为falseBroker将消息标记为已消费不会放回队列
basicNack()和basicReject()有啥区别 basicNack()有批量操作basicReject()没有批量操作
5、完整代码示例
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;Component
Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT exchange.direct.order;public static final String ROUTING_KEY order;public static final String QUEUE_NAME queue.order;// 修饰监听方法RabbitListener(// 设置绑定关系bindings QueueBinding(// 配置队列信息durable 设置为 true 表示队列持久化autoDelete 设置为 false 表示关闭自动删除value Queue(value QUEUE_NAME, durable true, autoDelete false),// 配置交换机信息durable 设置为 true 表示队列持久化autoDelete 设置为 false 表示关闭自动删除exchange Exchange(value EXCHANGE_DIRECT, durable true, autoDelete false),// 配置路由键信息key {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {// 1、获取当前消息的 deliveryTag 值备用long deliveryTag message.getMessageProperties().getDeliveryTag();try {// 2、正常业务操作log.info(消费端接收到消息内容 dataString);// System.out.println(10 / 0);// 3、给 RabbitMQ 服务器返回 ACK 确认信息channel.basicAck(deliveryTag, false);} catch (Exception e) {// 4、获取信息看当前消息是否曾经被投递过Boolean redelivered message.getMessageProperties().getRedelivered();if (!redelivered) {// 5、如果没有被投递过那就重新放回队列重新投递再试一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已经被投递过且这一次仍然进入了 catch 块那么返回拒绝且不再放回队列channel.basicReject(deliveryTag, false);}}}}五、要点总结
要点1把消息确认模式改为手动确认要点2调用Channel对象的方法返回信息 ACKAcknowledgement表示消息处理成功NACKNegative Acknowledgement表示消息处理失败Reject拒绝同样表示消息处理失败 要点3后续操作 requeue为true重新放回队列重新投递再次尝试requeue为false不放回队列不重新投递 要点4deliveryTag 消息的唯一标识查找具体某一条消息的依据
六、流程梳理 七、多啰嗦一句
消费端如果设定消息重新放回队列Broker重新投递消息那么消费端就可以再次消费消息这是一种“重试”机制这需要消费端代码支持“幂等性”——这属于前置知识不展开了。
3. 消费端限流 操作009Prefetch
一、思路
生产者发送100个消息对照两种情况 消费端没有设置prefetch参数100个消息被全部取回消费端设置prefetch参数为1100个消息慢慢取回
二、生产者端代码
Test
public void testSendMessage() {for (int i 0; i 100; i) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,Hello atguigu i);}
}三、消费者端代码
// 2、正常业务操作
log.info(消费端接收到消息内容 dataString);// System.out.println(10 / 0);
TimeUnit.SECONDS.sleep(1);// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);四、测试
1、未使用prefetch
不要启动消费端程序如果正在运行就把它停了运行生产者端程序发送100条消息查看队列中消息的情况 说明 Ready表示已经发送到队列的消息数量Unacked表示已经发送到消费端但是消费端尚未返回ACK信息的消息数量Total未被删除的消息总数 接下来启动消费端程序再查看队列情况 能看到消息全部被消费端取走了正在逐个处理、确认说明有多少消息消费端就并发处理多少
2、设定prefetch
①YAML配置
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 设置每次最多从消息队列服务器取回多少消息②测试流程
停止消费端程序运行生产者端程序发送100条消息查看队列中消息的情况 接下来启动消费端程序持续观察队列情况 能看到消息不是一次性全部取回的而是有个过程
4. 消息超时 操作010消息超时
一、队列层面设置
1、设置 别忘了设置绑定关系 2、测试
不启动消费端程序向设置了过期时间的队列中发送100条消息等10秒后看是否全部被过期删除 二、消息层面设置
1、设置
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;Test
public void testSendMessageTTL() { // 1、创建消息后置处理器对象 MessagePostProcessor messagePostProcessor (Message message) - { // 设定 TTL 时间以毫秒为单位message.getMessageProperties().setExpiration(5000); return message;};// 2、发送消息 rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, Hello atguigu, messagePostProcessor);
}2、查看效果
这次我们是发送到普通队列上 5. 死信和死信队列 操作011死信
一、测试相关准备
1、创建死信交换机和死信队列
常规设定即可没有特殊设置
死信交换机exchange.dead.letter.video死信队列queue.dead.letter.video死信路由键routing.key.dead.letter.video
2、创建正常交换机和正常队列
注意一定要注意正常队列有诸多限定和设置这样才能让无法处理的消息进入死信交换机 正常交换机exchange.normal.video正常队列queue.normal.video正常路由键routing.key.normal.video
全部设置完成后参照如下细节 3、Java代码中的相关常量声明
public static final String EXCHANGE_NORMAL exchange.normal.video;
public static final String EXCHANGE_DEAD_LETTER exchange.dead.letter.video; public static final String ROUTING_KEY_NORMAL routing.key.normal.video;
public static final String ROUTING_KEY_DEAD_LETTER routing.key.dead.letter.video; public static final String QUEUE_NORMAL queue.normal.video;
public static final String QUEUE_DEAD_LETTER queue.dead.letter.video;二、消费端拒收消息
1、发送消息的代码
Test
public void testSendMessageButReject() { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, 测试死信情况1消息被拒绝);
}2、接收消息的代码
①监听正常队列
RabbitListener(queues {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 监听正常队列但是拒绝消息log.info(★[normal]消息接收到但我拒绝。);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}②监听死信队列
RabbitListener(queues {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException { // 监听死信队列 log.info(★[dead letter]dataString dataString);log.info(★[dead letter]我是死信监听方法我接收到了死信消息);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}3、执行结果 三、消息数量超过队列容纳极限
1、发送消息的代码
Test
public void testSendMultiMessage() { for (int i 0; i 20; i) { rabbitTemplate.convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, 测试死信情况2消息数量超过队列的最大容量 i); }
}2、接收消息的代码
消息接收代码不再拒绝消息
RabbitListener(queues {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {// 监听正常队列log.info(★[normal]消息接收到。);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}重启微服务使代码修改生效。
3、执行效果
正常队列的参数如下图所示 生产者发送20条消息之后消费端死信队列接收到前10条消息 四、消息超时未消费
1、发送消息的代码
正常发送一条消息即可所以使用第一个例子的代码。
Test
public void testSendMessageTimeout() {rabbitTemplate.convertAndSend(EXCHANGE_NORMAL,ROUTING_KEY_NORMAL,测试死信情况3消息超时);
}2、执行效果
队列参数生效 因为没有消费端监听程序所以消息未超时前滞留在队列中 消息超时后进入死信队列 6. 延迟队列 操作012延迟插件
一、插件简介
官网地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange延迟极限最多两天
二、插件安装
1、确定卷映射目录
docker inspect rabbitmq运行结果 Mounts: [{Type: volume,Name: rabbitmq-plugin,Source: /var/lib/docker/volumes/rabbitmq-plugin/_data,Destination: /plugins,Driver: local,Mode: z,RW: true,Propagation: },{Type: volume,Name: cca7bc3012f5b76bd6c47a49ca6911184f9076f5f6263b41f4b9434a7f269b11,Source: /var/lib/docker/volumes/cca7bc3012f5b76bd6c47a49ca6911184f9076f5f6263b41f4b9434a7f269b11/_data,Destination: /var/lib/rabbitmq,Driver: local,Mode: ,RW: true,Propagation: }]和容器内/plugins目录对应的宿主机目录是/var/lib/docker/volumes/rabbitmq-plugin/_data
2、下载延迟插件
官方文档说明页地址https://www.rabbitmq.com/community-plugins.html 下载插件安装文件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data3、启用插件 登录进入容器内部
docker exec -it rabbitmq /bin/bashrabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange退出Docker容器
exit重启Docker容器
docker restart rabbitmq4、确认
确认点1查看当前节点已启用插件的列表 确认点2如果创建新交换机时可以在type中看到x-delayed-message选项那就说明插件安装好了 三、创建交换机
rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以创建方式如下 关于x-delayed-type参数的理解 原本指定交换机类型的地方使用了x-delayed-message这个值那么这个交换机除了支持延迟消息之外到底是direct、fanout、topic这些类型中的哪一个呢 这里就额外使用x-delayed-type来指定交换机本身的类型 四、代码测试
1、生产者端代码
Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,测试基于插件的延迟消息 [ new SimpleDateFormat(hh:mm:ss).format(new Date()) ],messageProcessor - {// 设置延迟时间以毫秒为单位messageProcessor.getMessageProperties().setHeader(x-delay, 10000);return messageProcessor;});
}2、消费者端代码
①情况A资源已创建
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date; Component
Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY queue.delay.video;RabbitListener(queues {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException { log.info([生产者] dataString);log.info([消费者] new SimpleDateFormat(hh:mm:ss).format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}②情况B资源未创建
package com.atguigu.mq.listener; import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component; import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date; Component
Slf4j
public class MyDelayMessageListener { public static final String EXCHANGE_DELAY exchange.delay.video;public static final String ROUTING_KEY_DELAY routing.key.delay.video;public static final String QUEUE_DELAY queue.delay.video;RabbitListener(bindings QueueBinding( value Queue(value QUEUE_DELAY, durable true, autoDelete false), exchange Exchange( value EXCHANGE_DELAY, durable true, autoDelete false, type x-delayed-message, arguments Argument(name x-delayed-type, value direct)), key {ROUTING_KEY_DELAY} )) public void process(String dataString, Message message, Channel channel) throws IOException { log.info([生产者] dataString); log.info([消费者] new SimpleDateFormat(hh:mm:ss).format(new Date())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }3、执行效果
①交换机类型 ②生产者端效果
注意使用rabbitmq_delayed_message_exchange插件后即使消息成功发送到队列上也会导致returnedMessage()方法执行 ③消费者端效果 7. 事务消息 操作013事务消息之生产者端
一、测试代码
1、引入依赖
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies2、yaml配置
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /3、主启动类
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}}4、相关配置
package com.atguigu.mq.config;import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
Data
public class RabbitConfig {Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}5、测试代码
package com.atguigu.mq.test;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
Slf4j
public class RabbitMQTest {public static final String EXCHANGE_NAME exchange.tx.dragon;public static final String ROUTING_KEY routing.key.tx.dragon;Resourceprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, I am a dragon(tx msg ~~~01));// 2、抛出异常log.info(do bad: 10 / 0);// 3、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, I am a dragon(tx msg ~~~02));}}二、执行测试
1、未使用事务
抛出异常前的消息发送了抛异常后的消息没有发送 为了不影响后续操作我们直接在管理界面这里把这条消息消费掉 2、使用事务
①说明
因为在junit中给测试方法使用Transactional注解默认就会回滚所以回滚操作需要使用RollBack注解操控
②测试提交事务的情况
Test
Transactional
Rollback(value false)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, I am a dragon(tx msg [commit] ~~~01));// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, I am a dragon(tx msg [commit] ~~~02));
}③测试回滚事务的情况
Test
Transactional
Rollback(value true)
public void testSendMessageInTx() {// 1、发送第一条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, I am a dragon(tx msg [rollback] ~~~01));// 2、发送第二条消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, I am a dragon(tx msg [rollback] ~~~02));
}8. 惰性队列
操作014惰性队列
一、创建惰性队列
1、官网说明 队列可以创建为默认或惰性模式模式指定方式是
使用队列策略建议设置queue.declare参数
如果策略和队列参数同时指定那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的那么只能通过删除队列再重新创建来修改。
2、基于策略方式设定 登录Docker容器
docker exec -it rabbitmq /bin/bash运行rabbitmqctl命令
rabbitmqctl set_policy Lazy ^lazy-queue$ {queue-mode:lazy} --apply-to queues命令解读 rabbitmqctl命令所在目录是/opt/rabbitmq/sbin该目录已配置到Path环境变量 set_policy是子命令表示设置策略 Lazy是当前要设置的策略名称是我们自己自定义的不是系统定义的 ^lazy-queue$是用正则表达式限定的队列名称凡是名称符合这个正则表达式的队列都会应用这里的设置 {“queue-mode”:“lazy”}是一个JSON格式的参数设置指定了队列的模式为lazy –-apply-to参数指定该策略将应用于队列queues级别 命令执行后所有名称符合正则表达式的队列都会应用指定策略包括未来新创建的队列
如果需要修改队列模式可以执行如下命令不必删除队列再重建
rabbitmqctl set_policy Lazy ^lazy-queue$ {queue-mode:default} --apply-to queues3、在声明队列时使用参数设定
参数名称x-queue-mode可用参数值 defaultlazy 不设置就是取值为default
Java代码原生API设置方式
MapString, Object args new HashMapString, Object();
args.put(x-queue-mode, lazy);
channel.queueDeclare(myqueue, false, false, false, args);Java代码注解设置方式
Queue(value QUEUE_NAME, durable true, autoDelete false, arguments {Argument(name x-queue-mode, value lazy)
})二、实操演练
1、生产者端代码
①配置POM parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency/dependencies②配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /③主启动类
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQLazyProducer {public static void main(String[] args) {SpringApplication.run(RabbitMQLazyProducer.class, args);}}④发送消息
package com.atguigu.mq.test;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_LAZY_NAME exchange.atguigu.lazy;public static final String ROUTING_LAZY_KEY routing.key.atguigu.lazy;Resourceprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, I am a message for test lazy queue.);}}2、消费者端代码
①配置POM parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency/dependencies②配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /③主启动类
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQLazyConsumerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQLazyConsumerMainType.class, args);}}④监听器
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;Component
Slf4j
public class MyLazyMessageProcessor {public static final String EXCHANGE_LAZY_NAME exchange.atguigu.lazy;public static final String ROUTING_LAZY_KEY routing.key.atguigu.lazy;public static final String QUEUE_LAZY_NAME queue.atguigu.lazy;RabbitListener(bindings QueueBinding(value Queue(value QUEUE_LAZY_NAME, durable true, autoDelete false, arguments {Argument(name x-queue-mode, value lazy)}),exchange Exchange(value EXCHANGE_LAZY_NAME, durable true, autoDelete false),key {ROUTING_LAZY_KEY}))public void processMessageLazy(String data, Message message, Channel channel) {log.info(消费端接收到消息 data);}}三、测试 先启动消费端 基于消费端RabbitListener注解中的配置自动创建了队列 发送消息
9. 优先级队列
操作015优先级队列
一、创建相关资源
1、创建交换机
exchange.test.priority 2、创建队列
queue.test.priority
x-max-priority 3、队列绑定交换机 二、生产者发送消息
1、配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies2、配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /3、主启动类
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQPriorityProducer {public static void main(String[] args) {SpringApplication.run(RabbitMQPriorityProducer.class, args);}}4、发送消息
不要启动消费者程序让多条不同优先级的消息滞留在队列中第一次发送优先级为1的消息第二次发送优先级为2的消息第三次发送优先级为3的消息先发送的消息优先级低后发送的消息优先级高将来看看消费端是不是先收到优先级高的消息
①第一次发送优先级为1的消息
package com.atguigu.mq.test;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RabbitMQTest {public static final String EXCHANGE_PRIORITY exchange.test.priority;public static final String ROUTING_KEY_PRIORITY routing.key.test.priority;Resourceprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, I am a message with priority 1., message-{message.getMessageProperties().setPriority(1);return message;});}}②第二次发送优先级为2的消息
Test
public void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, I am a message with priority 2., message-{message.getMessageProperties().setPriority(2);return message;});
}③第三次发送优先级为3的消息
Test
public void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, I am a message with priority 3., message-{message.getMessageProperties().setPriority(3);return message;});
}三、消费端接收消息
1、配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies2、配置YAML
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /3、主启动类
package com.atguigu.mq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication
public class RabbitMQPriorityConsumer {public static void main(String[] args) {SpringApplication.run(RabbitMQPriorityConsumer.class, args);}}4、监听器
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;Slf4j
Component
public class MyMessageProcessor {public static final String QUEUE_PRIORITY queue.test.priority;RabbitListener(queues {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}5、测试效果
对于已经滞留服务器的消息只要消费端一启动就能够收到消息队列的投递打印效果如下 集群篇
1. 工作机制 2. 集群搭建
操作016集群搭建
一、安装RabbitMQ
1、前置要求
CentOS发行版的版本≥CentOS 8 Stream
镜像下载地址https://mirrors.163.com/centos/8-stream/isos/x86_64/CentOS-Stream-8-20240318.0-x86_64-dvd1.iso
RabbitMQ安装方式官方指南 2、安装Erlang环境
①创建yum库配置文件
vim /etc/yum.repos.d/rabbitmq.repo②加入配置内容
以下内容来自官方文档https://www.rabbitmq.com/docs/install-rpm In /etc/yum.repos.d/rabbitmq.repoZero dependency Erlang RPM[modern-erlang]
namemodern-erlang-el8uses a Cloudsmith mirror yum.novemberain.com in addition to its Cloudsmith upstream.Unlike Cloudsmith, the mirror does not have any traffic quotas
baseurlhttps://yum1.novemberain.com/erlang/el/8/$basearchhttps://yum2.novemberain.com/erlang/el/8/$basearchhttps://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/$basearch
repo_gpgcheck1
enabled1
gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
gpgcheck1
sslverify1
sslcacert/etc/pki/tls/certs/ca-bundle.crt
metadata_expire300
pkg_gpgcheck1
autorefresh1
typerpm-md[modern-erlang-noarch]
namemodern-erlang-el8-noarchuses a Cloudsmith mirror yum.novemberain.com.Unlike Cloudsmith, it does not have any traffic quotas
baseurlhttps://yum1.novemberain.com/erlang/el/8/noarchhttps://yum2.novemberain.com/erlang/el/8/noarchhttps://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/noarch
repo_gpgcheck1
enabled1
gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.keyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck1
sslverify1
sslcacert/etc/pki/tls/certs/ca-bundle.crt
metadata_expire300
pkg_gpgcheck1
autorefresh1
typerpm-md[modern-erlang-source]
namemodern-erlang-el8-sourceuses a Cloudsmith mirror yum.novemberain.com.Unlike Cloudsmith, it does not have any traffic quotas
baseurlhttps://yum1.novemberain.com/erlang/el/8/SRPMShttps://yum2.novemberain.com/erlang/el/8/SRPMShttps://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/SRPMS
repo_gpgcheck1
enabled1
gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.keyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck1
sslverify1
sslcacert/etc/pki/tls/certs/ca-bundle.crt
metadata_expire300
pkg_gpgcheck1
autorefresh1RabbitMQ Server[rabbitmq-el8]
namerabbitmq-el8
baseurlhttps://yum2.novemberain.com/rabbitmq/el/8/$basearchhttps://yum1.novemberain.com/rabbitmq/el/8/$basearchhttps://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/$basearch
repo_gpgcheck1
enabled1Cloudsmiths repository key and RabbitMQ package signing key
gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.keyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck1
sslverify1
sslcacert/etc/pki/tls/certs/ca-bundle.crt
metadata_expire300
pkg_gpgcheck1
autorefresh1
typerpm-md[rabbitmq-el8-noarch]
namerabbitmq-el8-noarch
baseurlhttps://yum2.novemberain.com/rabbitmq/el/8/noarchhttps://yum1.novemberain.com/rabbitmq/el/8/noarchhttps://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/noarch
repo_gpgcheck1
enabled1Cloudsmiths repository key and RabbitMQ package signing key
gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.keyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck1
sslverify1
sslcacert/etc/pki/tls/certs/ca-bundle.crt
metadata_expire300
pkg_gpgcheck1
autorefresh1
typerpm-md[rabbitmq-el8-source]
namerabbitmq-el8-source
baseurlhttps://yum2.novemberain.com/rabbitmq/el/8/SRPMShttps://yum1.novemberain.com/rabbitmq/el/8/SRPMShttps://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/SRPMS
repo_gpgcheck1
enabled1
gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
gpgcheck0
sslverify1
sslcacert/etc/pki/tls/certs/ca-bundle.crt
metadata_expire300
pkg_gpgcheck1
autorefresh1
typerpm-md③更新yum库
–nobest表示所需安装包即使不是最佳选择也接受
yum update -y --nobest④正式安装Erlang
yum install -y erlang3、安装RabbitMQ 导入GPG密钥
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key下载 RPM 包
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm安装
rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm4、RabbitMQ基础配置 启用管理界面插件
rabbitmq-plugins enable rabbitmq_management启动 RabbitMQ 服务
systemctl start rabbitmq-server将 RabbitMQ 服务设置为开机自动启动
systemctl enable rabbitmq-server新增登录账号密码
rabbitmqctl add_user atguigu 123456设置登录账号权限
rabbitmqctl set_user_tags atguigu administrator
rabbitmqctl set_permissions -p / atguigu .* .* .*配置所有稳定功能 flag 启用
rabbitmqctl enable_feature_flag all重启RabbitMQ服务生效
systemctl restart rabbitmq-server5、收尾工作
rm -rf /etc/yum.repos.d/rabbitmq.repo二、克隆VMWare虚拟机
1、目标
通过克隆操作一共准备三台VMWare虚拟机
集群节点名称虚拟机 IP 地址node01192.168.200.100node02192.168.200.150node03192.168.200.200
2、克隆虚拟机 3、给新机设置 IP 地址
在CentOS 7中可以使用nmcli命令行工具修改IP地址。以下是具体步骤
查看网络连接信息
nmcli con show停止指定的网络连接将connection_name替换为实际的网络连接名称
nmcli con down connection_name修改IP地址将connection_name替换为实际的网络连接名称将new_ip_address替换为新的IP地址将subnet_mask替换为子网掩码将gateway替换为网关 new_ip_address/subnet_mask这里是 CIDR 表示法
nmcli con mod connection_name ipv4.addresses new_ip_address/subnet_mask
nmcli con mod connection_name ipv4.gateway gateway
nmcli con mod connection_name ipv4.method manual启动网络连接
nmcli con up connection_name验证新的IP地址是否生效
ip addr show4、修改主机名称
主机名称会被RabbitMQ作为集群中的节点名称后面会用到所以需要设置一下。
修改方式如下
vim /etc/hostname5、保险措施
为了在后续操作过程中万一遇到操作失误友情建议拍摄快照。
三、集群节点彼此发现
1、node01设置
①设置 IP 地址到主机名称的映射
修改文件/etc/hosts追加如下内容
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03②查看当前RabbitMQ节点的Cookie值并记录
[rootnode01 ~] cat /var/lib/rabbitmq/.erlang.cookie
NOTUPTIZIJONXDWWQPOJ③重置节点应用
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app2、node02设置
①设置 IP 地址到主机名称的映射
修改文件/etc/hosts追加如下内容
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03②修改当前RabbitMQ节点的Cookie值
node02和node03都改成和node01一样
vim /var/lib/rabbitmq/.erlang.cookie③重置节点应用并加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbitnode01
rabbitmqctl start_app3、node03设置
①设置 IP 地址到主机名称的映射
修改文件/etc/hosts追加如下内容
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03②修改当前RabbitMQ节点的Cookie值
node02和node03都改成和node01一样
vim /var/lib/rabbitmq/.erlang.cookie③重置节点应用并加入集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbitnode01
rabbitmqctl start_app④查看集群状态
rabbitmqctl cluster_status4、附录
如有需要踢出某个节点则按下面操作执行 被踢出的节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app节点1
rabbitmqctl forget_cluster_node rabbitnode02四、负载均衡Management UI
1、说明
其实访问任何一个RabbitMQ实例的管理界面都是对集群操作所以配置负载均衡通过统一入口访问在我们学习期间就是锦上添花先给管理界面做负载均衡然后方便我们在管理界面上创建交换机、队列等操作
2、安装HAProxy
yum install -y haproxy
haproxy -v
systemctl start haproxy
systemctl enable haproxy3、修改配置文件
配置文件位置 /etc/haproxy/haproxy.cfg 在配置文件末尾增加如下内容 frontend rabbitmq_ui_frontend bind 192.168.200.100:22222 mode http default_backend rabbitmq_ui_backend backend rabbitmq_ui_backend mode http balance roundrobin option httpchk GET / server rabbitmq_ui1 192.168.200.100:15672 check server rabbitmq_ui2 192.168.200.150:15672 check server rabbitmq_ui3 192.168.200.200:15672 check 设置SELinux策略允许HAProxy拥有权限连接任意端口
setsebool -P haproxy_connect_any1SELinux是Linux系统中的安全模块它可以限制进程的权限以提高系统的安全性。在某些情况下SELinux可能会阻止HAProxy绑定指定的端口这就需要通过设置域domain的安全策略来解决此问题。 通过执行setsebool -P haproxy_connect_any1命令您已经为HAProxy设置了一个布尔值允许HAProxy连接到任意端口。这样HAProxy就可以成功绑定指定的socket并正常工作。 重启HAProxy
systemctl restart haproxy4、测试效果 五、负载均衡核心功能
1、增加配置 frontend rabbitmq_frontend bind 192.168.200.100:11111 mode tcp default_backend rabbitmq_backend backend rabbitmq_backend mode tcp balance roundrobin server rabbitmq1 192.168.200.100:5672 check server rabbitmq2 192.168.200.150:5672 check server rabbitmq3 192.168.200.200:5672 check 重启HAProxy服务
systemctl restart haproxy3、测试
①创建组件
交换机exchange.cluster.test队列queue.cluster.test路由键routing.key.cluster.test
②创建生产者端程序
[1]配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies[2]主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}}[3]配置YAML
spring:rabbitmq:host: 192.168.200.100port: 11111username: atguigupassword: 123456virtual-host: /publisher-confirm-type: CORRELATED 交换机的确认publisher-returns: true 队列的确认
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info[4]配置类
package com.atguigu.mq.config;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;Configuration
Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(消息发送到交换机成功数据 correlationData);} else {log.info(消息发送到交换机失败数据 correlationData 原因 cause);}}Overridepublic void returnedMessage(ReturnedMessage returned) {log.info(消息主体: new String(returned.getMessage().getBody()));log.info(应答码: returned.getReplyCode());log.info(描述 returned.getReplyText());log.info(消息使用的交换器 exchange : returned.getExchange());log.info(消息使用的路由键 routing : returned.getRoutingKey());}
}[5] Junit测试类
package com.atguigu.mq.test;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;SpringBootTest
public class RabbitMQTest {Resourceprivate RabbitTemplate rabbitTemplate;public static final String EXCHANGE_CLUSTER_TEST exchange.cluster.test;public static final String ROUTING_KEY_CLUSTER_TEST routing.key.cluster.test;Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_CLUSTER_TEST, ROUTING_KEY_CLUSTER_TEST, message test cluster~~~);}}③创建消费端程序
[1]配置POM
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.1.5/version
/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency
/dependencies[2]主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}}[3]配置YAML
spring:rabbitmq:host: 192.168.200.100port: 11111username: atguigupassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual
logging:level:com.atguigu.mq.listener.MyProcessor: info[4]监听器
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;Component
Slf4j
public class MyProcessor {RabbitListener(queues {queue.cluster.test})public void processNormalQueueMessage(String data, Message message, Channel channel) throws IOException {log.info(消费端 data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}[5]运行效果 六、镜像队列
1、提出问题
现在我们创建过的队列它是属于节点1的 现在我们停掉节点1的rabbit应用 停止rabbit应用
rabbitmqctl stop_app再次发送消息 为了后续操作再重新启动rabbit应用
rabbitmqctl start_app2、创建策略使队列镜像化 3、创建新的队列
要求队列名称必须符合策略中指定的正则表达式 绑定交换机 4、测试
节点1关闭rabbit应用 然后就发现两个镜像队列自动分布到了节点2和节点3上 调整Java代码中的组件名称
public static final String EXCHANGE_CLUSTER_TEST exchange.cluster.test;
public static final String ROUTING_KEY_MIRROR_TEST routing.key.mirror.test;
public static final String QUEUE_MIRROR_TEST mirror.queue.test;3. 负载均衡 4. 仲裁队列
操作017仲裁队列
一、创建仲裁队列 说明鉴于仲裁队列的功能肯定是需要在前面集群的基础上操作 1、创建交换机
和仲裁队列绑定的交换机没有特殊我们还是创建一个direct交换机即可
交换机名称exchange.quorum.test 2、创建仲裁队列
队列名称queue.quorum.test 3、绑定交换机
路由键routing.key.quorum.test 二、测试仲裁队列
1、常规测试
像使用经典队列一样发送消息、消费消息
①生产者端
public static final String EXCHANGE_QUORUM_TEST exchange.quorum.test;
public static final String ROUTING_KEY_QUORUM_TEST routing.key.quorum.test;Test
public void testSendMessageToQuorum() {rabbitTemplate.convertAndSend(EXCHANGE_QUORUM_TEST, ROUTING_KEY_QUORUM_TEST, message test quorum ~~~);
}②消费者端
public static final String QUEUE_QUORUM_TEST queue.quorum.test;RabbitListener(queues {QUEUE_QUORUM_TEST})
public void quorumMessageProcess(String data, Message message, Channel channel) throws IOException {log.info(消费端 data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}2、高可用测试
①停止某个节点的rabbit应用 停止rabbit应用
rabbitmqctl stop_app②查看仲裁队列对应的节点情况 ③再次发送消息
收发消息仍然正常
5. 流式队列 操作018Stream Queue
一、启用插件 说明只有启用了Stream插件才能使用流式队列的完整功能 在集群每个节点中依次执行如下操作 启用Stream插件
rabbitmq-plugins enable rabbitmq_stream重启rabbit应用
rabbitmqctl stop_app
rabbitmqctl start_app查看插件状态
rabbitmq-plugins list二、负载均衡
在文件/etc/haproxy/haproxy.cfg末尾追加
frontend rabbitmq_stream_frontend
bind 192.168.200.100:33333
mode tcp
default_backend rabbitmq_stream_backendbackend rabbitmq_stream_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5552 check
server rabbitmq2 192.168.200.150:5552 check
server rabbitmq3 192.168.200.200:5552 check三、Java代码
1、引入依赖 Stream 专属 Java 客户端官方网址https://github.com/rabbitmq/rabbitmq-stream-java-client Stream 专属 Java 客户端官方文档网址https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/
dependenciesdependencygroupIdcom.rabbitmq/groupIdartifactIdstream-client/artifactIdversion0.15.0/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.30/version/dependencydependencygroupIdch.qos.logback/groupIdartifactIdlogback-classic/artifactIdversion1.2.3/version/dependency
/dependencies2、创建Stream 说明不需要创建交换机 ①代码方式创建
Environment environment Environment.builder().host(192.168.200.100).port(33333).username(atguigu).password(123456).build();environment.streamCreator().stream(stream.atguigu.test2).create();environment.close();②ManagementUI创建 3、生产者端程序
①内部机制说明
[1]官方文档 Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream. 翻译 在内部Environment将查询broker以了解流的拓扑结构并将创建或重用连接以发布到流的 leader 节点。 [2]解析
在 Environment 中封装的连接信息仅负责连接到 brokerProducer 在构建对象时会访问 broker 拉取集群中 Leader 的连接信息将来实际访问的是集群中的 Leader 节点Leader 的连接信息格式是节点名称:端口号 [3]配置
为了让本机的应用程序知道 Leader 节点名称对应的 IP 地址我们需要在本地配置 hosts 文件建立从节点名称到 IP 地址的映射关系 ②示例代码
Environment environment Environment.builder().host(192.168.200.100).port(33333).username(atguigu).password(123456).build();Producer producer environment.producerBuilder().stream(stream.atguigu.test).build();byte[] messagePayload hello rabbit stream.getBytes(StandardCharsets.UTF_8);CountDownLatch countDownLatch new CountDownLatch(1);producer.send(producer.messageBuilder().addData(messagePayload).build(),confirmationStatus - {if (confirmationStatus.isConfirmed()) {System.out.println([生产者端]the message made it to the broker);} else {System.out.println([生产者端]the message did not make it to the broker);}countDownLatch.countDown();});countDownLatch.await();producer.close();environment.close();4、消费端程序
Environment environment Environment.builder().host(192.168.200.100).port(33333).username(atguigu).password(123456).build();environment.consumerBuilder().stream(stream.atguigu.test).name(stream.atguigu.test.consumer).autoTrackingStrategy().builder().messageHandler((offset, message) - {byte[] bodyAsBinary message.getBodyAsBinary();String messageContent new String(bodyAsBinary);System.out.println([消费者端]messageContent messageContent Offset offset.offset());}).build();四、指定偏移量消费
1、偏移量 2、官方文档说明 The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following: OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary. 3、指定Offset消费
Environment environment Environment.builder().host(192.168.200.100).port(33333).username(atguigu).password(123456).build();CountDownLatch countDownLatch new CountDownLatch(1);Consumer consumer environment.consumerBuilder().stream(stream.atguigu.test).offset(OffsetSpecification.first()).messageHandler((offset, message) - {byte[] bodyAsBinary message.getBodyAsBinary();String messageContent new String(bodyAsBinary);System.out.println([消费者端]messageContent messageContent);countDownLatch.countDown();}).build();countDownLatch.await();consumer.close();4、对比
autoTrackingStrategy 方式始终监听Stream中的新消息狗狗看家忠于职守指定偏移量方式针对指定偏移量的消息消费之后就停止狗狗叼飞盘叼回来就完
6. 异地容灾
操作024Federation插件
一、简介
Federation插件的设计目标是使RabbitMQ在不同的Broker节点之间进行消息传递而无须建立集群。
它可以在不同的管理域中的Broker或集群间传递消息这些管理域可能设置了不同的用户和vhost也可能运行在不同版本的RabbitMQ和Erlang上。Federation基于AMQP 0-9-1协议在不同的Broker之间进行通信并且设计成能够容忍不稳定的网络连接情况。
二、Federation交换机
1、总体说明
各节点操作启用联邦插件下游操作 添加上游连接端点创建控制策略
2、准备工作
为了执行相关测试我们使用Docker创建两个RabbitMQ实例。
特别提示由于Federation机制的最大特点就是跨集群同步数据所以这两个Docker容器中的RabbitMQ实例不加入集群是两个独立的broker实例。
docker run -d \
--name rabbitmq-shenzhen \
-p 51000:5672 \
-p 52000:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USERguest \
-e RABBITMQ_DEFAULT_PASS123456 \
rabbitmq:3.13-managementdocker run -d \
--name rabbitmq-shanghai \
-p 61000:5672 \
-p 62000:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USERguest \
-e RABBITMQ_DEFAULT_PASS123456 \
rabbitmq:3.13-management3、启用联邦插件
在上游、下游节点中都需要开启。
Docker容器中的RabbitMQ已经开启了rabbitmq_federation还需要开启rabbitmq_federation_management
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_managementrabbitmq_federation_management插件启用后会在Management UI的Admin选项卡下看到 4、添加上游连接端点
在下游节点填写上游节点的连接信息 5、创建控制策略 6、测试
①测试计划
特别提示
普通交换机和联邦交换机名称要一致交换机名称要能够和策略正则表达式匹配上发送消息时两边使用的路由键也要一致队列名称不要求一致 ②创建组件
所在机房交换机名称路由键队列名称深圳机房上游federated.exchange.demorouting.key.demo.testqueue.normal.shenzhen上海机房下游federated.exchange.demorouting.key.demo.testqueue.normal.shanghai
创建组件后可以查看一下联邦状态连接成功的联邦状态如下 ③发布消息执行测试
在上游节点向交换机发布消息 看到下游节点接收到了消息 三、Federation队列
1、总体说明
Federation队列和Federation交换机的最核心区别就是
Federation Police作用在交换机上就是Federation交换机Federation Police作用在队列上就是Federation队列
2、创建控制策略 3、测试
①测试计划
上游节点和下游节点中队列名称是相同的只是下游队列中的节点附加了联邦策略而已
所在机房交换机路由键队列深圳机房上游exchange.normal.shenzhenrouting.key.normal.shenzhenfed.queue.demo上海机房下游————fed.queue.demo
②创建组件
上游节点都是常规操作此处省略。重点需要关注的是下游节点的联邦队列创建时需要指定相关参数
创建组件后可以查看一下联邦状态连接成功的联邦状态如下 ③执行测试
在上游节点向交换机发布消息 但此时发现下游节点中联邦队列并没有接收到消息这是为什么呢这里就体现出了联邦队列和联邦交换机工作逻辑的区别。
对联邦队列来说如果没有监听联邦队列的消费端程序它是不会到上游去拉取消息的
如果有消费端监听联邦队列那么首先消费联邦队列自身的消息如果联邦队列为空这时候才会到上游队列节点中拉取消息。
所以现在的测试效果需要消费端程序配合才能看到 操作025Shovel
一、启用Shovel插件
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management二、配置Shovel 三、测试
1、测试计划
节点交换机路由键队列深圳节点exchange.shovel.testexchange.shovel.testqueue.shovel.demo.shenzhen上海节点————queue.shovel.demo.shanghai
2、测试效果
①发布消息 ②源节点 ③目标节点