在阿里巴巴做网站多少钱2019,wordpress如何去掉版权,百度云网盘资源搜索引擎入口,如何做网站图标文章目录 发布/订阅#xff08;Publish/Subscribe#xff09;交换机临时队列绑定总体代码示例 路由#xff08;Routing#xff09;绑定直连交换机多重绑定发送日志订阅总体代码示例 更多相关内容可查看 发布/订阅#xff08;Publish/Subscribe#xff09;
构建一个简单的… 文章目录 发布/订阅Publish/Subscribe交换机临时队列绑定总体代码示例 路由Routing绑定直连交换机多重绑定发送日志订阅总体代码示例 更多相关内容可查看 发布/订阅Publish/Subscribe
构建一个简单的日志系统
我们将通过构建一个简单的日志系统来说明这个模式。它将包含两个程序 – 第一个程序将发出日志消息第二个程序将接收并打印它们。在我们的日志系统中每个运行中的接收程序都将收到这些消息。这样我们就能够运行一个接收程序并将日志定向到磁盘同时我们也能够运行另一个接收程序并在屏幕上看到日志。
基本上发布的日志消息将被广播到所有的接收程序。
交换机
Rabbit中完整的消息模型:
生产者是发送消息的用户应用程序。队列是存储消息的缓冲区。消费者是接收消息的用户应用程序。
在RabbitMQ中消息模型的核心思想是生产者从不直接发送任何消息到队列。实际上很多时候生产者甚至不知道消息是否会被发送到任何队列。
相反生产者只能将消息发送到一个交换机。交换机是一个非常简单的东西。它一边从生产者接收消息另一边将它们推送到队列。交换机必须确切地知道如何处理收到的消息。它应该被附加到特定的队列吗它应该被附加到多个队列吗还是应该被丢弃。这些规则由交换机类型定义。
有几种可用的交换机类型direct、topic、headers和fanout。我们将专注于最后一种类型 – fanout。让我们创建一个这种类型的交换机并称其为logs
channel.exchangeDeclare(logs, fanout);fanout交换机非常简单。正如你可能从名称中猜到的那样它只是将接收到的所有消息广播到它所知道的所有队列。这正是我们的日志记录器所需要的。
列出交换机 要列出服务器上的交换机您可以运行非常有用的rabbitmqctl命令
sudo rabbitmqctl list_exchanges在这个列表中会有一些amq.*交换机和默认未命名交换机。这些是默认创建的,无需考虑
之前对交换机一无所知但仍然能够将消息发送到队列。这是因为我们使用的是默认交换通过空字符串来标识它。
回想一下之前发布消息的方式
channel.basicPublish(, hello, null, message.getBytes());第一个参数是交换机的名称。空字符串表示默认或无名交换机如果存在指定 routingKey 的队列则消息会被路由到该队列。
现在我们可以将消息发布到我们命名的交换机
channel.basicPublish(logs, , null, message.getBytes());这样我们将消息发布到名为 logs 的交换机中而不是默认的无名交换机。
临时队列
在之前我们使用的队列都有特定的名称。能够给队列命名对我们来说非常重要 - 我们需要将工作者指向同一个队列。在想要在生产者和消费者之间共享队列时给队列命名非常重要。
但是对于我们的日志记录器来说情况并非如此。我们希望收到所有日志消息而不仅仅是其中的一部分。我们也只对当前正在流动的消息感兴趣而不是旧消息。为了解决这个问题我们需要两件事情。
首先每当我们连接到 Rabbit 时我们都需要一个全新的空队列。为此我们可以创建一个具有随机名称的队列或者更好的是让服务器为我们选择一个随机的队列名称。其次一旦我们断开消费者连接队列应该自动删除。
在 Java 中当我们向 queueDeclare() 方法提供没有参数时我们创建一个非持久化、独占、自动删除的队列并且由服务器生成一个名称
String queueName channel.queueDeclare().getQueue();在这一点上queueName 包含一个随机的队列名称。例如它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg
绑定 我们已经创建了一个fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定
channel.queueBind(queueName, logs, );从现在开始logs 交换机将会将消息追加到我们的队列中。
列出绑定 您可以使用以下命令列出现有的绑定
rabbitmqctl list_bindings总体代码示例 发出日志消息的 producer 程序logsroutingKeyfanoutEmitLog.java
public class EmitLog {private static final String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {//指定交换机类型-fanoutchannel.exchangeDeclare(EXCHANGE_NAME, fanout);String message argv.length 1 ? info: Hello World! :String.join( , argv);//绑定交换机 发送消息到交换机EXCHANGE_NAME中channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes(UTF-8));System.out.println( [x] Sent message );}}
}如果还没有队列绑定到交换机则消息将丢失 但这对我们来说没关系如果还没有消费者在获取我们可以安全地丢弃该消息。
代码为ReceiveLogs.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs {private static final String EXCHANGE_NAME logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();//指定交换机类型-fanoutchannel.exchangeDeclare(EXCHANGE_NAME, fanout);String queueName channel.queueDeclare().getQueue();//绑定交换机跟队列channel.queueBind(queueName, EXCHANGE_NAME, );System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}如果你想将日志保存到文件中只需打开控制台并输入以下命令
java -cp $CP ReceiveLogs logs_from_rabbit.log如果你希望在屏幕上看到日志开启一个新的终端并运行
java -cp $CP ReceiveLogs当然要发出日志只需输入
java -cp $CP EmitLog使用 rabbitmqctl list_bindings 命令你可以验证代码实际上是否按我们想要的方式创建了绑定和队列。如果有两个 ReceiveLogs.java 程序正在运行你应该会看到类似如下的输出
sudo rabbitmqctl list_bindings
# Listing bindings ...
# logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# ...done.结果的解释很简单来自 exchange logs 的数据发送到两个带有服务器分配名称的队列中。这正是我们想要的。
路由Routing
绑定
在前面的示例中我们已经创建了绑定。你可能还记得 代码如下
channel.queueBind(queueName, EXCHANGE_NAME, );绑定是交换机和队列之间的关系。这可以简单地理解为队列对来自该交换机的消息感兴趣。
绑定可以接受一个额外的routingKey 参数。为了避免与basic_publish参数混淆我们将其称为绑定键。以下是我们如何使用键创建绑定的示例
channel.queueBind(queueName, EXCHANGE_NAME, black);绑定键的含义取决于交换机类型。我们先前使用的fanout 交换机简单地忽略了它的值。
直连交换机
在我们之前的教程中我们的日志系统将所有消息广播给所有消费者。我们希望扩展其功能以允许根据消息的严重性进行过滤。例如我们可能希望一个将日志消息写入磁盘的程序只接收关键错误而不浪费磁盘空间来记录警告或信息日志消息。
我们之前使用的是fanout 交换机它并没有提供太多的灵活性 - 它只能进行无脑广播。
相反我们将使用直连交换机。直连交换机背后的路由算法很简单 - 消息将被发送到绑定键与消息的路由键完全匹配的队列中。
为了说明这一点考虑以下设置 在这个设置中我们可以看到直连交换机 X 与两个绑定到它的队列。第一个队列绑定的绑定键是 orange而第二个队列有两个绑定一个绑定键为 black另一个为 green。
在这样的设置中使用路由键 orange发布到交换机的消息将被路由到队列 Q1。具有路由键 black 或 green的消息将发送到 Q2。所有其他消息将被丢弃。
多重绑定 将多个队列与相同的绑定键绑定是完全合法的。在我们的示例中我们可以添加一个在交换机 X 和队列 Q1 之间的绑定绑定键为black。在这种情况下直连交换机将像fanout 交换机一样行为将消息广播到所有匹配的队列。具有路由键 black 的消息将被传递到 Q1 和 Q2。
发送日志
我们将使用这个模型来构建我们的日志系统。与使用fanout 交换机不同我们将消息发送到一个直连交换机。我们将日志严重程度作为路由键提供。这样接收程序就能够选择它想要接收的严重程度。让我们先专注于发送日志。
和往常一样我们首先需要创建一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, direct);接下来我们准备发送一条消息
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());我们假设 ‘severity’ 可以是 info、warning 或 error 中的一个。
订阅
接收消息的方式与之前的教程类似只有一个例外 - 我们将为每个我们感兴趣的严重程度创建一个新的绑定。
String queueName channel.queueDeclare().getQueue();for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}总体代码示例 生产者类的代码EmitLogDirect.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLogDirect {private static final String EXCHANGE_NAME direct_logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {//指定交换机类型channel.exchangeDeclare(EXCHANGE_NAME, direct);//指定日志类型String severity getSeverity(argv);String message getMessage(argv);//发送日志channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(UTF-8));System.out.println( [x] Sent severity : message );}}//..
}消费者代码为ReceiveLogsDirect.java
import com.rabbitmq.client.*;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME direct_logs;public static void main(String[] argv) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();//指定交换机类型channel.exchangeDeclare(EXCHANGE_NAME, direct);//获取交换机随机名字String queueName channel.queueDeclare().getQueue();if (argv.length 1) {System.err.println(Usage: ReceiveLogsDirect [info] [warning] [error]);System.exit(1);}for (String severity : argv) {//指定日志类型channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println( [*] Waiting for messages. To exit press CTRLC);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received delivery.getEnvelope().getRoutingKey() : message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}如果你只想将 ‘warning’ 和 ‘error’而不是 ‘info’日志消息保存到文件中只需打开控制台并输入以下命令
java -cp $CP ReceiveLogsDirect warning error logs_from_rabbit.log如果你想在屏幕上看到所有的日志消息打开一个新的终端并执行以下命令
java -cp $CP ReceiveLogsDirect info warning error
# [*] Waiting for logs. To exit press CTRLC例如要发出一个错误日志消息只需输入以下命令
java -cp $CP EmitLogDirect error Run. Run. Or it will explode.
# [x] Sent error:Run. Run. Or it will explode.