如何做好品牌网站建设,南昌定制网站开发费用,网站建设竞价托管服务,专注于响应式网站开发上篇讲述高并发情况下的数据库处理方式#xff1a;分布式事务管理机制。即使我们做到这一步并发情况只能稍微得到缓解#xff0c;当然千万级别的问题不大#xff0c;但在面对双十一淘宝这类的达上亿的并发的时候仅仅靠分布式事务管理还是远远不够#xff0c;即使数据库可以… 上篇讲述高并发情况下的数据库处理方式分布式事务管理机制。即使我们做到这一步并发情况只能稍微得到缓解当然千万级别的问题不大但在面对双十一淘宝这类的达上亿的并发的时候仅仅靠分布式事务管理还是远远不够即使数据库可以抗住压力但从前端访问上并不能根本解决数据库可重复读的问题。 解决并发问题的方式很多单都无法彻底解决并发问题因为并发问题以现有的技术无法解决但我们可以尽可能的使系统无线扩大并发量以达到完全处理并发的效果限流、熔断都是不错的选择但会出现系统无法访问的情况当并发进入到系统的时候直接处理业务功能显然不合适如果把并发进行队列化操作显然更加合适公平。举个栗子做核酸检测的医护人员就几个而做核酸检测的人却多的数不过来如果这些人一块围着谁也做不了核酸此时排队等待是最适合的先到先排先做。MQ就是维持秩序的管理员将并发进行排队。
1、MQ介绍 当今市面上有很多主流的消息中间件如老牌的ActiveMQ、RabbitMQ炙手可热的Kafka阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。 本文使用ActiveMQ无论哪种MQ技术都会涉及到消息的存储性、可靠性、传递性等基本属性都会包含消息生产者、消费者两部分推送方式一般为订阅广播、点对点的方式。字面意思广播就是通过网络广播发送消息MQ不关注接收方是否在线点对点是A端到B端双方进行握手通信后再传输数据。两者比较广播效率更高但数据丢失率也高数据安全性低点对点传输效率相对广播低、数据丢失率低、如果B端掉线消息可以做持久化存储等待B端上线握手后再次发送。
2、windows系统MQ服务部署 官网https://activemq.apache.org/ 官网提供两个版本的MQ供下载Classic版本为当前稳定版本Artemis版本为下代版本。本文使用Classic版本。下载windows版本的压缩包ActiveMQ 5.16.5.zip。各版本的MQ对应Java JDK版本要看清楚。 下载完成后解压zip进入到MQ文件夹下如下 进入bin目录下根据系统选择32位或者64位进入文件夹运行activemq.bat脚本。 本地访问http://localhost:8161账号/密码为admin 点击进入管理界面至此MQ服务启动成功。消息管理中心有QueuesTopicsSubscribers等我们可以查看队列、订阅等信息。 Topics是基于“订阅-发布”模式当操作者发布一条消息后所有对这条消息感兴趣的订阅者都可以收到它——也就是说这条消息会被拷贝成多份进行分发。只有当前“活动的”订阅者能够收到消息。 Queues是一种“负载均衡模式”的实现。一个消息能且只能被一个消费者接受。如果当前JMS-Queue中没有任何的消费者那么这条消息将会被Queue存储起来实际应用中可以存储在磁盘上也可以存储在数据库中看软件的配置直到有一个消费者连接上。另外如果消费者在接受到消息后在他断开与JMS-Queue连接之前没有发送ack信息可以是客户端手动发送也可以是自动发送那么这条消息将被发送给其他消费者。
比较项目Topic 模式队列Queue 模式队列工作模式“订阅-发布”模式如果当前没有订阅者消息将会被丢弃。如果有多个订阅者那么这些订阅者都会收到消息“负载均衡”模式如果当前没有消费者消息也不会丢弃如果有多个消费者那么一条消息也只会发送给其中一个消费者并且要求消费者ack信息。有无状态无状态Queue数据默认会在mq服务器上以文件形式保存比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。传递完整性如果没有订阅者消息会被丢弃消息不会丢弃处理效率由于消息要按照订阅者的数量进行复制所以处理性能会随着订阅者的增加而明显降低并且还要结合不同消息协议自身的性能差异由于一条消息只发送给一个消费者所以就算消费者再多性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的
3、系统集成 服务启动后我们需要在系统中集成MQ并进行收发消息测试在父工程下创建MQ子工程。在子工程pom中添加MQ依赖。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-activemq/artifactId
/dependency
dependencygroupIdorg.apache.activemq/groupIdartifactIdactivemq-pool/artifactId
/dependency 在启动类上添加注解EnableJms。
在需要使用MQ进行业务处理的服务中依赖MQ服务并添加MQ配置信息
server:port: 8081
spring:activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: adminjms:pub-sub-domain: true # 默认为false点对点模式queue true订阅模式topicpool:enabled: false # 是否创建 JmsPoolConnectionFactory 连接池idle-timeout: 30s # 空闲连接超时时间max-connections: 50 # 连接池中最大连接数max-sessions-per-connection: 100 # 每个连接最大会话 编写MQ配置类
Configuration
public class BeanConfig {Value(${spring.activemq.broker-url})private String brokerUrl;Value(${spring.activemq.user})private String userName;Value(${spring.activemq.password})private String password;//定义存放消息的队列Beanpublic ActiveMQQueue queue() {return new ActiveMQQueue(MyQueue);}Beanpublic ActiveMQTopic topic() {return new ActiveMQTopic(MyTopic);}Beanpublic ConnectionFactory connectionFactory() {return new ActiveMQConnectionFactory(userName, password, brokerUrl);}/*** 在 Queue 模式中对消息的监听需要对containerFactory进行配置*/Bean(queueListener)public JmsListenerContainerFactory? queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleJmsListenerContainerFactory factory new SimpleJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);return factory;}/*** 在 topic 模式中对消息的监听需要对containerFactory进行配置*/Bean(topicListener)public JmsListenerContainerFactory? topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleJmsListenerContainerFactory factory new SimpleJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(true);return factory;}
} 编写Queue监听类和Topic监听类
Component
public class MyQueueListener {JmsListener(destination MyQueue, containerFactoryqueueListener)public void handleMessage(String name) {System.err.println(name);}} Component
public class MyTopicListener {JmsListener(destination MyTopic, containerFactorytopicListener)public String handleMessage1(String name) {System.out.println(topic 成功接受Name name);}
} 编写消息发送接口 Controller
RestController(/demo)
public class Demo {Autowiredprivate Queue queue;Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;/*** 发送消息接口* param msg*/GetMapping(/{msg})void demo(PathVariable String msg) {// 消息入队列jmsMessagingTemplate.convertAndSend(queue, msg);}} 访问接口传入消息参数 可以看到控制台打印输出消息内容。至此系统集成MQ完成。