桂林什么公司做网站推广好,网站域名怎么登陆,七彩建设发展有限公司官方网站,阜平网站建设RabbitMQ Streams是一种持久复制数据结构#xff0c;可以完成与队列相同的任务#xff1a;它们缓冲来自生产者的消息#xff0c;这些消息由消费者读取。然而#xff0c;流与队列的区别在于两个重要方面#xff1a;消息的存储和消费方式。 Streams为仅追加的消息日志建模可以完成与队列相同的任务它们缓冲来自生产者的消息这些消息由消费者读取。然而流与队列的区别在于两个重要方面消息的存储和消费方式。 Streams为仅追加的消息日志建模这些消息可以重复读取直到过期。流始终是持久的和复制的。这种流行为的更技术性的描述是“非破坏性消费者语义”。 要从RabbitMQ中的流中读取消息一个或多个使用者订阅它并根据需要多次读取相同的消息。 流中的数据可以通过RabbitMQ客户端库或通过专用的二进制协议插件和关联的客户端使用。强烈建议使用后一个选项因为它提供对所有流特定功能的访问并提供尽可能最好的吞吐量性能。 现在您可能会问以下问题
那么流会取代队列吗我应该放弃使用队列吗
为了回答这些问题引入流不是为了取代队列而是为了补充队列。Streams为新的RabbitMQ用例开辟了许多机会这些用例在使用Streams的用例中进行了描述。 以下信息详细说明流的使用以及流的管理和维护操作。 您还应该查看流插件信息以了解有关使用二进制RabbitMQ stream协议的流的更多信息以及功能矩阵的流核心和流插件比较页面。
一、使用流的用例
开发流最初是为了涵盖现有队列类型无法提供或具有缺点的4个消息传递用例
1、大型扇形分叉
当想要将相同的消息传递给多个订阅者时用户当前必须为每个消费者绑定一个专用队列。如果使用者数量很大这可能会变得效率低下特别是在需要持久性和/或复制时。流将允许任意数量的使用者以非破坏性的方式消费来自同一队列的相同消息从而不需要绑定多个队列。流消费者还可以从副本中读取数据从而允许读取负载在集群中分布。
2、重放时间旅行
由于所有当前的RabbitMQ队列类型都具有破坏性消费行为即当消费者用完消息时将从队列中删除消息因此不可能重新读取已消费的消息。流将允许消费者在日志中的任何点连接并从那里读取。
3、吞吐量性能
没有任何持久队列类型能够提供可以与任何现有的基于日志的消息传递系统竞争的吞吐量。Streams的设计以性能为主要目标。
4、大量积压工作
大多数RabbitMQ队列被设计为收敛于空状态并因此进行了优化当给定队列上有数百万条消息时性能可能会更差。流旨在以有效的方式存储大量数据并将内存开销降至最低。
二、如何使用RabbitMQ Streams
可以指定可选队列和使用者参数的AMQP 0.9.1客户端库将能够将流用作常规AMQP 09.1队列。 就像队列一样必须首先声明流。
1、声明RabbitMQ Stream
要声明流请将x-queue-type队列参数设置为stream默认值为classic。此参数必须由客户端在声明时提供不能使用策略设置或更改它。这是因为策略定义或适用的策略可以动态更改但队列类型不能更改。必须在声明时指定。 下面的片段显示了如何使用AMQP 0.9.1 Java客户端创建流
ConnectionFactory factory new ConnectionFactory();
Connection connection factory.newConnection();
Channel channel connection.createChannel();
channel.queueDeclare(my-stream,true, // durablefalse, false, // not exclusive, not auto-deleteCollections.singletonMap(x-queue-type, stream)
);
使用设置为stream的x-queue-type参数声明队列将在每个配置的RabbitMQ节点上创建一个具有副本的流。流是仲裁系统因此强烈建议使用不均匀的集群大小。 流仍然是AMQP 0.9.1队列因此它可以在创建后绑定到任何交换就像任何其他RabbitMQ队列一样。 如果使用管理UI声明则必须使用队列类型下拉菜单指定流类型。 流支持3个额外的队列参数这些参数最好使用策略配置
x-max-length-bytes 设置流的最大大小以字节为单位。默认值未设置。
x-max-age
设置流的最长期限。默认值未设置。
x-stream-max-segment-size-bytes
单位字节。流在磁盘上被划分为固定大小的段文件。 此设置控制它们的大小。 默认值500000000 字节。
以下代码片段演示如何将流的最大大小设置为 20 GB并使用 100 MB 的段文件
MapString, Object arguments new HashMap();
arguments.put(x-queue-type, stream);
arguments.put(x-max-length-bytes, 20_000_000_000); // maximum stream size: 20 GB
arguments.put(x-stream-max-segment-size-bytes, 100_000_000); // size of segment files: 100 MB
channel.queueDeclare(my-stream,true, // durablefalse, false, // not exclusive, not auto-deletearguments
); 三、客户端操作
1、Consuming
由于流永远不会删除任何消息因此任何消费者都可以开始读取/消费 从日志中的任何一点。这由 x-stream-offset consumer 参数控制。 如果未指定消费者将从写入的下一个偏移量开始读取 添加到使用者启动后的日志中。支持以下值
first - 从日志中的第一条可用消息开始last - 从最后写入的消息“块”块 是流中使用的储运单位简单来说就是批次 由几到几千条消息组成的消息具体取决于入口next - 与不指定任何偏移量相同偏移量 - 一个数值指定要附加到日志的确切偏移量。 如果此偏移量不存在它将分别钳制到日志的开始或结束。时间戳 - 一个时间戳值指定要附加到日志的时间点。 它将限制到最接近的偏移量如果时间戳超出流的范围它将分别限制日志的开始或结束。 在 AMQP 0.9.1 中使用的时间戳是精度为 00 秒的 POSIX 时间即自 00-00-1970 0101 UTC 以来的秒数。 请注意使用者可以接收在指定时间戳之前发布的消息。间隔 - 一个字符串值指定相对于当前时间附加日志的时间间隔。使用与 x-max-age 相同的规范
以下代码片段演示如何使用第一个偏移量规范
span stylebackground-color:#232323span stylecolor:#e6e1dcchannel.basicQos(span stylecolor:#a5c261100/span); span stylecolor:#bc9458em// QoS must be specified/em/span
channel.basicConsume(span stylecolor:#a5c261my-stream/span,false,Collections.singletonMap(span stylecolor:#a5c261x-stream-offset/span, span stylecolor:#a5c261first/span), span stylecolor:#bc9458em// first offset specification/em/span(consumerTag, message) - {span stylecolor:#bc9458em// message processing/em/spanspan stylecolor:#bc9458em// .../em/spanchannel.basicAck(message.getEnvelope().getDeliveryTag(), false); span stylecolor:#bc9458em// ack is required/em/span},consumerTag - { });
/span/span
以下代码片段演示如何指定要从中消费的特定偏移量
span stylebackground-color:#232323span stylecolor:#e6e1dcchannel.basicQos(span stylecolor:#a5c261100/span); span stylecolor:#bc9458em// QoS must be specified/em/span
channel.basicConsume(span stylecolor:#a5c261my-stream/span,false,Collections.singletonMap(span stylecolor:#a5c261x-stream-offset/span, span stylecolor:#a5c2615000/span), span stylecolor:#bc9458em// offset value/em/span(consumerTag, message) - {span stylecolor:#bc9458em// message processing/em/spanspan stylecolor:#bc9458em// .../em/spanchannel.basicAck(message.getEnvelope().getDeliveryTag(), false); span stylecolor:#bc9458em// ack is required/em/span},consumerTag - { });
/span/span
以下代码片段演示如何指定要从中使用的特定时间戳
span stylebackground-color:#232323span stylecolor:#e6e1dcspan stylecolor:#bc9458em// an hour ago/em/span
span stylecolor:#da4939Date/span span stylecolor:#a5c261timestamp/span span stylecolor:#c26230new/span span stylecolor:#ffc66dDate/span(System.currentTimeMillis() - span stylecolor:#a5c26160/span * span stylecolor:#a5c26160/span * span stylecolor:#a5c2611_000/span)
channel.basicQos(span stylecolor:#a5c261100/span); span stylecolor:#bc9458em// QoS must be specified/em/span
channel.basicConsume(span stylecolor:#a5c261my-stream/span,false,Collections.singletonMap(span stylecolor:#a5c261x-stream-offset/span, timestamp), span stylecolor:#bc9458em// timestamp offset/em/span(consumerTag, message) - {span stylecolor:#bc9458em// message processing/em/spanspan stylecolor:#bc9458em// .../em/spanchannel.basicAck(message.getEnvelope().getDeliveryTag(), false); span stylecolor:#bc9458em// ack is required/em/span},consumerTag - { });
/span/span
2、其他流操作
以下操作的使用方式与经典队列和仲裁队列类似 但有些具有一些特定于队列的行为。
声明队列删除发布者确认消费订阅消费需要 QoS 要设置的预取。acks 作为一种信用机制来推进当前 消费者的偏移量。为使用者设置 QoS 预取消费者确认牢记 QoS 预取限制取消消费者 四、流的单个活动消费者功能
流的单个活动使用者是 RabbitMQ 3.11 及更高版本中提供的一项功能。 它在流上提供独占消费和消费连续性。 当共享同一流和名称的多个使用者实例启用单个活动使用者时这些实例中只有一个实例将同时处于活动状态因此将接收消息。 其他实例将处于空闲状态。
单一活跃使用者功能提供 2 个好处
消息按顺序处理一次只有一个使用者。保持消费连续性如果活动消费者停止或崩溃则该组中的消费者将接管。
五、超级流
超级流是一种通过将大型流划分为较小的流来横向扩展的方法。 它们与单个活动使用者集成以保留分区内的消息顺序。 超级流从 RabbitMQ 3.11 开始可用。
超级流是由单个常规流组成的逻辑流。 这是一种使用 RabbitMQ 流横向扩展发布和使用的方法将大型逻辑流划分为分区流将存储和流量拆分到多个集群节点上。
超级流仍然是一个逻辑实体由于客户端库的智能性应用程序将其视为一个“大”流。 超级流的拓扑基于 AMQP 0.9.1 模型即它们之间的交换、队列和绑定。
可以使用任何 AMQP 0.9.1 库或管理插件创建超级流的拓扑它需要创建直接交换即“分区”流并将它们绑定在一起。 不过使用 rabbitmq-streams add_super_stream 命令可能更容易。 以下是如何使用它来创建具有 3 个分区的发票超级流 span stylebackground-color:#232323span stylecolor:#e6e1dcrabbitmq-streams add_super_stream invoices --partitions 3
/span/span 使用 rabbitmq-streams add_super_stream --help 了解有关该命令的更多信息。
与单个流相比超级流增加了复杂性因此不应将其视为涉及流的所有用例的默认解决方案。 仅当您确定已达到单个流的限制时才考虑使用超级流。 六、功能比较常规队列与流
流不是传统意义上的真正队列因此不是 与 AMQP 0.9.1 队列语义非常一致。其他队列类型的许多功能 不支持支持并且永远不会由于队列类型的性质而支持。
可以使用常规队列的 AMQP 0.9.1 客户端库将能够使用流 只要它使用消费者确认。
由于流中许多功能是非破坏性的因此它们永远不会被流支持 读取语义。
功能矩阵 特征 经典 流 非持久性队列 是的 不 排他性 是的 不 每条消息的持久性 每条消息 总是 成员资格变更 自动 手动 TTL的 是的 否但请参阅保留期) 队列长度限制 是的 否但请参阅保留期) 懒惰行为 是的 固有 消息优先级 是的 不 消费者至上 是的 不 死信交换 是的 不 遵守政策 是的 请参阅保留期) 对内存警报做出反应 是的 否使用最少的 RAM 病毒邮件处理 不 不 全局 QoS 预取 是的 不
非持久性队列
根据其假定的用例流始终是持久的 它们不能像常规队列那样是非持久的。
排他性
根据其假定的用例流始终是持久的它们不能像常规队列那样是排他性的。 它们不应用作临时队列。
懒惰模式
写入消息后流将所有数据直接存储在磁盘上 在读取之前它不会使用任何内存。可以这么说流本质上是懒惰的。
全球服务质量
流不支持全局 QoS 预取其中通道设置单个 使用该通道的所有使用者的预取限制。如果尝试 从启用了全局 QoS 的通道中的流中使用 将返回通道错误。
使用每使用者 QoS 预取这是几个常用客户端中的默认设置。