个人网站备案注意事项,wordpress树形导航注册,上线了建站,手机上怎么分享wordpress学习贴#xff1a;参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心#xff0c;支持Topic和Broker的动态注册和发现。作用主…学习贴参考https://blog.csdn.net/zhiyikeji/article/details/138286088 文章目录 普通消息顺序消息延迟消息批量消息事务消息 SpringBoot整合RocketMQ 3.1 NameServer NameServer是一个简单的路由注册中心支持Topic和Broker的动态注册和发现。作用主要包括两点
Broker管理Broker会把集群信息注册到NameServer上NameServer会把这些信息记录下来作为路由信息的基本数据。然后还会提供心态检测机制检查Broker是否还存活 路由信息因为NameServer存放的有Broker集群的基本信息例如有哪些Borker可用以及Broker下的队列信息所以Producer和Consumer就可以通过NameServer知道整个Broker集群信息生产者生产的信息就可以知道往哪个Broker下的哪个Message queue队列中投递消息消费者也可以通过自己的配置信息去哪个broker下哪个队列中去拉取消息进行消费。 NameServer是无状态的且NameServer集群下各个NameServer是互相不通信的没有任何信息同步操作。每个Broker都会与NameServer集群下的每一个NameServer节点建立长链接然后会向每一个NameServer注册自己的路由信息所以每个NamerServer下都保存了Broker集群的完整路由信息。当某个NameServer节点挂掉后消费者和生产者也可以通过其他NameServer获取到Broker的完整信息所以大部分情况下NameServer通常会部署多个实例
Broker代理服务器 主要负责生产者生产消息的存储为消费者拉取信息提供查询也会存储消息相关的一些其他数据例如Topic信息、队列信息、消费进度偏移量等。 而且Broker是服务高可用的保证相对于NameServer来说Broker的部署会相对于复杂一些。
普通主从集群模式 这种集群下会给每个节点分配特定的角色分为Master和Slave一个Master可以对应多个Slave但是一个Slave只能对应一个Mastermaster负责响应生成存储消息的请求并存储消息。slave负责储存从主节点同步过来的数据(可以选择同步或者异步)我们可以通过配置conf目录下的配置文件来选择如何配置。我们可以通过指定相同的BrokerName不同的BrokerId来制定一个Broker集群。BrokerId中0代表master非0代表slave。但是这种模式下的弊端就是各个节点的角色无法切换如果一个master挂掉后这一组的Broker就不可用了 Dledger高可用集群 这个是RocketMQ4.5版本后提供的一种集群高可用模式这个模式下会随机选举出一个节点作为master当master节点挂了后会通过Raft机制然后会从slave节点中选择一个节点升级为master。
普通消息
发送消息的方式有同步、异步、单向三种方式
同步方式同步方式是最常用的方式通常是发送一条消息后早收到服务端同步响应之后然后再发送下一条消息可以用于一些比较重要的场景例如短信发送 异步方式发送一条消息到服务端后不需要等到服务端响应就可以继续发送下一条消息但是需要注册回调接口然后通过回调接口获取服务端的响应经常用于一些链路执行过长的场景 单向方式不需要等待服务端响应也不需要注册回调接口就可以继续发送下一条消息耗时非常短通常适用于耗时短但是可靠性要求没那么高的场景。 发送消息的步骤大概如下:
创建生产者普通消息通常都是DefaultProducer然后设置一个produceGroup的名字注册NameServer地址可以setNameServer(),如果是多个中间以;分开构建消息体Topic、tag、keys、消息体等信息发送消息通过send()方法发送消息
顺序消息
顺序消息是对生产者生产消息和消费者消费消息的顺序有严格要求的。 但是RocketMQ并不能保证所有消息的有序性因为默认情况下一个Topic下的消息会发送到不同的Message queue上消费者也会从不同的Message queue上拉取消息这种情况下是不能保证有序的。 RocketMQ的有序性是要保证Producer、Broker、Consumer三者的有序性严格按照FIFO方式来对消息进行处理我们可以从生产者把消息发送到同一个Message queue上所以只能有一个生产者因为多个生产者的生产的消息是无法有序的并且生成者发送消息不能选择多线程的方式。然后消费者可以注册一个MessageListenerOrderly()在RocketMQ内部就会通过锁队列的方式保证消息是一个一个队列来取的。 int orderId i 10;try {Message message new Message(TopicTest,tags[i % tags.length],KEYSi,BODY.getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置一个MessageQueueSelector队列选择器SendResult sendResult orderMessageProducer.send(message, new MessageQueueSelector() {//list 消息队列列表,args 我们传入的orderIdOverridepublic MessageQueue select(ListMessageQueue list, Message message, Object args) {Integer id (Integer)args;//这样可以确保相同orderId的消息总是发送到相同的队列实现消息的顺序性return list.get( id%list.size() );}}, orderId);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}延迟消息
延时消息是生产者生产的消息发送到服务端后并不希望马上被消费而是希望延迟一段时间后才被消费 try {Message message new Message(TopicTest,tags,OrderId i,test.getBytes(RemotingHelper.DEFAULT_CHARSET));//设置延迟级别message.setDelayTimeLevel(3);SendResult sendResult scheduledMessageProducer.send(message);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();}批量消息
可以把消息整合到一批后在进行发送可以增加吞吐率并减少API和网络调用次数 ArrayListMessage messages new ArrayListMessage();messages.add(new Message(TopicTest, tag1, 1, test1.getBytes()));messages.add(new Message(TopicTest, tag2, 2, test.getBytes()));messages.add(new Message(TopicTest, tag3, 3, test3.getBytes()));SendResult sendResult batchProducer.send(messages);
事务消息
事务消息是在分布式系统中 保证最终一致性的两阶段提交的消息实现。他可以保证本地事务执行与消息发送两 个操作的原子性也就是这两个操作一起成功或者一起失败。 对于一些数据具有强一致性场景的情况下例如上游订单付款成功后下游才可以进行积分变更、物流发货、购物车状态变更。这种类似场景下可以选择事务消息。
SpringBoot整合RocketMQ
创建一个MAVEN项目引入依赖:
dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.1/versionexclusionsexclusiongroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/exclusionexclusiongroupIdorg.springframework/groupIdartifactIdspring-core/artifactId/exclusionexclusiongroupIdorg.springframework/groupIdartifactIdspring-webmvc/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactIdversion2.3.10.RELEASE/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactIdversion2.3.10.RELEASE/version/dependency
/dependencies
创建一个配置类:application.properties #NameServer地址 rocketmq.name-serverip地址:9876 #默认的消息生产者组 rocketmq.producer.groupspringBootGroup
创建一个启动类
SpringBootApplication
public class RocketMqApplication {public static void main(String[] args) {SpringApplication.run(RocketMqApplication.class,args);}
}
创建一个生产者类
Component
public class SpringProducer {ResourceRocketMQTemplate rocketMqTemplate;public void sendMessage(String topic, String message){rocketMqTemplate.convertAndSend(topic,message);}
}
创建一个消费者类
Component
RocketMQMessageListener(consumerGroup MyConsumerGroup, topic TopicTest)
public class SpringConsumer implements RocketMQListenerString {Overridepublic void onMessage(String s) {System.out.println(s);}
}
创建一个Controller类
RestController
public class MqSendController {Resourceprivate SpringProducer springProducer;Value(${producer.topic})String topic;GetMapping(/send)public void sendMessage(RequestParam(message) String message){springProducer.sendMessage(topic,message);}
}
然后访问http://localhost:8080/send?messagetest