北京城建集团官网,成都seo网站建设,wordpress无法安装这个包,哪家网站开发公司好rabbitmq的stream#xff1a; 4、对比 autoTrackingstrategy方式:始终监听Stream中的新消息(狗狗看家#xff0c;忠于职守)指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘#xff0c;回来就完) 这两种分别怎么写#xff1f;java 在 RabbitMQ 中#xff0c…rabbitmq的stream 4、对比 autoTrackingstrategy方式:始终监听Stream中的新消息(狗狗看家忠于职守)指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘回来就完) 这两种分别怎么写java 在 RabbitMQ 中Stream 是一种新的消息传递机制允许高吞吐量和低延迟的消息传递。你提到的两种消费方式分别是自动跟踪策略Auto Tracking Strategy和指定偏移量Offset的消费方式。下面我们将分别演示这两种方式的 Java 实现。
### 1. 自动跟踪策略Auto Tracking Strategy
这种方式下消费者会一直监听 Stream 中的新消息并不断处理这些消息。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.ConsumerBuilder;
public class AutoTrackingConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .offset(OffsetSpecification.first()) // 从第一个消息开始 .messageHandler((context, message) - { String body new String(message.getBodyAsBinary()); System.out.println(Received message: body); }) .build(); // 消费者会一直运行监听新的消息 System.out.println(Consumer is listening for new messages...); } }
### 2. 指定偏移量Offset的消费方式
这种方式下消费者会从指定的偏移量开始消费消息并在消费完这些消息后停止。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.ConsumerBuilder;
public class OffsetConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; long offset 10; // 指定偏移量 // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始 .messageHandler((context, message 为什么没写.autoTrackingStrategy()
你提到的 .autoTrackingStrategy() 是 RabbitMQ Stream API 中的一个重要方法它用于指定消费者的自动跟踪策略。让我们详细解释一下如何在代码中使用这个方法。
### 自动跟踪策略Auto Tracking Strategy
自动跟踪策略允许消费者始终监听 Stream 中的新消息并自动跟踪消息的偏移量。这种方式下消费者会持续消费新消息而不会停止。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.ConsumerBuilder;
public class AutoTrackingConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .offset(OffsetSpecification.first()) // 从第一个消息开始 .autoTrackingStrategy() // 启用自动跟踪策略 .messageHandler((context, message) - { String body new String(message.getBodyAsBinary()); System.out.println(Received message: body); }) .build(); // 消费者会一直运行监听新的消息 System.out.println(Consumer is listening for new messages...); } }
### 指定偏移量Offset的消费方式
这种方式下消费者会从指定的偏移量开始消费消息并在消费完这些消息后停止。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.ConsumerBuilder;
public class OffsetConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; long offset 10; // 指定偏移量 // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始 .messageHandler((context, message) - { .offset(OffsetSpecification.first()) // 从第一个消息开始 .autoTrackingStrategy() / -----这里指定了 .autoTrackingStrategy() /就不用指定.offset(OffsetSpecification.first())了吧 确实如果你使用 .autoTrackingStrategy()那么你不需要再指定 .offset(OffsetSpecification.first())因为自动跟踪策略会自动从最新的消息开始消费并持续监听新的消息。让我们分别看看两种方式的完整代码示例。
### 自动跟踪策略Auto Tracking Strategy
这种方式下消费者会自动从最新的消息开始消费并持续监听新的消息。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.ConsumerBuilder;
public class AutoTrackingConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .autoTrackingStrategy() // 启用自动跟踪策略 .messageHandler((context, message) - { String body new String(message.getBodyAsBinary()); System.out.println(Received message: body); }) .build(); // 消费者会一直运行监听新的消息 System.out.println(Consumer is listening for new messages...); } }
### 指定偏移量Offset的消费方式
这种方式下消费者会从指定的偏移量开始消费消息并在消费完这些消息后停止。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.ConsumerBuilder;
public class OffsetConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; long offset 10; // 指定偏移量 // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始 .messageHandler((context, message) - { String body new String(message.getBodyAsBinary()); System.out.println(Received message: body); 那里之前为什么要写 .offset(OffsetSpecification.first()) // 从第一个消息开始 .autoTrackingStrategy() /
你说得对如果你使用 .autoTrackingStrategy()就不需要再指定 .offset(OffsetSpecification.first())。这是一个错误的示例。使用 .autoTrackingStrategy() 时消费者会自动从最新的消息开始消费并持续监听新的消息。
让我们修正并简化这两种方式的示例代码
### 自动跟踪策略Auto Tracking Strategy
这种方式下消费者会自动从最新的消息开始消费并持续监听新的消息。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.ConsumerBuilder;
public class AutoTrackingConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .autoTrackingStrategy() // 启用自动跟踪策略 .messageHandler((context, message) - { String body new String(message.getBodyAsBinary()); System.out.println(Received message: body); }) .build(); // 消费者会一直运行监听新的消息 System.out.println(Consumer is listening for new messages...); } }
### 指定偏移量Offset的消费方式
这种方式下消费者会从指定的偏移量开始消费消息并在消费完这些消息后停止。
#### 示例代码
java import com.rabbitmq.stream.Consumer; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.ConsumerBuilder;
public class OffsetConsumer { public static void main(String[] args) { Environment environment Environment.builder().host(localhost).build(); String streamName test_stream; long offset 10; // 指定偏移量 // 创建消费者 Consumer consumer environment.consumerBuilder() .stream(streamName) .offset(OffsetSpecification.offset(offset)) // 从指定偏移量开始 .messageHandler((context, message) - { String body new String(message.getBodyAs