部队网站模板,做自己卖东西的网站,柳州做网站的公司有哪些,目前最好的推广平台概念
从计算机术语层面来说#xff0c;RabbitMQ 模型更像是一种交换机模型。
Queue 队列
Queue#xff1a;队列#xff0c;是RabbitMQ 的内部对象#xff0c;用于存储消息。 RabbitMQ 中消息只能存储在队列中#xff0c;这一点和Kafka相反。Kafka将消息存储在topicRabbitMQ 模型更像是一种交换机模型。
Queue 队列
Queue队列是RabbitMQ 的内部对象用于存储消息。 RabbitMQ 中消息只能存储在队列中这一点和Kafka相反。Kafka将消息存储在topic主题这个逻辑层面而相对应的队列逻辑只是topic实际存储文件中的位移标识。
RabbitMQ不支持队列层面的广播消费。
交换器、路由键、绑定
交换器
Exchange 交换器生产者将消息发送到Exchange由交换器将消息路由到一个或者多个队列中如果路由不到则会返回给生产者或者丢弃消息。
类型
RabbitMQ 中的交换器有四种类型
direct径直的
它会吧消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。
fanout扇出
它会把所有发送到该交换器的消息路由到所有与该路由器绑定的队列中。
topic模糊匹配
它与 direct 类型的交换器相似也是将消息路由到 BindingKey 和 RouingKey 相匹配的队列中但这里的匹配规则有些不同
RoutingKey 为一个点号“.” 分隔的字符串被点号“.”分隔开的每一段独立的字符串称为一个单词如 com.rabbitmq.client、java.util.concurrent;BindingKey 和 RoutingKey 一样也是点号分隔的字符串BindingKey 中可以存在两种特殊字符串 “*”“#”用于模糊匹配其中星号用于匹配一个单词“#”用于匹配零个和多个单词。
举个栗子
路由键为 “com.rabbitmq.client” 的消息会同时路由到 queue1 和 queue2路由键为 “com.hidden.client” 的消息会路由到 queue2中路由键为 “com.hidden.demo” 的消息会路由到 queue2中路由键为 “java.rabbitmq.demo”的消息会路由到 queue1中路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者需要设置 mandatory 参数因为它没有匹配任何路由键 headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息而是根据发送的消息内容中 headers 属性进行匹配。headers 类型的交换器性能会很差而且也不实用基本上不会看到它的存在。
开发向导
RabbitMQ使用过程
生产者发送消息
连接 RabbitMQ Broker建立一个 Connection 开启 一个信道 Channel声明交换器并设置相关属性比如交换机类型、是否持久化等声明队列并设置相关属性如果是否排他、是否持久化、是否自动删除等通过 RoutingKey 将 Exchange 和 Queue 绑定发送消息到 RabbitMQ其中包括路由键、交换器等信息交换器根据路由键查找匹配的队列如果找到则将从生产者发送过的消息存入队列中如果没有找到则根据生产者配置的属性选择丢弃或者回退给生产者关闭信道 10.关闭连接
消费者接收消息
连接到 RabbitMq Broker建立一个 Connection开启一个信道 Channel向 RabbitMq Broker 请求消费对应队列的消息可能会设置想要的回调函数以及做一些准备工作接收消息消费者确认ack接收到的消息RabbitMQ 从队列中删除已被确认消费的消息关闭信道关闭连接
由使用过程发现无论生产者或者消费者都需要和 RabbitMQ Broker 建立连接这个连接Connection就是一条 TCP 连接一旦 TCP 连接建立起来客户端紧接着可以创建一个 AMQP 信道Channel每个信道都会被指派一个唯一ID。
信道是建立在 Connection 之上的虚拟连接RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
使用交换器和队列
代码清单
channel.exchangeDeclare(exchangeName,direct,true);
String queueName channel.queueDeclare().getQueue();
// or 声明队列
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);exchangeDeclare 方法详解
exchangeDeclare 有多个重载方法这些重载方法都是由下面这个方法中缺省的某些参数构成的。
Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,MapString, Object arguments) throws IOException;参数说明
exchange交换器名称type交换器类型如direct、fanout、topic、headersdurable是否持久化持久化可以将交换器存盘在服务器重启的时候不会丢失相关消息autoDelete是否自动删除自动删除前提只有有一个队列或交换器与这个交换器绑定了之后所有与这个交换器绑定的都解绑。只有绑过到解绑的过程交换器才会删除没有绑过则不会删除internal是否内置的如果是内置交换器则客户端无法直接发送消息到这个交换器只能通过交换器路由到交换器这种方式argument 结构化参数比如 alternate-exchange等
queueDeclare 方法详解
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) throws IOException;参数说明
queue队列名称durable是否持久化持久化的队列会存盘服务器重启的时候可以保证不丢失相关信息exclusive是否排他如果一个队列设置为排他则该队列仅对首次声明它的连接可见连接断开后自动删除排他是基于连接Connection可见的。其他Connection 不可声明同名排他队列而且即使排他队列设置了持久化一旦连接关闭或者客户端退出排他队列都会自动删除。autoDelete是否自动删除自动删除的前提是至少有一个消费者连接到这个队列之后所有与这个队列连接的消费者都断开后才会自动删除如果没有消费者连接过则不会删除arguments队列的其他一些参数如x-message-ttl、 x-expires、 x-max-length、 x-max-length-bytes、 x-dead-letter-exchange、 x-dead-letter-routing-key、 x-max-priority等
注意要点 生产者和消费者都能够使用 queueDeclare 来声明一个队列但是如果消费者在同一个信道上订阅了另一个队列就无法再声明队列了。必须先取消订阅然后将信道置为“传输”模式之后才能声明队列。
queueBind 方法详解
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;Queue.BindOk queueBind(String queue, String exchange, String routingKey, MapString, Object arguments) throws IOException;void queueBindNoWait(String queue, String exchange, String routingKey, MapString, Object arguments) throws IOException;
参数说明
queue队列名称exchange交换器名称routingKey用来绑定队列和交换器的路由键argument定义绑定的一些参数
不仅可以将队列和交换器绑定起来也可以将已经绑定的关系解除
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, MapString, Object arguments) throws IOException;exchangeBind 方法详解
不仅可以将交换器与队列绑定也可以将交换器与交换器进行绑定用法如出一辙。
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, MapString, Object arguments) throws IOException;
void exchangeBindNoWait(String destination, String source, String routingKey, MapString, Object arguments) throws IOException;