佛山论坛建站模板,温州自适应网站建设,怎样建设百度网站,如何做网站美工1、 环境准备
创建Virtual Hosts
虚拟主机#xff1a;类似于mysql中的database。他们都是以“/”开头 设置权限 2. 五种消息模型
RabbitMQ提供了6种消息模型#xff0c;但是第6种其实是RPC#xff0c;并不是MQ#xff0c;因此不予学习。那么也就剩下5种。
但是其实3、4…1、 环境准备
创建Virtual Hosts
虚拟主机类似于mysql中的database。他们都是以“/”开头 设置权限 2. 五种消息模型
RabbitMQ提供了6种消息模型但是第6种其实是RPC并不是MQ因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型只不过进行路由的方式不同。 依赖
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcn.atguigu.rabbitmq/groupIdartifactIdatguigu-rabbitmq/artifactIdversion0.0.1-SNAPSHOT/versionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.0.RELEASE/version/parentpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.apache.commons/groupIdartifactId /artifactIdversion3.3.2/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactId/dependency/dependencies
/project我们抽取一个建立RabbitMQ连接的工具类方便其他程序获取连接
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory new ConnectionFactory();//设置服务地址factory.setHost(192.168.1.129);//端口factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/zhenguo);factory.setUsername(anni);factory.setPassword(123456);// 通过工程获取连接Connection connection factory.newConnection();return connection;}
}2.1. 基本消息模型-simple
特点一个生产者一个消费者一个队列
2.1.1. 生产者发送消息
public class Send {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection ConnectionUtil.getConnection();// 从连接中创建通道这是完成大部分API的地方。Channel channel connection.createChannel();// 声明创建队列必须声明队列才能够发送消息我们可以把消息发送到队列中。/*** 参数1queue队列名称* 参数2durable是否持久化* 参数3exclusive队列是否为专用队列如果是专用队列断开连接后会自动删除* 参数4autoDelete队列长时间闲置时是否需要删除* 参数5arguments队列的其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息内容String message Hello World!;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );//关闭通道和连接channel.close();connection.close();}
}控制台 2.1.2. 管理工具 中查看消息
进入队列页面可以看到新建了一个队列simple_queue 点击队列名称进入详情页可以查看消息 在控制台查看消息并不会将消息消费所以消息还在。
2.1.3. 消费者获取消息
public class Recv {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [x] received : msg !);}};// 监听队列第二个参数是否自动进行消息确认。阻塞等待获取队列中的消息channel.basicConsume(QUEUE_NAME, true, consumer);}
}控制台 这个时候队列中的消息就没了 我们发现消费者已经获取了消息但是程序没有停止一直在监听队列中是否有新的消息。一旦有新的消息进入队列就会立即打印.
2.1.4. 消息确认机制ACK
通过刚才的案例可以看出消息一旦被消费者接收队列中的消息就会被删除。
那么问题来了RabbitMQ怎么知道消息被接收了呢
如果消费者领取消息后还没执行操作就挂掉了呢或者抛出了异常消息消费失败但是RabbitMQ无从得知这样消息就丢失了
因此RabbitMQ有一个ACK机制。当消费者获取消息后会向RabbitMQ发送回执ACKAcknowledge character确认字符告知消息已经被接收。不过这种回执ACK分两种情况
自动ACK消息一旦被接收消费者自动发送ACK手动ACK消息接收后不会发送ACK需要手动调用
大家觉得哪种更好呢
这需要看消息的重要性
如果消息不太重要丢失也没有影响那么自动ACK会比较方便如果消息非常重要不容丢失。那么最好在消费完成后手动ACK否则接收消息后就自动ACKRabbitMQ就会把消息从队列中删除。如果此时消费者宕机那么消息就丢失了。
我们之前的测试都是自动ACK的如果要手动ACK需要改动我们的代码
public class Recv2 {private final static String QUEUE_NAME simple_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道final Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [x] received : msg !);// 手动进行ACKchannel.basicAck(envelope.getDeliveryTag(), false);}};// 监听队列第二个参数false手动进行ACKchannel.basicConsume(QUEUE_NAME, false, consumer);}
}注意到最后一行代码
channel.basicConsume(QUEUE_NAME, false, consumer);如果第二个参数为true则会自动进行ACK如果为false则需要手动ACK。方法的声明 2.1.4.1. 自动ACK存在的问题
修改消费者添加异常如下 生产者不做任何修改直接运行消息发送成功 运行消费者程序抛出异常。但是消息依然被消费 管理界面 2.1.4.2. 演示手动ACK
修改消费者把自动改成手动去掉之前制造的异常 生产者不变再次运行 运行消费者 但是查看管理界面发现 停掉消费者的程序发现 这是因为虽然我们设置了手动ACK但是代码中并没有进行消息确认所以消息并未被真正消费掉。
当我们关掉这个消费者消息的状态再次称为Ready
修改代码手动ACK 执行 消息消费成功
2.2. work消息模型
工作队列或者竞争消费者模式
特点多个消费者一个生产者一个队列 在第一篇教程中我们编写了一个程序从一个命名队列中发送并接受消息。在这里我们将创建一个工作队列在多个工作者之间分配耗时任务。
工作队列又称任务队列。主要思想就是避免执行资源密集型任务时必须等待它执行完成。相反我们稍后完成任务我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时任务将在他们之间共享但是一个消息只能被一个消费者获取。
这个概念在Web应用程序中特别有用因为在短的HTTP请求窗口中无法处理复杂的任务。
接下来我们来模拟这个流程
P生产者任务的发布者C1消费者领取任务并且完成任务假设完成速度较快C2消费者2领取任务并完成任务假设完成速度慢面试题避免消息堆积
1采用workqueue多个消费者监听同一队列。
2接收到消息以后而是通过线程池异步消费。
2.2.1. 生产者
生产者与案例1中的几乎一样
public class Send {private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i 0; i 50; i) {// 消息内容String message task .. i;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );Thread.sleep(i * 2);}// 关闭通道和连接channel.close();connection.close();}
}不过这里我们是循环发送50条消息。
2.2.2. 消费者1
public class Recv1{private final static String QUEUE_NAME test_work_queue;
public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg new String(body);System.out.println( [消费者1] received : msg !);channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_NAME, false ,consumer);
}
}2.2.3. 消费者2
public class Recv2{private final static String QUEUE_NAME test_work_queue;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 创建通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg new String(body);System.out.println( [消费者2] received : msg !);//模拟消耗时间try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE_NAME, false ,consumer);}
}与消费者1基本类似就是没有设置消费耗时时间。
这里是模拟有些消费者快有些比较慢。
接下来两个消费者一同启动然后发送50条消息 可以发现两个消费者各自消费了25条消息而且各不相同这就实现了任务的分发。
2.2.4. 能者多劳
刚才的实现有问题吗
消费者2比消费者1的效率要低一次任务的耗时较长然而两人最终消费的消息数量是一样的消费者1大量时间处于空闲状态消费者2一直忙碌
现在的状态属于是把任务平均分配正确的做法应该是消费越快的人消费的越多。
怎么实现呢
在比较慢的消费者创建队列后我们可以使用basicQos方法参数prefetchCount 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说不要向工作人员发送新消息直到它处理并确认了前一个消息。 相反它会将其分派给空闲的下一个工作人员。(注意必须手动ack) 再次测试 2.3. 订阅模型分类
在之前的模式中我们创建了一个工作队列。 工作队列背后的假设是每个任务只被传递给一个工作人员。 在这一部分我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。
订阅模型示意图 解读
1、1个生产者多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息经过交换机到达队列实现一个消息被多个消费者获取的目的
XExchanges交换机一方面接收生产者发送的消息。另一方面知道如何处理消息例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作取决于Exchange的类型。
Exchange类型有以下几种
Fanout广播将消息交给所有绑定到交换机的队列Direct定向把消息交给符合指定routing key 的队列 Topic通配符把消息交给符合routing pattern路由模式 的队列
我们这里先学习
Fanout广播模式Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定或者没有符合路由规则的队列那么消息会丢失
2.4. 订阅模型-Fanout
Fanout也称为广播。
流程图 在广播模式下消息发送流程是这样的
1 可以有多个消费者2 每个消费者有自己的queue队列3 每个队列都要绑定到Exchange交换机4 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定。5 交换机把消息发送给绑定过的所有队列6 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
2.4.1. 生产者
两个变化
1 声明Exchange不再声明Queue2 发送消息到Exchange不再发送到Queue
public class Send {private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, fanout);// 消息内容String message Hello everyone;// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [生产者] Sent message );channel.close();connection.close();}
}
2.4.2. 消费者1
public class Recv1 {private final static String QUEUE_NAME fanout_exchange_queue_1;private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
要注意代码中队列需要和交换机绑定
2.4.3. 消费者2
public class Recv2 {private final static String QUEUE_NAME fanout_exchange_queue_2;private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列手动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}
2.4.4. 测试
我们运行两个消费者然后发送1条消息 应用场景文字直播
2.5. 订阅模型-Direct
有选择性的接收消息
在订阅模式中生产者发布消息所有消费者都可以获取所有消息。
在路由模式中我们将添加一个功能 - 我们将只能订阅一部分消息。 例如我们只能将重要的错误消息引导到日志文件以节省磁盘空间同时仍然能够在控制台上打印所有日志消息。
但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key
消息的发送方在向Exchange发送消息时也必须指定消息的routing key。 P生产者向Exchange发送消息发送消息时会指定一个routing key。
XExchange交换机接收生产者的消息然后把消息递交给 与routing key完全匹配的队列
C1消费者其所在队列指定了需要routing key 为 error 的消息
C2消费者其所在队列指定了需要routing key 为 info、error、warning 的消息
2.5.1. 生产者
此处我们模拟商品的增删改发送消息的RoutingKey分别是insert、update、delete
public class Send {private final static String EXCHANGE_NAME direct_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, direct);// 消息内容String message 商品新增了 id 1001;// 发送消息并且指定routing key 为insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, insert, null, message.getBytes());System.out.println( [商品服务] Sent message );channel.close();connection.close();}
}2.5.2. 消费者1
我们此处假设消费者1只接收两种类型的消息更新商品和删除商品。
public class Recv {private final static String QUEUE_NAME direct_exchange_queue_1;private final static String EXCHANGE_NAME direct_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。假设此处需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, update);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, delete);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
2.5.3. 消费者2
我们此处假设消费者2接收所有类型的消息新增商品更新商品和删除商品。
public class Recv2 {private final static String QUEUE_NAME direct_exchange_queue_2;private final static String EXCHANGE_NAME direct_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, insert);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, update);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, delete);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}2.5.4. 测试
我们分别发送增、删、改的RoutingKey发现结果 2.6. 订阅模型-Topic
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割例如 item.insert
通配符规则
#匹配一个或多个词*匹配恰好1个词举例
audit.#能够匹配audit.irs.corporate 或者 audit.irsaudit.*只能匹配audit.irs2.6.1. 生产者
使用topic类型的Exchange发送消息的routing key有3种 item.isnert、item.update、item.delete
public class Send {private final static String EXCHANGE_NAME topic_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, topic);// 消息内容String message 新增商品 : id 1001;// 发送消息并且指定routing key 为insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, item.insert, null, message.getBytes());System.out.println( [商品服务] Sent message );channel.close();connection.close();}
}2.6.2. 消费者1
我们此处假设消费者1只接收两种类型的消息更新商品和删除商品
public class Recv {private final static String QUEUE_NAME topic_exchange_queue_1;private final static String EXCHANGE_NAME topic_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。需要 update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, item.update);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, item.delete);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
2.6.3. 消费者2
我们此处假设消费者2接收所有类型的消息新增商品更新商品和删除商品。
/*** 消费者2*/
public class Recv2 {private final static String QUEUE_NAME topic_exchange_queue_2;private final static String EXCHANGE_NAME topic_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机同时指定需要订阅的routing key。订阅 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, item.*);// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列自动ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
2.7. 持久化
如何避免消息丢失
1 消费者的ACK机制。可以防止消费者丢失消息。
2 但是如果在消费者消费之前MQ就宕机了消息就没了。
是可以将消息进行持久化呢
要将消息持久化前提是队列、Exchange都持久化
2.7.1. 交换机持久化 2.7.2. 队列持久化 2.7.3. 消息持久化
MessageProperties使用rabbitmq包下的类