浏览有关小城镇建设的网站6,免费软件的定义,wordpress建站赚钱,南昌网站建设联系方式文章目录 前言小小消息管家1.项目介绍2. 需求分析2.1 API2.2 消息应答2.3 网络通信协议设计 3. 开发环境4. 项目结构介绍4.1 配置信息 5. 项目演示 前言
消息队列的本质就是阻塞队列#xff0c;它的最大用途就是用来实现生产者消费者模型#xff0c;从而实现解耦合以及削峰填… 文章目录 前言小小消息管家1.项目介绍2. 需求分析2.1 API2.2 消息应答2.3 网络通信协议设计 3. 开发环境4. 项目结构介绍4.1 配置信息 5. 项目演示 前言
消息队列的本质就是阻塞队列它的最大用途就是用来实现生产者消费者模型从而实现解耦合以及削峰填谷。
在分布式系统中不再是单个服务器而是服务器“集群”如果我们我们直接A服务器给B服务器发送请求B服务器给A服务器返回响应这样的话我们AB的耦合较大如果A或者B服务器挂了我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递就降低了耦合度。
同样的如果我们AB服务器直接进行通信如果A服务器突然发送许多请求我们B服务器也会收到巨多请求的影响AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列消息队列可以将许多请求接收存储下来B服务器依然可以按照原有节奏取请求不会一下子接收大量请求。这也就是我们所说的削峰。 填谷也是类似的就算请求过少我们的服务器依然可以从消息队列中取出挤压得请求。
在分布式系统中跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序并且新增一些功能。这样的程序我们就称为 消息队列。
所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。
小小消息管家
1.项目介绍
将消息队列分成服务器模块、客户端模块、公共模块来实现。
服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘数据库和文件以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。客户端模块实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。公共模块约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。 2. 需求分析
由于是实现一个生产者消费者模型在消息队列中我们要实现的逻辑如下我们的生产者客户端向服务器发送消息消费者客户端向服务器订阅消息服务器负责消息的存储和转发。 概念解读
虚拟主机 (VirtualHost)是⼀个逻辑上的集合⼀个 BrokerServer 上可以存在多个 VirtualHost。交换机 (Exchange)生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。队列 (Queue)真正用来存储消息每个消费者决定自己从哪个 Queue 上读取消息。绑定 (Binding)Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关 系。消息 (Message): 传递的内容。
2.1 API
我们的服务器提供以下API是西安消息队列的基本功能。
创建队列 (queueDeclare)销毁队列 (queueDelete)创建交换机 (exchangeDeclare)销毁交换机 (exchangeDelete)创建绑定 (queueBind)解除绑定 (queueUnbind)发布消息 (basicPublish)订阅消息 (basicConsume)确认消息 (basicAck)
我们的客户端提供以下API供客户使用消息队列为了复用TCP连接我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭
创建 Connection关闭 Connection创建 Channel关闭 Channel创建队列 (queueDeclare)销毁队列 (queueDelete)创建交换机 (exchangeDeclare)销毁交换机 (exchangeDelete)创建绑定 (queueBind)解除绑定 (queueUnbind)发布消息 (basicPublish)订阅消息 (basicConsume)确认消息 (basicAck)
2.2 消息应答
被消费者消费的消息需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式
自动应答消费者只要消费了消息就算应答完毕。Broker直接删除这个消息。手动应答消费者手动调用应答接口确认消息Broker收到应答请求后删除这个消息。
2.3 网络通信协议设计
我们使用TCP 协议来作为通信的底层协议。在这个基础上自定义应用层协议实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能length为消息的长度payload为消息的具体内容。
请求和响应的格式如下
其中 type取值如下 • 0x1 创建 channel • 0x2 关闭 channel • 0x3 创建 exchange • 0x4 销毁 exchange • 0x5 创建 queue • 0x6 销毁 queue • 0x7 创建 binding • 0x8 销毁 binding • 0x9 发送 message • 0xa 订阅 message • 0xb 返回 ack • 0xc 服务器给客户端推送的消息(被订阅的消息)
其中 payload 部分会根据不同的 type存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息我们定义对应的类实现。 对于响应payload 表示这次方法调用的返回值。
3. 开发环境 数据库SQLite 开发语言Java 技术框架SpringBoot、SpringMVC、Mybatis 管理工具Maven 开发工具Intellij IDEA 2020.1.4 操作系统Windows10 4. 项目结构介绍 common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式创建自定义异常类实现序列化反序列化定义消费者以及处理消息调用的函数接口。demo 创建了一个消费者和生产者用于测试项目。mqclient 客户端模块提供创建连接的工厂类定义完整连接的内容定义channel实现客户端apimqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。mqserver.mapper:实现对sqlite数据库的操作。
4.1 配置信息
由于我们对于数据库的存储只涉及一小部分所以此处我们利用sqlite进行数据管理。
pom.xml:
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.14/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.example/groupIdartifactIdmq/artifactIdversion0.0.1-SNAPSHOT/versionnamemq/namedescriptionmq/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion2.3.1/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-devtools/artifactIdscoperuntime/scopeoptionaltrue/optional/dependency!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --dependencygroupIdorg.xerial/groupIdartifactIdsqlite-jdbc/artifactIdversion3.41.0.1/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter-test/artifactIdversion2.3.1/versionscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdconfigurationexcludesexcludegroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/exclude/excludes/configuration/plugin/plugins/build/project
application.yml: 配置数据库信息
5. 项目演示 创建一个生产者客户端向服务器发送一个消息 package com.example.mq.demo;import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** 这个类用来表示一个生产者.* 通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println(启动生产者);//创建一个连接ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);//得到逻辑链接这个就类似于socketConnection connection factory.newConnection();Channel channel connection.createChannel();// 通过逻辑连接创建交换机和队列channel.exchangeDeclare(testExchange, ExchangeType.DIRECT, true, false, null);channel.queueDeclare(testQueue, true, false, false, null);// 创建一个消息并发送byte[] body hello 欢迎消费消息.getBytes();boolean ok channel.basicPublish(testExchange, testQueue, null, body);System.out.println(消息投递完成! ok ok);Thread.sleep(500);channel.close();connection.close();}
}创建一个消费者客户端订阅消费消息 package com.example.mq.demo;import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** author zq* date 2023-08-05 19:29*//** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println(启动消费者!);ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(testExchange, ExchangeType.DIRECT, true, false, null);channel.queueDeclare(testQueue, true, false, false, null);//订阅消息并定义如何消费消息channel.basicConsume(testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println([消费数据] 开始!);System.out.println(consumerTag consumerTag);System.out.println(basicProperties basicProperties);String bodyString new String(body, 0, body.length);System.out.println(body bodyString);System.out.println([消费数据] 结束!);}});// 通过这个循环模拟一直等待消费完生产者生产的所有消息while (true) {Thread.sleep(500);}}
}实现结果如下 通过结果我们可以看出我们消费者成功取出订阅的队列中的消息进行消费