有哪些网站可以做设计比赛,个人做网络推广哪个网站好,下载android版本下载安装,win系统的wordpress目录 RabbitMQ认识概念使用场景优点AMQP协议JMS RabbitMQ安装安装elang安装RabbitMQ安装管理插件登录RabbitMQ消息队列的工作流程 RabbitMQ常用模型HelloWorld-基本消息模型生产者发送消息导包获取链接工具类消息的生产者 消费者消费消息模拟消费者手动签收消息 Work QueuesSen… 目录 RabbitMQ认识概念使用场景优点AMQP协议JMS RabbitMQ安装安装elang安装RabbitMQ安装管理插件登录RabbitMQ消息队列的工作流程 RabbitMQ常用模型HelloWorld-基本消息模型生产者发送消息导包获取链接工具类消息的生产者 消费者消费消息模拟消费者手动签收消息 Work QueuesSenderConsume1Consume2 订阅模型-FANOUT-广播SenderConsume1Consume2 订阅模型-Direct-定向SenderConsume1Consume2 订阅模型-Topic-通配符SenderConsume1Consume2 总结 SpringBoot集成RabbitMQ导包ymlconfigproducerconsumer RabbitMQ认识
概念
MQ全称为Message Queue即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发基于AMQPAdvanced Message Queue Protocol高级消息队列协议协议实现的消息队列它是一种应用程序之间的通信方法消息队列在分布式系统开发中应用非常广泛。官方地址http://www.rabbitmq.com/
使用场景 优点
任务异步处理 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。丢进去由接收方分别异步处理
消除峰值 异步化提速(发消息)提高系统稳定性(多系统调用)服务解耦(5-10个服务)排序保证消除峰值 放入队列中不用马上都处理完有中间状态消息分发后可由多个订阅方分别异步处理
服务解耦 应用程序解耦合 MQ相当于一个中介生产方通过MQ与消费方交互它将应用程序进行解耦合。 将单体业务拆分为生产者消息队列和消费者
AMQP协议
AMQP是一套公开的消息队列协议最早在2003年被提出它旨在从协议层定义消息通信数据的标准格式 为的就是解决MQ市场上协议不统一的问题。RabbitMQ就是遵循AMQP标准协议开发的MQ服务。 其他PythonC#PHP也都能用
JMS
JMS是Java消息服务是java提供的一套消息服务API标准其目的是为所有的java应用程序提供统一的消息通信的标准类似java的 jdbc只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同jms是java语言专属的消息服务标准它是在api层定义标准并且只能用于java应用而AMQP是在协议层定义的标准是跨语言的。 只能Java用基本已经被摒弃
RabbitMQ安装
安装elang
otp_win64_20.2.exe 配置环境变量
安装RabbitMQ
rabbitmq-server-3.7.4.exe 可通过任务管理器或开始菜单启动或关闭服务
安装管理插件
安装rabbitMQ的管理插件方便在浏览器端管理RabbitMQ ,进入到RabbitMQ的sbin目录使用cmd执行命令 rabbitmq-plugins.bat enable rabbitmq_management 安装成功后重新启动RabbitMQ 开启可视化界面
重启MQ
登录RabbitMQ
进入浏览器输入http://localhost:15672初始账号和密码guest/guest
消息队列的工作流程 RabbitMQ常用模型
HelloWorld-基本消息模型
一个生产者与一个消费者
生产者发送消息
导包
dependencies!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactId!--和springboot2.0.5对应--version5.4.1/version/dependency
/dependencies获取链接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {/*** 建立与RabbitMQ的连接* return* throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(127.0.0.1);//端口和管理端端口15672不一样管理端是另外一台网页版的系统5672才是MQ本身factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/);//集群的时候才用这个参数factory.setUsername(guest);factory.setPassword(guest);// 通过工程获取连接Connection connection factory.newConnection();return connection;}
}消息的生产者
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;//消息的生产者
public class Sender {public static final String HELLO_QUEUEhello_queue;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//3.创建队列hello这里用默认的交换机/* String queue 队列的名字可自定义 boolean durable: 持久化 boolean exclusive是否独占大家都能用传falseboolean autoDelete: 用完即删关了就没了消费者还要拿所以传falseMapString, Object arguments没有其他要传的属性就传false */channel.queueDeclare(HELLO_QUEUE, true, false, false, null);String msg今天中午吃啥;//4.发送消息channel.basicPublish(, HELLO_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}消费者消费消息
模拟消费者
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//回调可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);// 消费者标识System.out.println(envelope:envelope);// 消息队列里面的一些公共属性System.out.println(消息内容new String(body));System.out.println(消费完成----------------);}};//3.监听队列/*queue :队列名字autoAck自动签收Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}只要消费者不关生产者发一次消息消费者就自动监听消费一次消息
手动签收消息
import cn.itsource.utils.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;//模拟消费者
public class Consume {public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//回调可新建类实现Consumer接口或继承DefaultConsumer类或用匿名内部类覆写处理方法Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);// 消费者标识System.out.println(envelope:envelope);// 消息队列里面的一些公共属性
// System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(消费完成----------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);// 第二个参数为是否同时签收多个传false}};//3.监听队列/*queue :队列名字autoAck自动签收 签收不等于消费成功处理逻辑走完没有报错才算签收成功Consumer callback: 回调*/channel.basicConsume(Sender.HELLO_QUEUE,false,callback);}
}Work Queues 一个生产者与多个消费者。 默认轮询也可以改成能者多劳
Sender
//消息的生产者
/*如果有多个消费者监听同一个队列默认轮询*/
public class Sender {public static final String WORK_QUEUEwork_queue;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//3.创建队列/*String queue 队列的名字boolean durable: 持久化boolean exclusive是否独占boolean autoDelete: 用完即删MapString, Object arguments*/channel.queueDeclare(WORK_QUEUE, true, false, false, null);String msg今天中午吃啥;//4.发送消息channel.basicPublish(, WORK_QUEUE, null, msg.getBytes());channel.close();conn.close();}
}Consume1
//模拟消费者
public class Consume1 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量
// channel.basicQos(1);//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}Consume2
//模拟消费者
public class Consume2 {public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量
// channel.basicQos(1);//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);System.out.println(消息内容new String(body));
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(Sender.WORK_QUEUE,false,callback);}
}订阅模型-FANOUT-广播 在广播模式下消息发送流程是这样的 1 可以有多个消费者 2 每个消费者有自己的queue队列 3 每个队列都要绑定到Exchange交换机 4 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定。 5 交换机把消息发送给绑定过的所有队列 6 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
Sender
//消息的生产者
/*变化1.不创建 队列2.创建交换机3.给交换机发送消息*/
public class Sender {public static final String FANOUT_EXCHANGEfanout_exchange;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//3.创建交换机/*exchange:交换机的名字type交换机的类型durable:是否持久化*/channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);String msg今天晚上吃啥;//4.发送消息channel.basicPublish(FANOUT_EXCHANGE, , null, msg.getBytes());channel.close();conn.close();}
}Consume1
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume1 {public static final String FANOUT_QUEUE1fanout_queue1;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE1, Sender.FANOUT_EXCHANGE, );//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE1,false,callback);}
}Consume2
//模拟消费者
/*1.创建队列2.队列绑定到交换机3.每个消费者要监听自己的队列*/
public class Consume2 {public static final String FANOUT_QUEUE2fanout_queue2;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(FANOUT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(FANOUT_QUEUE2, Sender.FANOUT_EXCHANGE, );//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(FANOUT_QUEUE2,false,callback);}
}订阅模型-Direct-定向 把消息交给符合指定routing key 的队列 一堆或一个
Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static final String DIRECT_EXCHANGEdirect_exchange;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//3.创建交换机/*exchange:交换机的名字type交换机的类型durable:是否持久化*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);String msg今天晚上吃啥;//4.发送消息channel.basicPublish(DIRECT_EXCHANGE, dept, null, msg.getBytes());channel.close();conn.close();}
}Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public static final String DIRECT_QUEUE1direct_queue1;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE1, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE1, Sender.DIRECT_EXCHANGE, emp.delete);//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE1,false,callback);}
}Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public static final String DIRECT_QUEUE2direct_queue2;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(DIRECT_QUEUE2, true, false, false, null);//绑定到交换机channel.queueBind(DIRECT_QUEUE2, Sender.DIRECT_EXCHANGE, dept);//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(DIRECT_QUEUE2,false,callback);}
}订阅模型-Topic-通配符 Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 goods.insert
通配符规则 #匹配一个或多个词 *匹配不多不少恰好1个词
Sender
//消息的生产者
/*变化1.交换机类型2.给交换机发送消息,指定 routing key*/
public class Sender {public static final String TOPIC_EXCHANGEtopic_exchange;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//3.创建交换机/*exchange:交换机的名字type交换机的类型durable:是否持久化*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);String msg今天晚上吃啥;//4.发送消息channel.basicPublish(TOPIC_EXCHANGE, user.insert.add.pubilsh, null, msg.getBytes());channel.close();conn.close();}
}Consume1
//模拟消费者
/*1.指定routing key*/
public class Consume1 {public static final String TOPIC_QUEUE1topic_queue1;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE1, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE1,Sender.TOPIC_EXCHANGE, user.#);//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE1,false,callback);}
}Consume2
//模拟消费者
/*1.指定routing key*/
public class Consume2 {public static final String TOPIC_QUEUE2topic_queue2;public static void main(String[] args) throws Exception {//1.获取连接Connection conn ConnectionUtil.getConnection();//2.获取通道Channel channel conn.createChannel();//同时处理的消息数量channel.basicQos(1);//创建队列channel.queueDeclare(TOPIC_QUEUE2, true, false, false, null);//绑定到交换机/*#.1到多个单词*. 一个单词*/channel.queueBind(TOPIC_QUEUE2,Sender.TOPIC_EXCHANGE, email.*);//回调Consumer callbacknew DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag:consumerTag);System.out.println(envelope:envelope);//System.out.println(1/0);System.out.println(消息内容new String(body));System.out.println(------------------------------------);//所有业务逻辑结束以后 手动签收channel.basicAck(envelope.getDeliveryTag(), false);}};//3.监听队列/*queue :队列名字autoAck自动签收 签收 不等于 消费成功Consumer callback: 回调*/channel.basicConsume(TOPIC_QUEUE2,false,callback);}
}总结
01_hello生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)02_workqueue 默认轮询 可修改(能者多劳)生产者 1.获取连接 2.获取通道 3.创建队列 4.发送消息消费者 1.获取连接 2.获取通道 3.监听队列 (并回调)03_fanout 广播 将消息交给所有绑定到交换机的队列(多个消费者都能收到)生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)04_direct 定向 把消息交给符合指定 routing key 的队列 一堆或一个生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)05_topic 通配符 把消息交给符合routing pattern (路由模式) 的队列 一堆或一个生产者 1.获取连接 2.获取通道 3.创建交换机 4.发送消息到交换机消费者 1.获取连接 2.获取通道 创建队列 绑定到交换机 3.监听队列 (并回调)
SpringBoot集成RabbitMQ
导包 propertiesproject.build.sourceEncodingUTF-8/project.build.sourceEncodingproject.reporting.outputEncodingUTF-8/project.reporting.outputEncodingjava.version1.8/java.version/propertiesparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.0.5.RELEASE/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency!--spirngboot集成rabbitmq--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency/dependenciesyml
server:port: 44000
spring:application:name: test‐rabbitmq‐producerrabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtualHost: /listener:simple:acknowledge-mode: manual #手动签收prefetch: 1 #消费者的消息并发处理数量#publisher-confirms: true #消息发送到交换机失败回调#publisher-returns: true #消息发送到队列失败回调template:mandatory: true # 必须设置成true 消息路由失败通知监听者而不是将消息丢弃config
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMQConfig {public static final String EXCHANGE_SPRINGBOOTexchange_springboot;public static final String QUEUE1_SPRINGBOOTqueue1_springboot;public static final String QUEUE2_SPRINGBOOTqueue2_springboot;//创建一个交换机Beanpublic Exchange createExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_SPRINGBOOT).durable(true).build();}//创建两个队列Beanpublic Queue createQueue1(){return new Queue(QUEUE1_SPRINGBOOT,true);}Beanpublic Queue createQueue2(){return new Queue(QUEUE2_SPRINGBOOT,true);}//把交换机和队列绑定到一起Beanpublic Binding bind1(){return BindingBuilder.bind(createQueue1()).to(createExchange()).with(user.*).noargs();}Beanpublic Binding bind2(){return BindingBuilder.bind(createQueue2()).to(createExchange()).with(email.*).noargs();}//消费者 还原对象方式从MQ里取出json转为对象Bean(rabbitListenerContainerFactory)public RabbitListenerContainerFactory? rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setPrefetchCount(1);return factory;}//放到消息队列里面的转换转为json存进MQBeanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}producer
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;SpringBootTest(classes App.class)
RunWith(SpringRunner.class)
public class Sender {AutowiredRabbitTemplate rabbitTemplate;Testpublic void test(){/*问题多系统之间 信息交互 传递对象解决方案转换为json存储实现1.fastjson 对象 - josn 作业2.重写转换器模式*/for (int i 0; i 5; i) {rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_SPRINGBOOT, email.save, new User(1L,文达));}System.out.println(消息发送完毕);}
}consumer
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;//消费者
Component
public class Consu {RabbitListener(queues {RabbitMQConfig.QUEUE1_SPRINGBOOT},containerFactory rabbitListenerContainerFactory)//用这个转换器接public void user(Payload User user, Channel channel, Message message) throws IOException {System.out.println(message);System.out.println(user队列user);//手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}RabbitListener(queues {RabbitMQConfig.QUEUE2_SPRINGBOOT})public void email(Payload User user,Channel channel,Message message ) throws IOException {System.out.println(message);System.out.println(email队列user);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}队列内容可传stringentity序列化对象json对象