宁波网站推广渠道,程序员做任务的网站,手机画设计图软件,天津市建设信息工程网什么是消息队列#xff1f;
阻塞队列#xff08;Blocking Queue#xff09;- 生产者消费者模型 #xff08;是在一个进程内#xff09;所谓的消息队列#xff0c;就是把阻塞队列这样的数据结构#xff0c;单独提取成了一个程序#xff0c;进行独立部署~ --------
阻塞队列Blocking Queue- 生产者消费者模型 是在一个进程内所谓的消息队列就是把阻塞队列这样的数据结构单独提取成了一个程序进行独立部署~ -------- 生产者消费模型 进程和进程之间/服务和服务之间生产者消费者模型作用
解耦合 本来有个分布式系统A服务器 调用 B服务器A给B发请求B给A返回响应》 A 和 B 的耦合是比较大的引入消息队列后A把请求发送到消息队列B再从消息队列获取到请求 削峰填谷 比如A是入口服务器A 调用 B 完成一些具体业务如果是 A 和 B 直接通信如果突然A 收到一组用户的请求的峰值此时 B 也会随着受到峰值~引入消息队列后A把请求发送到消息队列B再从消息队列获取到请求。 虽然A收到很多请求队列也收到了很多请求但是B仍旧可以按照原来的节奏处理请求。不至于说一下就收到太多的并发量。举个例子高铁火车站进站口。 乘客好比A 进站口好比B是有限的就需要一个队列来排队这样不管人多少就不会影响到乘客进站以后的坐车。
市面上一些知名的消息队列
RabbitMQKafkaRocketMQActiveMQ
需求分析 核心概念1
生产者Producer消费者Consumer中间人Broker发布Push 生产者向中间人这里投递消息的过程订阅Subscribe 哪些消费者要从中间人取数据这个注册的过程称为 “订阅”消费 Consume 消费者从中间人这里取数据的动作 一个生产者一个消费者 N个生产者N个消费者 核心概念2
Broker server 内部也涉及一些关键概念是为了如何进出队列
虚拟主机Virtual Host类似于 MySQL 中的 database算是一个 “逻辑” 上的数据集合。 一个Broker server 上可以组织多种不同类别数据可以使用 Virtual Host 做出逻辑上的区分实际开发中一个 Broker server也可能同时用来管理多个 业务线上的数据就可以使用 Virtual Host 做出逻辑上的区分。 交换机Exchange 生产者把消息投递给 Broker Server实际上是把消息先交给了 公司某一层楼Broker Server 上的交换机再由交换机把消息交给对应的队列。 交换机类似于“前台小姐姐” 队列Queue 真正用来存储处理消息的实体后续消费者也是从对应的队列中取数据一个大的消息队列中可以有很多具体的小队列 绑定Binding 把交换机和队列之间建立关系。可以把 交换机 和 队列 视为数据库中 多对多的关系。可以想象在 MQ 中也是有一个这样的中间表所谓的 “绑定’其实就是中间表中的一项 消息Message 具体来说是 服务器A 发给 B 的请求通过MQ转发 服务器B 给 服务器A返回的响应通过MQ转发一个消息可以视为一个字符串二进制数据具体由程序员自定义 核心API
消息队列服务器Broker Server要提供的核心API
创建队列queueDeclare 此处不用 Create这样的术语原因是Create仅仅是创建而 Declare 起到的效果是不存在则创建存在就啥也不做 销毁队列queueDelete创建交换机exchangeDeclare销毁交换机exchageDelete创建绑定queueBind解除绑定queueUnbind发布消息basicPublish订阅消息basicConsume确认消息basicAck 这个API起到的效果是可以让消费者显式的告诉 broker server,这个消息我处理完毕了提高整个系统的可靠性~保证消息处理没有遗漏RabbitMQ 提供了 肯定 和 否定的 确认此处我们项目就只有 肯定确认
交换机类型
交换机在转发消息的时候有一套转发规则的~提供了几种不同的 交换机类型 ExchangType来描述这里不同的转发规则Rabbit主要实现了四种交换机类型也是由 AMQP协议定义的
Direct 直接交换机Fanout 扇出交换机Topic 主题交换机Header 消息头交换机 项目中实现了前三种 Direct 直接交换机 生产者发送消息时会指定一个目标队列的名字此时的 routingKey就是 队列的名字交换机收到后就看看绑定的队列里面有没有匹配的队列如果有就转发过去把消息塞进对应的队列中如果没有消息直接丢弃 Fanout 扇出交换机 会把消息放到交换机绑定的每个队列只要和这个交换机绑定任何队列都会转发消息 Topic 主题交换机
有两个关键概念
bindingKey把队列和交换机绑定的时候指定一个单词像是一个暗号一样routingKey生产者发送消息的时候也指定一个单词如果当前 bindingKey 和 routingKey 对上了就可以把消息转发到对应的队列上述三种交换机类型就像QQ群发红包
专属红包 直接交换机发个10块钱红包大家都能领 10块钱红包 扇出交换机我发个口令红包只有输入对应口令才能领导红包 主题交换机
持久化
上述 虚拟机、交换机、队列、绑定、消息需要存储起来。此时内存和硬盘各存储一份内存为主硬盘为辅。
交换机、队列、绑定存储在数据库中消息存储在文件中
在内存中存储的原因对于 MQ 来说能够高效的转发处理数据是非常关键的指标 因此对于使用内存来组织数据得到的效率就比放硬盘要高很多在硬盘中存储原因为了防止内存中数据随着进程重启/主机重启而丢失
网络通信
其他的服务器生产者/消费者通过网络和咱们的 Broker Server 进行交互的。此处设定使用 TCP 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作 应用层协议主要工作就是让客户端可以通过网络调用 brokerserver 提供的编程接口 因此客户端这边也要提供上述API只有服务器是真正干实事的客户端只是发送/接受响应 虽然调用的客户端的方法但是实际上好像调用了一个远端服务器的方法一样 (远程调用 RPC) 客户端除了提供上述9个方法之外还需要提供 4个 额外的方法支撑其他工作 创建 Connection 关闭 Connection 此处用的 TCP 连接一个 Connection 对象就代表一个 TCP连接 创建 Channel 一个Connection 里面包含多个 Channel每个 Channel 上传输的数据都是互不相干的TCP中建立/断开一个连接成本挺高的因此很多时候不希望频繁建立断开 TCP 连接所以定义一个 Channel 不用的时候销毁 Channel此处 Channel 是逻辑概念比 TCP 轻量很多 关闭 Channel
消息应答模式
自动应答消费者把这个消息取走了就算应答了手动应答basicAck 方法属于手动应答消费者需要主动调用这个 API 来进行应答
总结
需要做哪些工作
需要实现 生产者消费者brokerserver 三个部分针对生产者消费者来说主要编写的是 客户端和服务器的通信部分给客户端提供一组 api让客户端的业务代码来调用从而通过网络通信的方式远程调用 brokerserver 上的方法 比如创建交换机客户端这边只需要提供相关参数即可然后通过 socket 将 request 传入到网卡中然后服务器从 网卡中读取 request 解析。然后计算请求得到 response再通过 socket 写回去网卡。 实现 brokerserver 【重点】持久化
上述的这些关键数据在硬盘中怎么存储啥格式存储存储在数据库还是文件后续服务器重启了如何读取这些数据把内存中内容恢复过来? 模块划分 点击查看【processon】
创建核心类 Exchange MSGQueue Binding Message 数据库操作 建表操作 此处考虑的是更轻量的数据库SQLite, 因为一个完整的 SQLite 数据库只有一个单独的可执行文件不到1M 直接在pom.xml文件中引入 !-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --dependencygroupIdorg.xerial/groupIdartifactIdsqlite-jdbc/artifactIdversion3.42.0.0/version/dependency然后在 application.yml配置文件中
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC上述依赖和配置都弄完后当程序启动时会自动建立数据库。所以我们只需要建表就行。此处我们根据之前的需求分析建立三张表此处我们通过 代码形式来建造三张表
配置application.yml
mybatis:mapper-locations: classpath:mapper/**Mapper.xml创建一个对应的 interface 创建 mapper目录和文件 MetaMapper.xml 交换机操作
在接口先写方法
void insertExchange(Exchange exchange);
ListExchange selectAllExchanges();
void deleteExchange(String exchangeName);在 xml 中写
insert idinsertExchange parameterTypecom.example.mq.mqserver.core.Exchangeinsert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});
/insertselect idselectAllExchanges resultTypecom.example.mq.mqserver.core.Exchangeselect * from exchange;
/selectdelete iddeleteExchange parameterTypejava.lang.Stringdelete from exchange where name #{exchangeName};
/delete队列操作
在接口中先写方法
void insertQueue(MSGQueue queue);
ListMSGQueue selectAllQueues();
void deleteQueue(String queueName);在xml中写
insert idinsertQueue parameterTypecom.example.mq.mqserver.core.MSGQueueinsert into queue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});
/insertselect idselectAllQueues resultTypecom.example.mq.mqserver.core.MSGQueueselect * from queue;
/selectdelete iddeleteQueue parameterTypejava.lang.Stringdelete from queue where name #{queueName};
/delete绑定操作
在接口中先写方法
void insertBinding(Binding binding);
ListBinding selectAllBindings();
void deleteBinding(Binding binding);在xml中写
insert idinsertBinding parameterTypecom.example.mq.mqserver.core.Bindinginsert into binding values (#{exchangeName},#{queueName},#{bindingKey});
/insertselect idselectAllBindings resultTypecom.example.mq.mqserver.core.Bindingselect * from binding;
/selectdelete iddeleteBinding parameterTypejava.lang.Stringdelete from binding where exchangeName #{exchangeName} and queueName #{queueName};
/delete一个统一的类进行数据库操作
在服务器BrokerServer启动的时候能够做出以下逻辑判定 如果数据库存在表也都有了不做任何操作 如果数据库不存在则创建库创建表构造默认数据
构造一个类 DataBaseManager
package com.example.mq.mqserver.datacenter;
import com.example.mq.MqApplication;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.ExchangeType;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.mapper.MetaMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.lang.reflect.Field;
import java.util.List;/*** 通过这个类来整合数据库操作*/
public class DataBaseManager {private MetaMapper metaMapper;// 针对数据库进行初始化public void init(){// 要做的是从 Spring 获取到现成的对象metaMapper MqApplication.context.getBean(MetaMapper.class);if(!checkDBExists()){// 数据库不存在,就进行建库建表操作// 先创建一个 data 目录File dataDir new File(./data);dataDir.mkdirs();// 创建数据表createTable();// 插入默认数据createDefaultData();System.out.println([DataBaseManager] 数据库初始化完成);}else {// 数据库已经存在则什么都不做System.out.println([DataBaseManager] 数据库已经存在);}}public void deleteDB(){File file new File(./data/meta.db);boolean ret file.delete();if (ret){System.out.println([DataBaseManager] 删除数据库文件成功);}else {System.out.println([DataBaseManager] 删除数据库文件失败);}File dataDir new File(./data);ret dataDir.delete();if (ret){System.out.println([DataBaseManager] 删除数据库目录成功);}else {System.out.println([DataBaseManager] 删除数据库目录失败);}}private boolean checkDBExists() {File file new File(./data/meta.db);if (file.exists()){return true;}return false;}// 这个方法用来建表// 建库操作并不需要手动执行不需要手动创建 meta.db 文件// 首次执行这里的数据库操作的时候就会自动创建 meta.db 文件 mybatis 帮我们完成的private void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();System.out.println([DataBaseManager] 创建表完成);}// 给数据库表中添加默认的值// 此处主要是添加一个默认的交换机// RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机类型是 DIRECTprivate void createDefaultData() {// 构造一个默认交换机Exchange exchange new Exchange();exchange.setName();exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println([DataBaseManager] 创建初始数据完成);}// 把其他的数据库操作也在这个类封装下public void insertExchange(Exchange exchange){metaMapper.insertExchange(exchange);}public ListExchange selectAllExchanges(){return metaMapper.selectAllExchanges();}public void deleteExchange(String exchangeName){metaMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue){metaMapper.insertQueue(queue);}public ListMSGQueue selectAllQueues(){return metaMapper.selectAllQueues();}public void deleteQueue(String queueName){metaMapper.deleteQueue(queueName);}public void insertBinding(Binding binding){metaMapper.insertBinding(binding);}public ListBinding selectAllBindings(){return metaMapper.selectAllBindings();}public void deleteBinding(Binding binding){metaMapper.deleteBinding(binding);}
}消息持久化 消息存储格式
Message如何在硬盘上存储
消息操作并不涉及到复杂的增删改查消息数量可能会非常多数据库的访问效率并不高
所以要把消息直接存储在文件中以下设定消息具体如何在文件中存储~ 消息是依托于队列的因此存储的时候就要把 消息 按照 队列 维度展开 此处已经有了一个 data 目录meta.db就在这个目录中 在 data 中创建一些子目录每个队列对应一个子目录子目录名就是队列名 queue_data.txt这个文件里面存储的是二进制的数据我们约定转发到这个队列的队列所有消息都是以二进制的方式进行存储首先规定前4个字节代表的该消息的长度后面紧跟着的是消息本体。对于BrokerServer来说消息是需要新增和删除的。生产者生产一个消息就是新增一个消息消费者消费一个消息就是删除一个消息对于内存中的消息新增删除就比较容易了使用一些集合类就行对于文件中新增我们采用追加方式直接在当前文件末尾新增就行对于文件中删除如果采用真正的删除效率就会非常低。将文件视为顺序表结构删除就会涉及到一系列的元素搬运。所以我们采用逻辑删除的方式。根据消息中的一个变量 isValid 判断该消息是否有效1 为有效消息0 为无效消息 那么如何找到每个消息对应在文件中的位置呢 我们之前在 Message 中设置了两个变量一个是 offsetBeg,一个是 offsetEnd。 我们存储消息的时候是同时在内存中存一份和硬盘中存一份。而内存中存到那一份消息记录了当前的消息的 offsetBeg 和 offsetEnd。通过先找到内存中的消息再根据该消息的两个变量值就能找到硬盘中的消息数据了。 垃圾回收 随着时间的推移文件中存放的消息可能会越来越多。并且可能很多消息都是无用的所以就要针对当前消息数据文件进行垃圾回收。 此处我们采用的复制算法原理也是比较容易理解的 复制算法比较适用的前提是当前的空间有效数据不多大多数都是无效的数据直接遍历原有的消息数据文件把所有的有效数据数据重新拷贝一份到新的文件中新文件名字和原来文件名字相同再把旧的文件直接删除掉。 那么垃圾回收的算法有了何时触发垃圾回收 此处就要用到我们每个队列目录中所对应的另一个文件 queue_stat.txt了使用这个文件来保存消息的统计信息只存一行数据用 \t 分割 左边是 queue_data.txt 中消息的总数目右边是 queue_data.txt中有效的消息数目。 形如 2000\t1500, 代表该队列总共有2000条消息其中有效消息为1500条所以此处我们就约定当消息总数超过2000条并且有效消息数目低于总消息数的50%就处罚一次垃圾回收GC 如果当一个文件消息数目非常的多而且都是有效信息此时会导致整个消息的数据文件非常庞大后续针对这个文件操作就会非常耗时。假设当前文件已经达到10个G了那么此时如果触发一次GC整个耗时就会非常高。 对于RabbitMQ来说解决方案文件拆分当某个文件长度达到一定的阈值的时候就会拆分成两个文件拆着拆着就成了很多文件文件合并每个单独的文件都会进行GC如果GC之后发现文件变小了就会和相邻的其他文件合并这样做可以保证在消息特别多的时候也能保证性能上的及时响应实现思路
用一个专门的数据结构来存储当前队列中有多少个数据文件每个文件大小是多少消息的数目是多少无效消息是多少设计策略什么时候触发文件拆分什么时候触发文件合并
统计文件读写
需要定义一个内部类在表示该队列的统计消息此处优先考虑 static 静态内部类
static public class Stat {// 此处直接定义成 publicpublic int totalCount; // 总的消息数public int validCount; // 有效消息数
}统计文件的读
private Stat readStat(String queueName) {
Stat stat new Stat();
try (InputStream inputStream new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner new Scanner(inputStream);stat.totalCount scanner.nextInt();stat.validCount scanner.nextInt();return stat;
} catch (IOException e) {e.printStackTrace();
}
return null;
}统计文件的写
private void writeStat(String queueName, Stat stat) {// 使用 PrintWrite 来写文件// OutputStream 打开文件默认情况下会直接把源文件清空此时就相当于 新数据把旧的数据覆盖了// 加个 参数 true就会变成追加 new FileOutputStream(getQueueStatPath(queueName),true)try (OutputStream outputStream new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter new PrintWriter(outputStream);printWriter.write(stat.totalCount \t stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}
} 创建消息目录和文件
先创建队列对应的目录以队列名字为名的目录创建队列里面的消息数据文件创建队列里面的消息统计数据文件给消息统计文件设置初始值
// 创建队列对应的文件目录
public void createQueueFiles(String queueName) throws IOException {// 1. 先创建队列对应的消息目录File baseDir new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在就创建这个目录Boolean ok baseDir.mkdirs();if (!ok) {throw new IOException(创建目录失败baseDir baseDir.getAbsolutePath());}}// 2. 创建队列数据文件File queueDataFile new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {Boolean ok queueDataFile.createNewFile();if (!ok) {throw new IOException(创建文件失败 queueDateFile queueDataFile.getAbsolutePath());}}// 3. 创建消息统计文件File queueStatFile new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {Boolean ok queueStatFile.createNewFile();if (!ok) {throw new IOException(创建统计文件失败 queueStatFile queueStatFile.getAbsolutePath());}}// 4. 给消息统计文件设置初始值Stat stat new Stat();stat.totalCount 0;stat.validCount 0;writeStat(queueName, stat);
}删除消息目录和文件
先删除消息的统计文件和消息数据文件再删除队列目录
// 删除队列的目录和文件
// 队列也是可以被删除的当队列删除后对应的目录文件也需要随之删除
public void destroyQueueFiles(String queueName) throws IOException {File queueStatFile new File(getQueueStatPath(queueName));boolean ok1 queueStatFile.delete();File queueDataFile new File(getQueueDataPath(queueName));boolean ok2 queueDataFile.delete();File baseDir new File(getQueueDir(queueName));boolean ok3 baseDir.delete();if (!ok1 || !ok2 || !ok3) {throw new IOException(删除目录和文件失败baseDir baseDir.getAbsolutePath());}
}消息序列化
把一个对象结构化数据转换成一个 字符串/字节数组序列化之后方便 存储和传输
存储一般存储在文件中文件只能存字符串/二进制数据。不能直接存对象传输在网络中传输socket 此处不使用 json 进行序列化由于 Message里面存储是二进制数据。 而jason序列化得到的结果是文本数据里面无法存储二进制的body 针对序列化有很多解决方案
Java标准库提供了序列化方案。 ObjectInputStream 和 ObjectOutputStreamHessian 也是一个解决方案protobufferthrift
此处咱使用第一种 Java 标准库自带的
序列化
// 把一个对象序列化为字节数组
public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长字节数组// 就可以把 object 序列化的数据给逐渐写入到 byteArrayOutputStream 中再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream()) {// ObjectOutputStream(byteArrayOutputStream)) 此处括号里的内容可根据实际需求修改如果需要 关联文件就写到文件里面// 如果关联 网络就写到网络此处写入的是内存中的 字节数组try (ObjectOutputStream objectOutputStream new ObjectOutputStream(byteArrayOutputStream)) {// 此处的 writeObject 就会把该对象进行序列化生成二进制数据就会写入到// objectOutputStream 中// 由于 objectOutputStream 又是关联到了 byteArrayOutputStream最终结果就会写入到 byteArrayOutputStreamobjectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 二进制数据取出来 转换成 byte[]return byteArrayOutputStream.toByteArray();}
}反序列化
// 把一个字节数组反序列化成对象
public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object null;try (ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream new ObjectInputStream(byteArrayInputStream)) {// 此处的 readObject 就是从 data 这个 byte[] 中读取数据并进行反序列化object objectInputStream.readObject();}}return object;
}把消息写入到文件中
这个要将消息存入到该队列对应的文件中。需要注意的是此处 写入消息 需要两个参数一个是 队列 MSGQueue一个是消息 Message
先判断当前写入队列的文件在不在把 Message 对象进行序列化转换成二进制的字节数组进行写入操作的操作时候要进行加锁锁对象就是当前 MSGQueue此处如果不加锁。当多个客户端进行发送消息的时候可能会造成数据不对。先获取当前队列消息数据文件的长度用这个长度来计算 offsetBeg 和 offsetEnd 设置该消息 offsetBeg 当前文件长度 4设置该消息 offsetEnd 当前文件长度 4 当前二进制数组长度 把新的 message数据写入到文件的末尾处采用追加方式 先写入4个字节的消息长度再写入消息本体 更新统计文件并重新写入
public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1. 检查下当前要写入的队列 对应的文件是否存在if (!checkFilesExists(queue.getName())) {throw new MqException([MessageFileManager] 队列对应的文件不存在queueName queue.getName());}// 2. 把对象进行序列化转换成二进制的字节数组byte[] messageBinary BinaryTool.toBytes(message);synchronized (queue) {// 3. 先获取到当前队列数据文件的长度用这个长度来计算该 Message 对象和 offsetBeg offsetEnd// 把新的 message 数据写入到队列的文件末尾// 此时message 对象的 offsetBeg 就是 当前文件长度4// offsetEnd 就是 当前文件长度 4 message自身长度File queueDataFile new File(getQueueDataPath(queue.getName()));// 通过这个方法 queueDataFile.length() 就能获取到长度单位字节message.setOffsetBeg(queueDataFile.length() 4);message.setOffsetEnd(queueDataFile.length() 4 messageBinary.length);// 4. 写入消息数据到文件注意此处是追加try (OutputStream outputStream new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {// 接下来首先写入的是当前消息的长度占领4个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);// TODO}}// 5. 更新统计文件Stat stat readStat(queue.getName());stat.totalCount 1;stat.validCount 1;// 重新写入writeStat(queue.getName(), stat);}
}从文件中删除消息逻辑删除
先从硬盘中读取出来 此处采用 RamdomAccessFile 来读取可以在文中指定位置进行读写随机访问先定义一个 以消息长度为length【offsetEnd - offsetBeg】的一个字节数组 bufferSrc再根据要删除的 Message 对象中的 offsetBeg 和 offsetEnd 将光标定位那个位置然后将结果读取到 bufferSrc中 然后将读到的bufferSrc数据反序列化成 Message对象修改变量 isValid0x2再将 Message对象 序列化成 bufferDes 重新定位光标到消息的 offserBeg将 bufferDes 写回去 更新统计文件信息写入
// 这是消息删除方法
// 这里删除是逻辑删除也就是把硬盘上的message对象里面的 isValid设置成0
// 1. 先把文件从硬盘中读取出来
// 2. 然后修改 isValid
// 3. 再写回到硬盘中
// 此处这个参数中的 message对象必须包含有效的 offsetBeg 和 offsetEnd
public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile new RandomAccessFile(getQueueDataPath(queue.getName()), rw)) {// 1. 先读取对应 数据byte[] bufferSrc new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 读取当前的二进制数据转换成 Message 对象 并修改 isValidMessage diskMessage (Message) BinaryTool.fromBytes(bufferSrc);// 此处不需要给参数的 message 的 isValid 设置成0因为这个参数是在内存中管理的 message对象// 而这个对象 也要被马上删除了diskMessage.setIsValid((byte) 0x0);// 3. 写回去// 需要重新定位光标byte[] bufferDest BinaryTool.toBytes(diskMessage);randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}// 还要更换统计文件Stat stat readStat(queue.getName());if (stat.validCount 0) {stat.validCount - 1;}writeStat(queue.getName(), stat);}
} 从硬盘中恢复数据到内存
使用这个方法将硬盘中所有的有效数据加载到内中具体来说是一个链表中这个方法是在程序启动的时候调用。这里使用 LinkedList来存储消息方便后续进行头删操作一个文件中会包含多个消息需要循环去读取此处手动记录光标位置
先读取4个字节表示当前消息长度然后根据当前消息长度读取对应的长度到 buffer 字节数组中把读取到 buffer 字节数据 反序列化成 Message 对象判断这个 Message 对象里面的 isValid 是否为 0x1如果不是就 continue是的话执行第六步不是就从第一步开始加入消息之前先设置 offsetBeg, offserEnd然后将消息加入到 LinkedList中如果读到末尾会有异常 EOF会自动结束
// 使用这个方法从文件中读取所有的消息内容加载到内存中具体来说是一个链表中
// 这个方法准备在程序启动的时候进行调用
// 这里 使用一个 LinkedList主要目的是为了后续进行头删操作
// 这个方法的参数只是一个 queueName而不是 MsgQueue对象因为不需要使用加锁
// 不涉及多线程操作
public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedListMessage messages new LinkedList();try (InputStream inputStream new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream new DataInputStream(inputStream)) {// 手动记录光标的位置long currentOffset 0;// 一个文件可能包含多个消息所以要循环读while (true) {// 1. 读取当前消息长度 , 一次读4个字节 (这里的 readInt 可能会读到文件的末尾)// 读到末尾就会抛出 EOFException 异常int messageSize dataInputStream.readInt();// 2. 按照这个长度读取消息内容byte[] buffer new byte[messageSize];int actualSize dataInputStream.read(buffer);if (messageSize ! actualSize) {throw new MqException([MessageFileManager] 文件格式错误queueName queueName);}// 3. 把读到的二进制数据反序列化为 Message 对象Message message (Message) BinaryTool.fromBytes(buffer);// 4. 判定这个消息是不是无效对象 isValid0x2if (message.getIsValid() ! 0x1) {// 无效数据跳过continue;}// 5. 有效数据则需要把这个 Message 对象加入到链表中加入之前要先设置 offsetBeg, offsetEndmessage.setOffsetBeg(currentOffset 4);message.setOffsetEnd(currentOffset 4 messageSize);currentOffset (4 messageSize);messages.add(message);}} catch (EOFException e) {// 这个并非真是处理异常处理正常业务逻辑// 文件读到末尾System.out.println([MessageFileManager] 从硬盘恢复数据到内存完成);}}return messages;
}消息文件垃圾回收
由于当前会不停的往消息文件中写入消息并且删除消息只是逻辑删除这就可能导致消息文件越来越大并且包含大量无用的消息。此处使用的是复制算法。
判定当前文件中消息总数超过2000并且有效消息数不足50%就会触发垃圾回收就把所有的有效消息提取出来单独的在写到一个文件中删除旧文件使用新文件代替注意还要更新统计文件信息
总结
MessageFileManager主要负责管理消息在文件中的存储~
设计目录结构和文件格式实现了目录创建和删除实现统计文件的读写实现了消息的写入按照之前的文件格式实现了消息的删除 随机访问文件实现了所有消息的加载垃圾回收复制算法
统一硬盘存储管理
上述我们存储在硬盘中的数据分为了两个一个是存放数据库中一个是存放在文件中。我们需要统一封装一个类对上面硬盘数据进行管理
package com.example.mq.mqserver.datacenter;import com.example.mq.common.MqException;
import com.example.mq.mqserver.core.Binding;
import com.example.mq.mqserver.core.Exchange;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;/*** 使用这个类来管理所有硬盘上得数据* 1. 数据库交换机、绑定、队列* 2. 数据文件消息* 上层逻辑如果需要操作硬盘统一都通过这个类来使用。上层代码不关心当前数据是存储在数据库还是文件中*/
public class DiskDataCenter {private DataBaseManager dataBaseManager new DataBaseManager();private MessageFileManager messageFileManager new MessageFileManager();public void init(){// 针对上述两个实例进行初始化, 建库建表创建默认交换机dataBaseManager.init();// 当前这个方法是空的方便以后扩展messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public ListExchange selectAllExchanges(){return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);// 创建队列的同时不仅仅要把队列对象写到数据库中还需要创建出对应的目录和文件messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);// 删除队列的时候也要同时删除队列对应的目录和文件messageFileManager.destroyQueueFiles(queueName);}public ListMSGQueue selectAllQueues(){return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public ListBinding selectAllBindings(){return dataBaseManager.selectAllBindings();}// 封装消息的操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if (messageFileManager.checkGC(queue.getName())){messageFileManager.GC(queue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
} 内存数据管理 设计数据结构
使用内存管理上述的数据对于MQ来说内存存储数据为主硬盘存储数据为辅主要是为了持久化重启之后数据不丢失
交换机Key交换机名字Value交换机
private ConcurrentHashMapString, Exchange exchangeMap new ConcurrentHashMap();队列 Key队列名称 Value队列
private ConcurrentHashMapString, MSGQueue queueMap new ConcurrentHashMap();绑定Key交换机名字 Value.key队列名字Value.value绑定关系
private ConcurrentHashMapString, ConcurrentHashMapString, Binding bindingsMap new ConcurrentHashMap();消息KeyMessageIdValueMessage对象
private ConcurrentHashMapString, Message messageMap new ConcurrentHashMap();表示队列和消息之间的关联Key队列名字Value: 是一个存储消息的链表
private ConcurrentHashMapString, LinkedListMessage queueMessageMap new ConcurrentHashMap();表示未被确认的消息存储了哪些消息被消费者取走但是还没应答key队列名称Value.keyMessageIdValue.valueMessage对象
private ConcurrentHashMapString, ConcurrentHashMapString, Message queueMessageWaitAckMap new ConcurrentHashMap();此处咱们实现的MQ支持两种应答模式ACK 自动应答消费者取走元素这个消息就是被应答了就需要删除 手动应答消费者取走元素还不算应答需要消费者再主动调用一个 basicAck 方法此时才算真正应答了才可以删除消息
实现交换机的管理
添加交换机
public void insertExchange(Exchange exchange) {
exchangeMap.put(exchange.getName(), exchange);
System.out.println([MemoryDataCenter] 新交换机添加成功exchangeName exchange.getName());
}
获取交换机
public Exchange getExchange(String exchangeName) {
return exchangeMap.get(exchangeName);
}
删除交换机
public void deletaExchange(String exchangeName) {
exchangeMap.remove(exchangeName);
System.out.println([MemoryDataCenter] 交换机删除成功exchangeName exchangeName);
} 实现队列的管理
添加队列
public void insertQueue(MSGQueue queue) {
queueMap.put(queue.getName(), queue);
System.out.println([MemoryDataCenter] 新队列添加成功queueName queue.getName());
}
获取队列
public MSGQueue getQueue(String queueName) {
return queueMap.get(queueName);
}
删除队列
public void deleteQueue(String queueName) {
queueMap.remove(queueName);
System.out.println([MemoryDataCenter] 队列删除成功queueName queueName);
} 实现绑定的管理
添加绑定
public void insertBinding(Binding binding) throws MqException {// 先使用 exchangeName 查一下对应的 HashMap 是否存在不存在就创建ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(),k - new ConcurrentHashMap());// 再根据 queueName 查一下如果 binding 存在就抛出异常不存在才能插入synchronized (bindingMap) {if (bindingMap.get(binding.getQueueName()) ! null) {throw new MqException([MemoryDataCenter] 绑定已经存在exchangeName binding.getExchangeName() , queueName binding.getQueueName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println([MemoryDataCenter] 新绑定添加成功exchangeName binding.getExchangeName() , queueName binding.getQueueName());
}
添加绑定要注意线程安全问题此处需要以当前的 bindMap 为锁对象进行加锁
获取绑定
根据交换机名字和队列名字获取唯一的绑定
public Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMapString, Binding bindMap bindingsMap.get(exchangeName);if (bindMap null) {return null;}return bindMap.get(queueName);
}
获取一个交换机的所有绑定
public ConcurrentHashMapString, Binding getBindings(String exchangeName) {
return bindingsMap.get(exchangeName);
}
删除绑定
public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMapString, Binding bindMap bindingsMap.get(binding.getExchangeName());if (bindMap null) {// 该交换机没有绑定任何队列throw new MqException([MemoryDataCenter] 绑定不存在exchangeName binding.getExchangeName() , queueName binding.getQueueName());}bindMap.remove(binding.getQueueName());System.out.println([MemoryDataCenter] 绑定删除成功exchangeName binding.getExchangeName() , queueName binding.getQueueName());
} 实现消息的管理
添加消息
public void addMessage(Message message) {
messageMap.put(message.getMessageId(), message);
System.out.println([MemoryDataCenter] 新消息添加成功messageId message.getMessageId());
}
根据 id 获取消息
public Message getMessage(String messageId) {
return messageMap.get(messageId);
}
根据 id 删除消息
public void removeMessage(String messageId) {
messageMap.remove(messageId);
System.out.println([MemoryDataCenter] 消息删除成功messageId messageId);
}
发送消息到指定队列 队列和消息之间的关联
public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列中// 先根据队列名字找到该队列对应的消息链表LinkedListMessage messages queueMessageMap.computeIfAbsent(queue.getName(), k - new LinkedList());synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心插入一下// 这里就算消息中心已经存在消息重复插入也没关系// messageId相同对应的 message 的内容也一定是一样的服务器代码不会对 Message 内容做出修改 basicProperties 和 bodyaddMessage(message);System.out.println([MemoryDataCenter] 消息被投递到队列中messageID message.getMessageId() , queueName queue.getName());
}
此处发送消息到指定队列需要进行加锁操作防止重复在该队列中插入消息
从队列中获取指定消息
public Message pollMessage(String queueName) {
// 根据队列名查找一下对应的队列的消息链表
LinkedListMessage messages queueMessageMap.get(queueName);
// 如果没找到说明队列中没有任何消息
if (messages null) {return null;
}
synchronized (messages) {if (messages.size() 0) {return null;}// 链表中有元素就进行头删除Message curMessage messages.remove(0);System.out.println([MemoryDataCenter] 消息从队列中取出messageId curMessage.getMessageId());return curMessage;
}
}
此处需要进行加锁操作两个线程同时获取的时候破坏链表结构
获取指定队列中的消息个数
public int getMessageCount(String queueName) {
LinkedListMessage messages queueMessageMap.get(queueName);
if (messages null) {return 0;
}
if (messages.size() 0){return 0;
}
synchronized (messages) {System.out.println(messageSize messages.size());return messages.size();
}
} 实现待确认消息的管理
添加未确认的消息
public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());messageHashMap.put(message.getMessageId(), message);System.out.println([MemoryDataCenter] 消息进入待确认队列messageId message.getMessageId());
}
删除待确认的消息已经确认过的消息
// 删除消息已确认过
public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.get(queueName);if (messageHashMap null) {return;}messageHashMap.remove(messageId);System.out.println([MemoryDataCenter] 消息从待确认队列中删除messageId messageId);
}
获取到指定的待确认消息
public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAckMap.get(queueName);if (messageHashMap null) {return null;}return messageHashMap.get(messageId);
} 实现数据从硬盘中恢复
从硬盘中读取数据把硬盘之前持久化存储的各个维度的数据恢复到内存中
清空之前集合中的数据恢复所有的交换机数据恢复所有的队列数据恢复所有的绑定数据恢复所有消息数据
注意不需要恢复待确认的消息因为在 当消息在等待 ACK的时候服务器重启了。此时消息就相当于未被取走状态而硬盘中存储的就是消息就是“未被取走”的。
总结
借助内存中的一些列数据结构 保存 交换机、队列、绑定、消息广泛使用了 哈希表、链表、嵌套的数据结构等线程安全要不要加锁锁加到哪里 虚拟主机的设计
类似于 MySQL 的 database把交换机队列绑定消息…进行逻辑上的隔离一个服务器可以有多个虚拟主机~此处我们项目就设计了一个虚拟主机VirtualHost
创建交换机exchangeDelcare
如何表示交换机和虚拟主机之间的从属关系呢
方案一参考数据库设计“一对多”方案比如给交换机表添加个属性虚拟主机 id/name方案二交换机的名字 虚拟主机名字 交换机的真实名字 按照方案二也可以去区分不同的队列进一步由于绑定和队列和交换机都相关直接就隔离开了 再进一步消息和队列是强相关的队列名区分开消息自然区分开。 此时就可以区分不同虚拟主机的不同交换机的关系
把交换机名字加上虚拟主机名字作为前缀判断交换机是否存在直接通过内存查询真正去构造交换机对象当参数为 durable的时候将交换机对象写入硬盘将交换机写入内存
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString, Object arguments) {// 把交换机的名字加上虚拟主机作为前缀exchangeName virtualName exchangeName;try {synchronized (exchangeLocker) {// 1. 判定该交换机是否存在直接通过内存查询Exchange existsExchange memoryDataCenter.getExchange(exchangeName);if (existsExchange ! null) {// 该交换机已经存在System.out.println([VirtualHost] 交换机已经存在exchangeName exchangeName);return true;}// 2. 真正创建交换机先构造 Exchange 对象Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setDurable(durable);exchange.setType(exchangeType);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3. 把交换机对象写入硬盘, durable为true时才写入if (durable) {diskDataCenter.insertExchange(exchange);}// 4. 把交换机写入内存memoryDataCenter.insertExchange(exchange);System.out.println([VirtualHost] 交换机创建完成exchangeName exchangeName);// 上述逻辑先写硬盘再写内存。// 因为硬盘更容易写失败一旦失败就不写内存了// 要是先写内存内存写成功了硬盘写失败了还需要把内存数据清理了就比较麻烦}return true;} catch (Exception e) {System.out.println(([VirtualHost] 交换机创建失败exchangeName exchangeName));e.printStackTrace();return false;}
} 删除交换机exchangeDelete
根据交换机的名字找到对应的交换机删除硬盘数据删除内存中数据
// 删除交换机
public boolean exchangeDelete(String exchangeName) {
exchangeName virtualName exchangeName;
try {synchronized (exchangeLocker) {// 1. 先找到对应的交换机Exchange toDelete memoryDataCenter.getExchange(exchangeName);if (toDelete null) {throw new MqException([VirtualHost] 交换机不存在无法删除exchangeName exchangeName);}// 2. 删除硬盘数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}// 3. 从内存中删除memoryDataCenter.deletaExchange(exchangeName);System.out.println([VirtualHost] 交换机删除成功exchangeName exchangeName);}return true;
} catch (Exception e) {System.out.println(([VirtualHost] 交换机删除失败exchangeName exchangeName));e.printStackTrace();return false;
}
} 创建队列queueDelcare
判断队列是否存在不存在则创建队列设定参数队列参数 durable 为 true的时候存入硬盘将队列写入到内存
// 创建队列
public boolean queueDelcare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) {// 把队列的名字拼接上虚拟主机名字queueName virtualName queueName;try {synchronized (queueLocker) {// 1. 判定队列是否存在MSGQueue existsQueue memoryDataCenter.getQueue(queueName);if (existsQueue ! null) {System.out.println([VirtualHost] 队列已经存在queueName queueName);return true;}// 2. 不存在则创建队列MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 将队列写入到硬盘durable为true时才写入if (durable) {diskDataCenter.insertQueue(queue);}// 4. 将队列写入到内存memoryDataCenter.insertQueue(queue);System.out.println([VirtualHost] 队列创建成功queueName queueName);}return true;} catch (Exception e) {System.out.println(([VirtualHost] 队列创建失败queueName queueName));e.printStackTrace();return false;}
} 删除队列queueDelete
判断队列是否存在存在则删除先在硬盘删除在内存中删除
// 队列删除
public boolean queueDelete(String queueName) {
queueName virtualName queueName;
try {synchronized (queueLocker) {// 1. 查询队列是否存在MSGQueue existsQueue memoryDataCenter.getQueue(queueName);if (existsQueue null) {throw new MqException([VirtualHost] 队列不存在无法删除);}// 2. 存在进行先在硬盘删除if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueName);}// 3. 在内存中删除memoryDataCenter.deleteQueue(queueName);System.out.println([VirtualHost] 队列删除成功queueName queueName);}return true;
} catch (Exception e) {System.out.println(([VirtualHost] 队列删除失败queueName queueName));e.printStackTrace();return false;
}
} 创建绑定queueBind
判断当前绑定在不在验证当前的 routingKey 合不合法如果合法就创建绑定设置参数从内存中获取下绑定关系的队列和交换机是否存在都存在再次判定队列和交换机的durable是否都为 true都为 true 则存入硬盘再写入内存
public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName virtualName queueName;exchangeName virtualName exchangeName;try { synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 判断当前的绑定是否已经存在Binding existsBinding memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding ! null) {throw new MqException([VirtualHost] binding已经存在exchangeName exchangeName , queueName queueName);}// 2. 验证 bindingKey 是否合法if (!router.checkBindingKey(bindingKey)) {throw new MqException([VirtualHost] bindingKey非法bindingKey bindingKey);}// 3. 不存在就创建绑定Binding binding new Binding();binding.setQueueName(queueName);binding.setExchangeName(exchangeName);binding.setBindingKey(bindingKey);// 4. 获取下绑定 对应的 队列和交换机是否存在Exchange existsExchange memoryDataCenter.getExchange(exchangeName);MSGQueue existsQueue memoryDataCenter.getQueue(queueName);if (existsExchange null) {throw new MqException([VirtualHost] 交换机不存在exchangeName exchangeName);}if (existsQueue null) {throw new MqException([VirtualHost] 队列不存在queueName queueName);}// 5. 先写入硬盘需要判断当前 交换机和队列是否都持久化if (existsQueue.isDurable() existsExchange.isDurable()) {diskDataCenter.insertBinding(binding);}// 6. 再写入内存memoryDataCenter.insertBinding(binding);System.out.println([VirtualHost] 绑定创建成功 exchangeName exchangeName , queueName queueName);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 绑定创建失败 exchangeName exchangeName , queueName queueName);e.printStackTrace();return false;}
}删除绑定queueUnbind 有个依赖关系问题就是 如果 线程A 先删除了队列而此时另一个线程B 再去删除绑定消息时候, 就会失败因为此时队列已经不存在了此时需要解决方案 方案一参考类似于 MySQL 的外键一样删除交换机/队列的时候判定一下当前队列/交换机是否存在对应的绑定如果存在则禁止删除要求先解除绑定再尝试删除方案二直接删除不判断 交换机和队列是否存在 获取绑定是否存在删除硬盘上的数据需要判断该绑定 durable 是否为 true从内存中删除绑定
// 删除绑定
public boolean queueUnbind(String exchangeName, String queueName) {exchangeName virtualName exchangeName;queueName virtualName queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1. 获取绑定是否存在Binding existsBinding memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding null) {throw new MqException([VirtualHost] 绑定不存在无法删除exchangeName exchangeName , queueName queueName);}// // 2. 获取 对应的队列和交换机// Exchange existsExchange memoryDataCenter.getExchange(exchangeName);// MSGQueue existsQueue memoryDataCenter.getQueue(queueName);// if (existsExchange null) {// throw new MqException([VirtualHost] 交换机不存在exchangeName exchangeName);// }// if (existsQueue null) {// throw new MqException([VirtualHost] 队列不存在queueName queueName);// }// 3. 删除硬盘上的数据 需要判断当前 交换机和队列都是持久化 diskDataCenter.deleteBinding(existsBinding);// 4. 从内存中删除绑定memoryDataCenter.deleteBinding(existsBinding);System.out.println([VirtualHost] 删除绑定成功);}}return true;} catch (Exception e) {System.out.println([VirtualHost] 删除绑定失败);return false;}
} 注意考虑线程安全问题 发送消息basicPublish
发送消息的时候会往 ConsumerManager类中的 阻塞队列中 BlockingQueue tokenQueue存在该队列名表示该队列存在消息~
// 发送消息到指定的交换机/队列中
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机名字exchangeName virtualName exchangeName;// 2. 检查 routingKey 是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException([VirtualHost] routingKey 非法! routingKey routingKey);}// 3. 查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException([VirtualHost] 交换机不存在! exchangeName exchangeName);}// 4. 判断交换机类型if (exchange.getType() ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 为队列名直接将 消息写入 队列中// 此时可以无视绑定消息String queueName virtualName routingKey;// 5. 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 6. 查找该队列名所对应的 队列是否存在MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 队列不存在queueName queueName);}// 7. 队列存在则直接写入消息sendMessage(queue, message);} else {// 按照 FANOUT 和 TOPIC 方式// 5. 找到该交换机关联的所有绑定消息并遍历这些绑定消息ConcurrentHashMapString, Binding bindings memoryDataCenter.getBindings(exchangeName);for (Map.EntryString, Binding entry : bindings.entrySet()) {// 1) 获取绑定对象判断该队列是否存在Binding binding entry.getValue();// 2) 查看当前绑定里面的队列名能不能查到相应队列 (此处不设计转发规则)MSGQueue queue memoryDataCenter.getQueue(binding.getQueueName());if (queue null) {System.out.println([VirtualHost] basicPublish 发送消息时发现队列不存在queueName binding.getQueueName());continue;}// 2) 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 3) 判断这个消息能否转发给这个队列// 如果是 fanout, 所有绑定的队列都要转发的// 如果是 topic, 还需要 判断 routingKey 和 BindingKey 是否匹配if (!router.route(exchange.getType(), binding, message)) {continue;}// 4) 真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {return false;}
}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 1. 此处发送消息就是把消息 写入到 硬盘 和 内存int deliverMode message.getDeliverMode();// deliverMode 为2持久化 为1不持久化if (deliverMode 2) {diskDataCenter.sendMessage(queue, message);}// 写入内存memoryDataCenter.sendMessage(queue, message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());
} Topic交换机转发规则 bindingKey创建绑定的时候给绑定指定的特殊字符串 数字、字母、下划线 使用 . 把整个 routingKey 分成若干个部分 形如aaa.vvv.eewe 支持两种特殊符号作为通配符 一个是 * 形如aaa.*.bbb 只能作为被 . 分割单独的存在一个 # 形如aaa.#.bbb routingKey 发布消息的时候给消息上指定字符串 数字、字母、下划线 使用 . 把整个 routingKey 分成若干个部分 形如aaa.vvv.eew
上述规则是根据 AMQP 协议规定的
验证 bindingKey 是否合法checkBindingKey public boolean checkBindingKey(String bindingKey) {
// bindingKey的构造规则
// 1. 数字、字母、下划线
// 2. 使用 . 进行分割
// 3. 允许存在 * 和 # 作为通配符但是只能作为独立的存在
if (bindingKey.length() 0) {// 空字符串也是合法情况比如使用 DIRECT 或者 FANOUT bindingKey 是用不上return true;
}
// 检查字符串中不存在非法字符
for (int i 0; i bindingKey.length(); i) {char ch bindingKey.charAt(i);// 判定该字母是否是大写字母if (ch A ch Z) {continue;}// 判定该字母是否是小写字母if (ch a ch z) {continue;}// 判定该字母是否是阿拉伯数字if (ch 0 ch 9) {continue;}// 判定是否是 _ 或者 .if (ch _ || ch . || ch # || ch *) {continue;}return false;
}
// 检查 * # 是否是独立的部分
// 由于 . 在正则表达式中是一种特殊符号需要转义 用 \. 但是在 Java中这又是个特殊字符所以要用 \\.
String[] words bindingKey.split(\\.);
for (String word : words) {// 检查 word 长度 是否大于1并且包含了 * 或者 # 就是非法if (word.length() 1 (word.contains(#) || word.contains(*))) {return false;}
}
// 约定下通配符之间的相邻关系
// 为啥这么约定因为前三种相邻的时候实现逻辑非常繁琐同时功能性提升不大
// 1. aaa.#.#.bbb - 非法
// 2. aaa.#.*.bbb - 非法
// 3. aaa.*.#.bbb - 非法
// 4. aaa.*.*.bbb - 合法
for (int i 0; i words.length - 1; i) {// 连续两个 #if (words[i].equals(#) words[i].equals(#)) {return false;}// # *if (words[i].equals(#) words[i].equals(*)) {return false;}// * #if (words[i].equals(*) words[i].equals(#)) {return false;}
}
return true;
}验证 routingKey 是否合法checkRoutingKey // routingKey的构造规则
// 1. 数字、字母、下划线
// 2. 使用 . 分割成若干个部分
public boolean checkRoutingKey(String routingKey) {
if (routingKey.length() 0) {// 空字符串合法情况。比如使用 FANOUT 交换机的时候routingKey 用不上就可以设置成 return true;
}
for (int i 0; i routingKey.length(); i) {char ch routingKey.charAt(i);// 判定该字母是否是大写字母if (ch A ch Z) {continue;}// 判定该字母是否是小写字母if (ch a ch z) {continue;}// 判定该字母是否是阿拉伯数字if (ch 0 ch 9) {continue;}// 判定是否是 _ 或者 .if (ch _ || ch .) {continue;}// 该字符不是上述任何一种就不合法直接返回 falsereturn false;
}
return true;
} 匹配规则 private boolean routeTopic(Binding binding, Message message) {// 先把两个 Key 进行拆分String[] bindingTokens binding.getBindingKey().split(\\.);String[] routingTokens message.getRoutingKey().split(\\.);// 引入两个下标指向两个数组初始情况下都为 0int bindingIndex 0;int routingIndex 0;while (bindingIndex bindingTokens.length routingIndex routingTokens.length) {// 【情况二】 遇到 *if (bindingTokens[bindingIndex].equals(*)) {bindingIndex;routingIndex;// 【情况三】 遇到 #} else if (bindingTokens[bindingIndex].equals(#)) {bindingIndex;// 【情况四】 # 后面没有内容if (bindingIndex bindingTokens.length) {// 说明该 # 后面没有东西了 匹配成功return true;}// 后面还有东西拿着这个内容去 routingTokens 中找找到对应位置// 使用 findNextMatch 这个方法用来查找该部分 在 routingTokens 中的位置并返回下标; 没找到返回 -1routingIndex findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex -1) {// 没找到匹配结果返回 falsereturn false;}// 找到匹配结果继续往下匹配bindingIndex;routingIndex;} else {// 【情况一】普通字符串需要一模一样if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex;routingIndex;}}// 判定双方是否同时到达末尾 【情况五】if (bindingIndex bindingTokens.length routingIndex routingTokens.length) {return true;}return false;
} 订阅消息basicComsume 什么是函数式接口
由于 Java的函数不能脱离类的存在 为了实现 lambda, Java 引入了函数式接口lambda的本质底层实现
interface只能有一个方法还需要加 FunctionalInterface 注解 一个虚拟主机中有很多队列每个队列上都有很多条消息。 那么针对是哪个消费者订阅了哪条队列的消息需要进行一个管理。 推送给消费者消息的基本实现思路
让 brokerserver把哪些消费者管理好收到对应的消息把消息推送给消费者
消费者是以队列为维度来订阅消息的一个队列可以有多个消费者此处我们约定按照轮询的方式来进行消费。
实现一个类完成消费者消费消息核心逻辑 package com.example.mq.mqserver.datacenter;import com.example.mq.common.Consumer;
import com.example.mq.common.ConsumerEnv;
import com.example.mq.common.MqException;
import com.example.mq.mqserver.VirtualHost;
import com.example.mq.mqserver.core.MSGQueue;
import com.example.mq.mqserver.core.Message;import java.util.concurrent.*;/*** 通过这个类来实现消费消息的核心逻辑*/
public class ConsumerManager {// 持有上层的 VirtualHost 对象的引用用来操作数据private VirtualHost parent;// 指定一个线程池执行具体的回调任务private ExecutorService workPool Executors.newFixedThreadPool(4);// 存放一个 令牌queueName的队列private BlockingQueueString tokenQueue new LinkedBlockingQueue();// 扫描线程private Thread scanThread null;public ConsumerManager(VirtualHost p) {parent p;scanThread new Thread(() - {while (true) {try {// 1. 从阻塞队列中 拿到 队列名字String queueName tokenQueue.take();// 2. 根据队列名字找到队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 取令牌后发现队列名不存在queueName queueName);}// 3. 从队列中消费消息synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程scanThread.setDaemon(true);scanThread.start();}//public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}// 记录当前队列有哪些消费者订阅了public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 队列不存在queueName queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有消息了需要立即消费掉int messageCount parent.getMemoryDataCenter().getMessageCount(queueName);for (int i 0; i messageCount; i) {// 这个方法调用一次就消费一条消息consumeMessage(queue);}}}private void consumeMessage(MSGQueue queue) {// 1. 先按照轮询的方式找个消费者出来ConsumerEnv luckyDog queue.chooseConsumer();if (luckyDog null) {// 当前队列没有消费者暂时不消费return;}// 2. 从队列中取出一个消息Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if (message null) {// 当前队列中没有消息就不消费return;}// 3. 把消息带入到消费者的回调函数中丢给线程池执行workPool.submit(() - {try {// 1. 把消息放入到待确认集合中, 这个操作在执行回调之前parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调luckyDog.getConsumer().handleDelivery(luckyDog.getConsumeTag(), message.getBasicProperties(), message.getBody());// 3. 如果当前是自动应答此时就可以消息删除// 手动应答先不处理交给后续消费者调用 basicAck来处理if (luckyDog.isAutoAck()) {// 1) 删除硬盘if (message.getDeliverMode() 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除待确认集合parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager] 消息被成功消费queueName queue.getName());}} catch (Exception e) {e.printStackTrace();}});}
} 订阅消息的核心逻辑就是调用 consumerManager.addConsumer方法并传入参数consumerTag、queueName、autoAck、consumer【回调函数】。 这个方法的底层是 根据传入的 queueName查到该队列然后创一个身份者表示 ConsumerEnv,存入到该队列的 ConsumerEnvList中判断该队列中时候存在消息已经存在的话就consumeMessage消费完全部消息(按照轮询方式) 关于消息确认
能够确保消息是被正确的消费掉了消费者的回调函数顺利执行完了中间没有抛出异常这条消息就可以被删除了。消息确认也就是为了保证“消息不丢失”为了达成消息不丢失这样的效果这样处理
在真正执行回调之前把这个消息先放到 “待确认的集合”中~真正回调当前消费者采取的是 autoAcktrue,就认为回调执行完毕不抛异常就算消费成功然后就可以删除消息 硬盘内存消息中心哈希表待确认消息集合 当前消费者采取的是 autoAckfalse手动应答就需要消费者再回调函数内部显式调用 basicAck这个核心API basicAck实现原理比较简单当传入参数 autoAckfalse, 就手动再回调函数的时候调用 basicAck 就行 传入queueName和messageId获取到队列和消息删除硬盘中数据删除内存中心的消息数据删除待确认集合中的消息数据 // 确认消息
public boolean basicAck(String queueName, String messageId) {try {// 1. 获取到消息和队列queueName virtualName queueName;Message message memoryDataCenter.getMessage(messageId);if (message null) {throw new MqException([VirtualHost] 要确认的消息不存在messageId messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 要确认的队列不存在messageId messageId);}// 2. 删除硬盘上数据if (message.getDeliverMode() 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除内存中心数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println([VirtualHost] basicAck成功消息被成功确认queueName queueName , messageId messageId);return true;} catch (Exception e) {System.out.println([VirtualHost] basicAck失败消息确认失败queueName queueName , messageId messageId);e.printStackTrace();return false;}
}消息确认是为了保证消息的不丢失而需要的逻辑 执行回调方法的过程中抛异常了~ 当回调函数异常后续逻辑执行不到了。此时这个消费就会始终待在待确认集合中。RabbitMQ中会设置一个死信队列每一个队列都会绑定一个死信队列。应用场景当消息在消费过程中出现异常就会把消息投入到死信队列中当消息设置了过期时间如果在过期时间内没有被消费就会投入到死信队列中当队列达到最大长度时新的消息将无法被发送到队列中。此时RabbitMQ可以选择将这些无法发送的消息发送到死信队列中以便进行进一步处理。 执行回调过程中, Broker Server崩溃了~内存数据都没了但是硬盘数据还在正在消费的这个消息在硬盘中仍然存在。BrokerServer重启后这个消息就又被加载到内存了就像从来没被消费过一样。消费者就会有机会重新得到这个消息。 网络通信设计 基于TCP自定义应用层协议 type描述当前这个请求和响应是干啥的。用四个字节来存储 在MQ中客户端生产者 消费者和 服务器 Broker Server之间要进行哪些操作就是VirtualHost中的那些核心API希望客户端能通过网络远程调用这些API此处的type就是描述当前这个请求/响应是在调用哪个APITCP是有连接的. Channel 是 Connection 内部的逻辑连接。此时一个 Connection 中可能有多个连接存在的意义是让 TCP 连接得到复用创建/断开TCP连接成本挺高【需要三次握手四次挥手~】 length里面存储的是 payload的长度。用4个字节来存储payload会根据当前是请求还是响应以及当前的 type 有不同的值 比如 type 是 0x3(创建交换机)同时当前是个请求此时 payload 的内容就相当于是 exchangeDelcare 的参数的序列化结果比如 type 是 0x3(创建交换机)同时当前是个响应此时 payload 的内容就相当于是 exchangeDelcare 的返回结果的序列化内容
ExchangeDelcare 请求 Request 响应 Response ExchangeDelete 请求 Request 响应 Response QueueDelcare 请求 Request 响应 Response QueueDelete 请求 Request 响应 Response QueueBind 请求 Request 响应 Response QueueUnBind 请求 Request 响应 Response BasicPublish 请求 Request 响应 Response BasicConsume 请求 Request 响应 Response BasicAck 请求 Request 响应 Response 创建BrokerServer类 消息队列本体服务器本质上就是一个 TCP 的服务器 实现读取请求和写回响应 读取请求
private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if (n ! request.getLength()) {throw new IOException(读取格式出错);}request.setPayload(payload);return request;
}写回响应
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());dataOutputStream.flush();
}清理过期和会话 private void clearClosedSession(Socket clientSocket) {
// 这里要做的主要遍历 上述 sessions 哈希表把该关闭的 socket 对应的键值对全部删掉
ListString toDeleteChannelId new ArrayList();
for (Map.EntryString, Socket entry : sessions.entrySet()) {if (entry.getValue() clientSocket) {// 不能直接删除// 这属于集合类的大忌一边遍历一边删除toDeleteChannelId.add(entry.getKey());}
}
for (String channelId : toDeleteChannelId) {sessions.remove(channelId);
}
System.out.println([BrokerServer] 清理 session 完成被清理的 channelId toDeleteChannelId);
}客服端代码(mqclient) ConnectionFactory 连接工厂
这个类持有服务器的地址主要的功能就是创建出连接 Connection 对象
Data
public class ConnectionFactory {// brokerserver 的ip地址private String host;// brokerserver 的portprivate int port;public Connection newConnection() throws IOException {Connection connection new Connection(host,port);return connection;}
}Connection 表示一个TCP连接
持有 Socket 对象发送请求读取响应创建一个扫描线程由这个线程负责不停地从 socket 中读取响应数据把这个响应数据再交给对应的 channel 进行处理 如果 response.type 0xc则是服务器推送的消息利用 SubScribeReturns 来接收根据 channelId 找到相应的 channel对象利用线程池执行 channel 里面的回调函数如果是 response.type ! 0xc则当前响应是针对控制请求的响应利用 BasicReturns 来接收根据 BasicReturns 对象中的 channelId 在 channelMap中找到 channel对象并将 BasicReturns 存到 channel对象中的 basicReturnsMap 哈希表中 创建一个 channel 随机生成 CUUID将当前对象存放到 Connection 管理 channel 的哈希表中然后将 这个命令 通过 connection 发送给 服务器 管理多个 channel 对象 ConcurrentHashMapString,Channel channelMap每次创建一个 channel的时候就存进去
channel 表示一个逻辑上的连接 一个客户端可以有多个模块。 每个模块都可以和 brokerserver之间建立”逻辑上的连接“ channel 这几个模块的 channel 彼此之间是相互不影响的 但是这几个 channel 复用了同一个 TCP 连接 还需要提供一系列的方法去和服务器提供的核心API对应客户端提供的方法方法的内部就是发了一个特定的请求 对于一个客户端的一次 Connection下可能会有多个 channel就是多个逻辑上的连接那么如何区分响应 例如有 channelA 和 channelB 。channelA发送的请求AchannelB发送的请求B。此时响应的顺序不会按照顺序返回而且channelA也不用关系其他响应只关心是否收到响应A。 所以此时需要在 channel 下用一个 basicReturns来存储当前 channle 的收到服务器的响应。当客户端connetion读取到响应时候添加到 channel中 basicReturns 项目总结
写了一个消费者队列服务器。核心功能就是提供了虚拟机、交换机、队列、消息等概念的管理实现了三种典型的消息转发方式。基于上述内容就可以实现 跨主机/服务器 之间的 生产者消费者模型了。
项目扩展
虚拟主机的管理建立虚拟主机表用户管理/用户认证建立用户表可在建立连接的时候或者建立channel交换机/队列独占/自动删除/扩展参数发送方确认服务器返回响应生产者收到后触发回调拒绝应答死信队列针对消息可靠性管理接口 管理页面 源码地址MQ源码地址 可以配合文档一起看更能快速了解