网站制作设计收费,在国际网站上做贸易怎么发货,新会网站设计,深圳华强北封闭了吗目录 一、简介1.1、消费模式 二、消费者2.1、maven依赖2.2、application配置2.3、消费监听 三、生产者3.1、发送消息3.2、运行结果 四、其他 一、简介 在之前的文章中#xff0c;我们讲过了#xff0c;同步发送单条消息#xff0c;异步发送单条消息#xff0c;发送单向消息… 目录 一、简介1.1、消费模式 二、消费者2.1、maven依赖2.2、application配置2.3、消费监听 三、生产者3.1、发送消息3.2、运行结果 四、其他 一、简介 在之前的文章中我们讲过了同步发送单条消息异步发送单条消息发送单向消息发送顺序消息批量发送消息事务消息我们使用的模式都是 集群消费模式Cluster本文就来讲另外一种消息消费模式也就是广播消费模式Broadcast
1.1、消费模式 在 Apache RocketMQ 中实现消息消费的方式主要是两种 集群消费模式Cluster 在集群消费模式下同一个消费者组Consumer Group中的每个消费者都会消费消息的一个副本。消息会被分发到不同的消费者实例上但是同一个消息只会被同一个消费者组中的一个消费者消费。 广播消费模式Broadcast 在广播消费模式下同一个消费者组中的每个消费者都会收到消息的一个副本即每个消费者都会独立地消费消息。消息会被广播到同一个消费者组中的所有消费者实例上。 那么怎么使用广播消费模式呢其实很简单通过在消费者的 RocketMQMessageListener 注解中设置 messageModel 参数为 MessageModel.BROADCASTING即可将消费者设置为广播模式。在广播模式下同一个消费者组中的每个消费者都会收到消息的一个副本每个消费者都会独立地消费消息从而实现了消息的广播消费。接下里看看具体操作吧。
二、消费者
2.1、maven依赖
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdrocketmq/artifactIdgroupIdcom.alian/groupIdversion1.0.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactId11-broadcasting-message-one/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/properties/project2.2、application配置
application.properties
server.port8011# rocketmq地址
rocketmq.name-server192.168.0.234:9876
# 默认的消费者组
rocketmq.consumer.groupBROADCASTING_CONSUMER_GROUP
# 批量拉取消息的数量
rocketmq.consumer.pull-batch-size10
# 广播消费模式
rocketmq.consumer.message-modelBROADCASTING实际上对于本文来说下面两个配置不用配置也不会生效。
# 默认的消费者组
rocketmq.consumer.groupBROADCASTING_CONSUMER_GROUP
# 广播消费模式
rocketmq.consumer.message-modelBROADCASTING因为优先的是RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。
2.3、消费监听 RocketMQMessageListener是RocketMQ提供的注解用于配置消费者监听器的相关属性。
package com.alian.broadcasting;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;Slf4j
Service
RocketMQMessageListener(topic broadcasting_string_message_topic,consumerGroup BROADCASTING_CONSUMER_GROUP,messageModel MessageModel.BROADCASTING)
public class StringMessageConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(第一个消费者接收到的字符串消息: {}, message);// 处理消息的业务逻辑}
}关于这里RocketMQMessageListener的参数做个简单解释
topic必填指定该消费者订阅的Topic名称consumerGroup必填指定该消费者所属的消费者组名称,同一个组内的消费者实例通常进行负载均衡消费messageModel设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
MessageModel.java
public enum MessageModel {BROADCASTING(BROADCASTING),CLUSTERING(CLUSTERING);private final String modeCN;MessageModel(String modeCN) {this.modeCN modeCN;}public String getModeCN() {return this.modeCN;}
}三、生产者 生产者我就复用前面批量消息发送的模块了
3.1、发送消息
Slf4j
SpringBootTest
public class SendBatchedBroadcastingMessageTest {Autowiredprivate RocketMQTemplate rocketMQTemplate;Testpublic void syncSendStringMessagesWithBuilder() {String topic broadcasting_string_message_topic;for (int i 0; i 10; i) {String message 广播消息 i;MessageString rocketMessage MessageBuilder.withPayload(message).build();rocketMQTemplate.convertAndSend(topic, rocketMessage);}}Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic string_message_topic;String message 批量广播消息;ListMessageString messageList new ArrayList();for (int i 0; i 10; i) {MessageString rocketMessage MessageBuilder.withPayload(message i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, text/plain).build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult rocketMQTemplate.syncSend(topic, messageList);log.info(批量消息发送结果{},sendResult);}AfterEachpublic void waiting() {try {Thread.sleep(3000L);} catch (InterruptedException e) {e.printStackTrace();}}}我们先启动消费者然后生产者发送消息。
3.2、运行结果
运行结果
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息1
[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息0
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息3
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息2
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 广播消息9
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息0
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息2
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息4
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息5
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息3
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息1
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息6
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息9
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第一个消费者接收到的字符串消息: 批量广播消息8[NSUMER_GROUP_11] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息0
[NSUMER_GROUP_12] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息1
[NSUMER_GROUP_13] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息2
[NSUMER_GROUP_14] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息3
[NSUMER_GROUP_15] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息4
[NSUMER_GROUP_16] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息5
[NSUMER_GROUP_17] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息6
[NSUMER_GROUP_18] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息7
[NSUMER_GROUP_19] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息8
[NSUMER_GROUP_20] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 广播消息9
[ONSUMER_GROUP_5] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息4
[ONSUMER_GROUP_7] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息6
[ONSUMER_GROUP_3] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息2
[ONSUMER_GROUP_4] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息3
[ONSUMER_GROUP_8] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息7
[ONSUMER_GROUP_9] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息8
[ONSUMER_GROUP_2] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息1
[ONSUMER_GROUP_1] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息0
[NSUMER_GROUP_10] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息9
[ONSUMER_GROUP_6] c.a.broadcasting.StringMessageConsumer : 第二个消费者接收到的字符串消息: 批量广播消息5四、其他 RocketMQ 通过消费者组Consumer Group来维护不同消费者的消费进度。每个消费者组都有一个消费进度offset用于标记该组下的消费者在某个主题Topic和队列Queue上已经消费到的位置。所以不同的消费者组会被视为不同的消费者如果消费者重启或重新加入组就能从对应Queue的offset处继续消费。 不过使用广播消费模式时Consumer Group 的概念基本上没有作用因为每个消费者实例都会独立地收到消息的一个副本。在广播模式下同一个消费者组中的每个消费者都会收到消息的一个副本每个消费者都会独立地消费消息而不像集群消费模式中那样一个消费者组中的消费者会共同消费消息。 广播消费模式在RocketMQ中最好的好处就是消费者解耦不同的消费者可以独立消费消息相互之间不受影响提高了系统的扩展性它的适用场景有
日志收集 - 需要将日志数据分发给多个日志收集系统每个系统都需要收到全量日志。数据备份 - 实时备份数据到多个存储系统确保数据有冗余副本。信息推送 - 向多个推送通道投递并发送消息通知如站内信、短信、Push等。状态同步 - 将数据变更实时同步到集群的所有节点保证集群节点状态一致。负载均衡 - 将任务或请求广播给所有服务实例由每个实例独立处理实现负载分担。监控告警 - 将系统监控数据广播给多个监控系统多视角分析。