当前位置: 首页 > news >正文

商业网站开发 流程深圳网页设计与制作工资多少钱

商业网站开发 流程,深圳网页设计与制作工资多少钱,加强校园网站建设方案,wordpress页面构建编辑插件前言#xff1a;本项目是仿照RabbitMQ并基于SpringBoot Mybatis SQLite3实现的消息队列#xff0c;该项目实现了MQ的核心功能#xff1a;生产者、消费者、中间人、发布、订阅等。 源码链接#xff1a;仿Rabbit MQ实现消息队列 目录 前言#xff1a;本项目是仿照Rabbi…前言本项目是仿照RabbitMQ并基于SpringBoot Mybatis SQLite3实现的消息队列该项目实现了MQ的核心功能生产者、消费者、中间人、发布、订阅等。 源码链接仿Rabbit MQ实现消息队列 目录 前言本项目是仿照RabbitMQ并基于SpringBoot Mybatis SQLite3实现的消息队列该项目实现了MQ的核心功能生产者、消费者、中间人、发布、订阅等。 一、核心概念 二、模块划分  三、创建核心实体类  3.1 创建交换机Exchange 3.2 创建队列实体类MSGQueue 3.3 创建绑定实体类Binding 3.4 创建消息实体类Message 四、数据库操作 五、封装对数据库的操作 六、消息的存储设计 6.1 设计思路及设定 6.2 设定存储消息的格式 6.3 实现消息序列化工具  七、实现文件管理消息 7.1 读取消息统计文件 7.2 写消息统计文件 7.3 创建队列对应的文件和目录 7.4 删除队列对应的文件和目录 7.5 判断队列的目录和消息文件是否存在 7.6 新的消息写入到文件中  7.7 删除队列对应的消息数据文件中的消息 7.8 从文件中读取消息到内存中 7.9 判断垃圾回收时机 7.10 获取新消息数据文件的路径 7.11 消息的垃圾回收 八、统一硬盘处理 九、内存管理 9.1 交换机相关操作的API 9.2 队列相关操作的API 9.3 绑定相关操作的API 9.4 消息相关操作的API 十、虚拟机 VirtualHost 10.1 创建交换机 10.2 删除交换机 10.3 创建队列 10.4 删除队列 10.5 创建绑定 10.6 删除绑定 10.7 发送消息 10.8 订阅消息 10.9 消息确认 十一、网络通信协议设计 11.1 设计应用层协议  11.2 定义Request/Response 11.3 定义参数父类 11.4 定义返回值父类 11.5 定义其他参数类 十二、实现 BrokerServer 类 12.1 启动/停止服务器 12.2 实现处理连接 12.3 实现 readRequest / writeResponse 12.4 实现处理请求 12.5 实现清理过期的会话 十三、实现客户端 13.1 创建 ConnectionFactory 13.2 Connection 和 Channel 定义 十四、样例演示 一、核心概念 关于消息队列有几个重要的核心概念 生产者Producer 负责将应用程序产生的数据转换成消息并将这些消息推送到消息队列服务器上以便消费者Consumer可以接收并处理这些消息。消费者Consumer它的主要职责是监听特定的队列或主题并对到达的消息执行必要的业务逻辑。中间人Broker作为生产者Producer和消费者Consumer之间的中介负责管理和协调消息的传递过程。发布Publish生产者向中间人投递消息的过程。订阅Subscribe哪些消费者要从这个中间人获取数据这个注册的过程称为订阅。 在中间人Broker模块又有以下几个概念 虚拟机 (VirtualHost)类似于 MySQL 的 database, 是⼀个逻辑上的集合⼀个 BrokerServer 上可以存在多个 VirtualHost。 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上再根据不同的规则, 把消息转发给不同的 Queue。队列 (Queue): 真正⽤来存储消息的部分每个消费者决定⾃⼰从哪个 Queue 上读取消息。 绑定 (Binding): Exchange 和 Queue 之间的关联关系Exchange 和 Queue 可以理解成 多对多 关系使⽤⼀个关联表就可以把这两个概念联系起来。 二、模块划分  明确需要做的工作 实现生产者、消费者、Broker Server这三个部分。 针对生产者、消费者主要实现的是客户端和服务器的网络通信部分。 给客户端提供一组 API让客户端的业务代码来调用通过网络通信的方式远程调用Broker Server上的方法。 实现Broker Server 内部的一些基本概念和API虚拟主机、交换机、队列、绑定、消息。 持久化考虑到 SQLite 相比 MySQL 来说比较轻量因此存储交换机、队列等这些实体用 SQLite消息的存储使用文件进行管理。 针对于上述所需要实现的模块进行划分 三、创建核心实体类  3.1 创建交换机Exchange nametypedurableautoDeletearguments交换机身份标识交换机类型是否持久化是否自动删除 额外参数选项 Data public class Exchange {//交换机的身份标识(唯一)private String name;//交换机类型 direct fanout topicprivate ExchangeType type ExchangeType.DIRECT;//表示该交换机是否要持久化存储. true 表示需要持久化. false 表示不需要持久化private boolean durable false;//如果当前交换机无客户端使用就自动删除private boolean autoDelete false;//表示创建交换机时指定的一些额外的参数选项private MapString,Object arguments new HashMap(); } 此处省略 arguments 存储数据库时的Json转换只需要使用 ObjectMapper即可实现。关于交换机的类型此次主要实现了以下三种DIRECT、FANOUT、TOPIC。并使用枚举类定义 public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type type;}public int getType() {return type;} } 3.2 创建队列实体类MSGQueue namedurableexclusiveautoDeleteargumentsconsumerEnvListconsumerSeq队列标识是否持久化是否独占是否自动删除额外参数选项当前订阅的消费者列表记录当前取到第几个消费者 Data public class MSGQueue {//表示队列的身份标识private String name;//表示队列是否持久化 true:需要持久化 false:不需要持久化private boolean durable false;//表示是否独占true:独占 false:都可以使用private boolean exclusive false;//表示无客户端使用是是否自动删除private boolean autoDelete false;//表示扩展参数private MapString,Object arguments new HashMap();//当前队列都有哪些消费者订阅了.private ListConsumerEnv consumerEnvList new ArrayList();//记录当前取到的第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq new AtomicInteger(0); } 3.3 创建绑定实体类Binding exchangeNamequeueNamebindingKey交换机名字队列名字绑定(和 routingKey匹配) Data public class Binding {// 交换机名字private String exchangeName; //队列名字private String queueName;// 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配private String bindingKey; } 3.4 创建消息实体类Message 消息存储为二进制形式因此对消息的存储需要进行序列化处理所以 Message 类要实现Serializable 接口。 basicProperties bodyoffsetBegoffsetEndisValid消息的属性消息的正文消息开头在文件中的偏移量消息末尾在文件中的偏移量是否有效 Data public class Message implements Serializable {private BasicProperties basicProperties new BasicProperties();private byte[] body;//辅助属性,后续消息要存储在文件中//一个文件存储很多消息 [offsetBeg, offsetEnd)private transient long offsetBeg 0;//消息数据的开头距离文件开头的位置偏移(字节)private transient long offsetEnd 0;//消息数据的结尾距离文件开头的位置偏移(字节)private byte isValid 0x1;//表示该消息在文件中是否是有效的消息,逻辑删除, 0x1:有效 0x0:无效//创建一个工厂方法,让工厂方法去封装一下创建 Message 对象的过程//这个方法创建的 Message 会自动生成一个唯一的 MessageIdpublic static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){Message message new Message();if(basicProperties ! null){message.setBasicProperties(basicProperties);}//此处生成的 MessageId 以 M- 为前缀, 方便区分message.setMessageId(M- UUID.randomUUID());message.setRoutingKey(routingKey);message.body body;return message;}public String getMessageId(){return basicProperties.getMessageId();}public void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int deliverMode){basicProperties.setDeliverMode(deliverMode);} } 对于消息的属性使用一个实体类去表示 messageIdroutingKeydeliverMode消息的唯一身份标识和(bindingKey匹配)是否持久化 Data public class BasicProperties implements Serializable {//消息的唯一身份标识,使用 UUID 作为 messageIdprivate String messageId;/*** 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名* 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用)* 如果当前的交换机类型是 TOPIC, 此时 routingKey 就表示和 bindingKey 进行匹配*/private String routingKey;//表示消息是否持久化, 1: 不持久化; 2: 持久化private int deliverMode 1; } 四、数据库操作 对于 Exchange, MSGQueue, Binding, 需要使⽤数据库进⾏持久化保存这里使用 SQLite 进行存储直接去 Maven 中央仓库复制依赖到项目的POM文件再配置数据库文件即可。 SQLite 只是把数据单纯的存储到⼀个⽂件中因此在这里设定存储到 “./data/meta.db”文件。 实现创建表以及数据库操作这里不再展示具体的SQL语句 Mapper public interface MetaMapper {//三个核心建表方法void createExchangeTable();void createQueueTable();void createBindingTable();//针对上述三个基本概念进行插入和删除void insertExchange(Exchange exchange);ListExchange selectAllExchanges();void deleteExchange(Param(exchangeName) String exchangeName);void insertQueue(MSGQueue queue);ListMSGQueue selectAllQueues();void deleteQueue(Param(queueName) String queueName);void insertBinding(Binding binding);ListBinding selectAllBindings();void deleteBinding(Binding binding);} 五、封装对数据库的操作 创建 DataBaseManager 类通过这个类来封装针对数据库的操作。 public class DataBaseManager {//数据库初始化public void init(){...}//删除数据库public void deleteDB(){...}//判断数据库是否存在private boolean checkDBExists(){...}//建表操作private void createTable(){...}//创建默认数据RabbitMQ 里默认也带有一个 匿名 的交换机类型是 DIRECTprivate void createDefaultData(){...}//交换机的数据库操作:增删查public void insertExchange(Exchange exchange){...}public void deleteExchange(String exchangeName){...}public ListExchange selectAllExchanges(){...}//队列的数据库操作:增删查public void insertQueue(MSGQueue queue){...}public void deleteQueue(String queueName){...}public ListMSGQueue selectAllQueues(){...}//Binding的数据库操作:增删查public void insertBinding(Binding binding){...}public void deleteBinding(Binding binding){...}public ListBinding selectAllBindings(){...} } 六、消息的存储设计 6.1 设计思路及设定 设计思路消息需要在硬盘上存储考虑到对于消息的操作并不需要复杂的增删改查而⽂件的操作效率比数据库会高很多因此这里设定用文件来管理消息。 同时因为队列用来存储消息因此这里约定 给每个队列分配⼀个目录目录的名字为 data 队列名形如 ./data/queueName该目录中包含两个固定名字的⽂件: queue_data.txt 消息数据⽂件, 用来保存消息内容。  queue_stat.txt 消息统计⽂件, 用来保存消息统计信息。消息总个数/t有效消息数这样设计主要时考虑到后续进行垃圾回收方便判断进行GC的时机。 6.2 设定存储消息的格式 消息数据文件以二进制的形式存储在 queue_data.txt 文件中 为了方便进行消息的读取这里进行这样的设定 每个消息分成两个部分前四个字节, 表示 Message 对象的长度(字节数)后面若干字节表示 Message 内容消息和消息之间首尾相连。同时每个 Message 基于 Java 标准库进行序列化。 Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置。 6.3 实现消息序列化工具  对于实现消息序列化首先 Message 实体类要实现  Serializable 接口接下来需要借助ByteArrayOutputStream 和 ObjectOutputStream 实现消息的序列化和反序列化 public class BinaryTool {/*** 把对象序列化成一个字节数组* param object* return*/public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数组,// 可以把 object 对象序列化的数据逐步写入导 byteArrayOutputStream 中,// 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream new ObjectOutputStream(byteArrayOutputStream)){// 此处的 writeObject 就会把 object 进行序列化,生成的字节数据就会写入到 objectOutputStream// objectOutputStream 又关联到了 byteArrayOutputStream,最终结果写入到 byteArrayOutputStream 里objectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}/*** 把一个字节数组反序列化成一个对象* param data* return*/public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object null;try (ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(data)){try (ObjectInputStream objectInputStream new ObjectInputStream(byteArrayInputStream)){object objectInputStream.readObject();}}return object;} } 七、实现文件管理消息 创建 MessageFileManager 类这个类主要去实现消息统计文件的读写、消息数据文件的读写、创建存储消息的文件、消息的垃圾回收等等。 下面这段代码是 MessageFileManager 的基础代码实现文件的读写和垃圾回收都需要调用下面的方法 public class MessageFileManager {//定义一个内部类,来表示该队列的统计信息static public class Stat{public int totalCount; // 总消息数量public int validCount; // 有效消息数量}public void init(){}//约定消息文件所在的目录和文件名// 所在路径及文件名: ./data/队列名/queue_data.txt(消息数据文件)// ./data/队列名/queue_stat.txt(消息统计文件)/*** 这个方法用来获取指定队列对应的消息文件所在路径* param queueName* return*/private String getQueueDir(String queueName){return ./data/ queueName;}/*** 这个方法用来获取该队列的消息数据文件路径* param queueName* return*/private String getQueueDataPath(String queueName){return getQueueDir(queueName) /queue_data.txt;}/*** 这个方法用来获取指定队列的消息统计文件的路径* param queueName* return*/private String getQueueStatPath(String queueName){return getQueueDir(queueName) /queue_stat.txt;}7.1 读取消息统计文件 private Stat readStat(String queueName){Stat stat new Stat();try (InputStream inputStream new FileInputStream(getQueueStatPath(queueName))){Scanner scanner new Scanner(inputStream);stat.totalCount scanner.nextInt();stat.validCount scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;} 7.2 写消息统计文件 private void writeStat(String queueName, Stat stat){// 使用 PrintWriter 写文件try (OutputStream outputStream new FileOutputStream(getQueueStatPath(queueName))){PrintWriter printWriter new PrintWriter(outputStream);printWriter.write(stat.totalCount \t stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}} 7.3 创建队列对应的文件和目录 public void createQueueFiles(String queueName) throws IOException {//1.创建队列对应的消息目录File baseDir new File(getQueueDir(queueName));if(!baseDir.exists()){boolean success baseDir.mkdirs();if(!success){throw new IOException(创建目录失败! baseDir : baseDir.getAbsolutePath());}}//2.创建消息数据文件File queueDataFile new File(getQueueDataPath(queueName));if(!queueDataFile.exists()){boolean success queueDataFile.createNewFile();if(!success){throw new IOException(创建消息数据文件失败! queueDataFile: queueDataFile.getAbsolutePath());}}//3.创建消息统计文件File queueStatFile new File(getQueueStatPath(queueName));if(!queueStatFile.exists()){boolean success queueStatFile.createNewFile();if(!success){throw new IOException(创建消息统计文件失败! queueStatFile: queueStatFile.getAbsolutePath());}}//4.给消息统计文件设置初始值Stat stat new Stat();stat.totalCount 0;stat.validCount 0;writeStat(queueName,stat);} 7.4 删除队列对应的文件和目录 public void destroyQueueFiles(String queueName) throws IOException {//先删除文件,再删除目录File queueDataFile new File(getQueueDataPath(queueName));boolean success1 queueDataFile.delete();File queueStatFile new File(getQueueStatPath(queueName));boolean success2 queueStatFile.delete();File baseDir new File(getQueueDir(queueName));boolean success3 baseDir.delete();if(!success1 || !success2 || !success3){//删除失败throw new IOException(删除队列目录和消息文件失败! baseDir: baseDir.getAbsolutePath());}} 7.5 判断队列的目录和消息文件是否存在 public boolean checkFileExists(String queueName){//判断队列的 消息数据文件 和 消息统计文件 是否都存在File queueDataFile new File(getQueueDataPath(queueName));File queueStatFile new File(getQueueStatPath(queueName));if(queueDataFile.exists() queueStatFile.exists()){return true;}return false;} 7.6 新的消息写入到文件中  步骤 检查要写入的队列对应的文件是否存在对 Message 对象进行序列化获取当前消息数据文件的长度由此来设置当前要写入的消息的 offsetBeg 和 offsetEnd。 offsetBeg 消息数据文件长度 4 offsetEnd 消息数据文件长度 4 该消息序列化后的 byte 数组的长度 写入消息数据文件更新消息统计文件  public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 检查当前要写入的队列对应的文件是否存在if(!checkFileExists(queue.getName())){throw new MqException([MessageFileManager] 队列对应的文件不存在! queueName: queue.getName());}// 把 Message 对象进行序列化,转成二进制的字节数组byte[] messageBinary BinaryTool.toBytes(message);// 避免出现线程安全问题,即多个消息同时都往一个消息队列里面写消息synchronized (queue){// 获取到队列数据文件的长度,计算出该 Message 对象的 offsetBeg 和 offsetEnd// 把新的 Message 数据写入到数据文件的末尾, 此时 Message 对象的 offsetBeg, 就是当前文件长度 4// offsetEnd 就是当前文件长度 4 message长度File queueDateFile new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDateFile.length() 4);message.setOffsetEnd(queueDateFile.length() 4 messageBinary.length);// 写入消息数据文件, 此处是追加写try (OutputStream outputStream new FileOutputStream(queueDateFile, true)){try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)){// 先写消息长度,占据 4 个字节dataOutputStream.writeInt(messageBinary.length);// 写入消息本体dataOutputStream.write(messageBinary);}}// 更新消息统计文件Stat stat readStat(queue.getName());stat.totalCount 1;stat.validCount 1;writeStat(queue.getName(),stat);}} 7.7 删除队列对应的消息数据文件中的消息 这里的删除采用逻辑删除即把 Message 对象从文件中读取出来之后把 valid 属性设置成 0 再重新写入并更新消息统计文件。 public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {synchronized (queue){try (RandomAccessFile randomAccessFile new RandomAccessFile(getQueueDataPath(queue.getName()),rw)){// 读取对应的 Message 数据.byte[] bufferSrc new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 转换为 Message 对象Message diskMessage (Message) BinaryTool.fromBytes(bufferSrc);diskMessage.setIsValid((byte) 0x0);// 重新写入文件byte[] bufferDest BinaryTool.toBytes(diskMessage);randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}// 更新统计文件Stat stat readStat(queue.getName());if(stat.validCount 0){stat.validCount - 1;}writeStat(queue.getName(),stat);}} 7.8 从文件中读取消息到内存中 public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedListMessage messages new LinkedList();long currentOffset 0; // 使用这个变量记录光标的位置try (InputStream inputStream new FileInputStream(getQueueDataPath(queueName))){try (DataInputStream dataInputStream new DataInputStream(inputStream)){while (true){//读取一条消息长度int messageSize dataInputStream.readInt();// 按照长度读取消息内容byte[] buffer new byte[messageSize];int actualSize dataInputStream.read(buffer);if(messageSize ! actualSize){//不匹配说明文件有问题throw new MqException([MessageFileManager] 文件格式错误! queueName: queueName);}//反序列化Message message (Message) BinaryTool.fromBytes(buffer);//判断是否是无效数据if (message.getIsValid() ! 0x1){currentOffset (4 messageSize);continue;}//有效数据,加入到链表中message.setOffsetBeg(currentOffset 4);message.setOffsetEnd(currentOffset 4 messageSize);currentOffset (4 messageSize);messages.add(message);}}catch (EOFException e){System.out.println([MessageFileManager] 恢复 Message 数据完成!);}return messages;}} 7.9 判断垃圾回收时机 这里的数字是拍脑门写的当消息总数大于 2000 并且 消息的有效个数小于 50% 时进行垃圾回收。 public boolean checkGC(String queueName){// 读取消息统计文件的数据Stat stat readStat(queueName);if(stat.totalCount 2000 (double)stat.validCount / stat.totalCount 0.5){return true;}return false;} 7.10 获取新消息数据文件的路径 public String getQueueDataNewPath(String queueName){return getQueueDir(queueName) /queue_data_new.txt;} 7.11 消息的垃圾回收 这里我采用了复制算法进行垃圾回收具体实现步骤 创建一个新的文件,命名为 queue_data_new.txt把之前消息数据文件的有效消息读取并写到新的文件中删除旧的消息数据文件进行文件重命名同时记录消息统计文件 public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {synchronized (queue){long gcBeg System.currentTimeMillis();//创建一个新的文件File queueDataNewFile new File(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){throw new MqException([MessageFileManager] gc 时发现该队列的 queue_data_new.txt 已经存在! queueName: queue.getName());}boolean success queueDataNewFile.createNewFile();if(!success){throw new MqException([MessageFileManager] 创建文件失败! queueDataNewFile: queueDataNewFile.getAbsolutePath());}// 从旧的消息文件中读取所有的有效数据文件LinkedListMessage messages loadAllMessageFromQueue(queue.getName());// 把有效的消息全部写入到新的文件中try (OutputStream outputStream new FileOutputStream(queueDataNewFile)){try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)){for (Message message : messages){byte[] buffer BinaryTool.toBytes(message);//先写四个字节消息的长度dataOutputStream.writeInt(buffer.length);// 再写消息的内容dataOutputStream.write(buffer);}}}// 删除旧的数据文件,进行文件重命名File queueDataOldFile new File(getQueueDataPath(queue.getName()));success queueDataOldFile.delete();if(!success){throw new MqException([MessageFileManager] 旧的数据文件删除失败! queueDataOldFile: queueDataOldFile.getAbsolutePath());}success queueDataNewFile.renameTo(queueDataOldFile);if(!success){throw new MqException([MessageFileManager] 文件重命名失败! queueDataNewFile: queueDataNewFile.getAbsolutePath() , queueDataOldFile: queueDataOldFile.getAbsolutePath());}// 更新消息统计文件Stat stat readStat(queue.getName());stat.totalCount messages.size();stat.validCount messages.size();writeStat(queue.getName(),stat);long gcEnd System.currentTimeMillis();System.out.println([MessageFileManager] gc 执行完毕! queueName: queue.getName() , time: (gcEnd - gcBeg) ms);}} 八、统一硬盘处理 消息存储在文件中交换机、绑定、队列存储在数据库中对此进行统一处理。也就是说使用一个类管理所有硬盘上的数据。 public class DiskDataCenter {//这个实例用来管理数据库的数据private DataBaseManager dataBaseManager new DataBaseManager();//这个实例用来管理文件中的数据private MessageFileManager messageFileManager new MessageFileManager();public void init(){dataBaseManager.init();messageFileManager.init();}/*** 封装交换机、绑定、队列操作* param exchange*/public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public ListExchange selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//封装队列操作public void insertQueue(MSGQueue queue) throws IOException {messageFileManager.createQueueFiles(queue.getName());dataBaseManager.insertQueue(queue);}public void deleteQueue(String queueName) throws IOException {messageFileManager.destroyQueueFiles(queueName);dataBaseManager.deleteQueue(queueName);}public ListMSGQueue selectAllQueues(){return dataBaseManager.selectAllQueues();}//封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public ListBinding selectAllBindings(){return dataBaseManager.selectAllBindings();}/*** 封装消息操作*/public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if(messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);} } 九、内存管理 这里主要使用了线程安全的哈希表保存内存中的消息、交换机、绑定、队列等。 public class MemoryDataCenter {// key 是 exchangeName value 是 Exchange 对象private ConcurrentHashMapString, Exchange exchangeMap new ConcurrentHashMap();// key 是 queueName, value 是 MSGQueue 对象private ConcurrentHashMapString, MSGQueue queueMap new ConcurrentHashMap();// 第一个 key 是 exchangeName, 第二个 key 是 queueNameprivate ConcurrentHashMapString,ConcurrentHashMapString, Binding bindingsMap new ConcurrentHashMap();// key 是 messageId, value 是 Message 对象private ConcurrentHashMapString, Message messageMap new ConcurrentHashMap();// key 是 queueName, value 是 Message 的链表private ConcurrentHashMapString, LinkedListMessage queueMessageMap new ConcurrentHashMap();// 第一个 key 是 queueName 第二个 key 是 messageIdprivate ConcurrentHashMapString,ConcurrentHashMapString,Message queueMessageWaitAckMap new ConcurrentHashMap(); } 9.1 交换机相关操作的API 新增交换机 public void insertExchange(Exchange exchange){exchangeMap.put(exchange.getName(), exchange);System.out.println([MemoryDataCenter] 新交换机添加成功! exchangeName: exchange.getName());} 查询交换机 public Exchange getExchange(String exchangeName){return exchangeMap.get(exchangeName);} 删除交换机 public void deleteExchange(String exchangeName){exchangeMap.remove(exchangeName);System.out.println([MemoryDataCenter] 删除交换机成功! exchangeName: exchangeName);} 9.2 队列相关操作的API 新增队列  public void insertQueue(MSGQueue queue){queueMap.put(queue.getName(), queue);System.out.println([MemoryDataCenter] 新队列添加成功! queueName: queue.getName());} 查询队列 public MSGQueue getQueue(String queueName){return queueMap.get(queueName);} 删除队列 public void deleteQueue(String queueName){queueMap.remove(queueName);System.out.println([MemoryDataCenter] 删除队列成功成功! queueName: queueName);} 9.3 绑定相关操作的API 新增绑定  public void insertBinding(Binding binding) throws MqException {//先使用 exchangeName 查一下对应的哈希表是否存在ConcurrentHashMapString,Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(), k - new ConcurrentHashMap());synchronized (bindingMap){//再根据 queueName 查一下如果已经存在就抛出异常不存在才能插入if(bindingMap.get(binding.getQueueName()) ! null){throw new MqException([MemoryDataCenter] 绑定已经存在! exchangeName: binding.getExchangeName() , queueName: binding.getQueueName());}bindingMap.put(binding.getQueueName(),binding);}System.out.println([MemoryDataCenter] 新绑定添加成功! queueName: binding.getQueueName() , exchangeName: binding.getExchangeName());} 根据交换机名和队列名查询绑定 public Binding getBinding(String exchangeName,String queueName){ConcurrentHashMapString,Binding bindingMap bindingsMap.get(exchangeName);if(bindingMap null){return null;}return bindingMap.get(queueName);} 查询交换机绑定的所有队列 public ConcurrentHashMapString,Binding getBindings(String exchangeName){return bindingsMap.get(exchangeName);} 删除绑定 public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMapString,Binding bindingMap bindingsMap.get(binding.getExchangeName());if(bindingMap null){//无法删除throw new MqException([MemoryDataCenter] 绑定不存在! exchangeName: binding.getExchangeName() , queueName: binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println([MemoryDataCenter] 绑定删除成功! queueName: binding.getQueueName() , exchangeName: binding.getExchangeName());} 9.4 消息相关操作的API 添加消息  public void addMessage(Message message){messageMap.put(message.getMessageId(), message);System.out.println([MemoryDataCenter] 新消息添加成功! messageId: message.getMessageId());} 查询消息 public Message getMessage(String messageId){return messageMap.get(messageId);} 删除消息 public void removeMessage(String messageId){messageMap.remove(messageId);System.out.println([MemoryDataCenter] 消息成功移除! messageId: messageId);} 发送消息到指定队列 public void sendMessage(MSGQueue queue, Message message){//把消息放到指定的数据结构中LinkedListMessage messages queueMessageMap.computeIfAbsent(queue.getName(), k - new LinkedList());synchronized (messages){messages.add(message);}addMessage(message);System.out.println([MemoryDataCenter] 消息被投递到队列中! messageId: message.getMessageId());} 从队列中获取消息 public Message pollMessage(String queueName){LinkedListMessage messages queueMessageMap.get(queueName);if(messages null){return null;}synchronized (messages){if(messages.size() 0){return null;}//取头元素Message curMessage messages.remove(0);System.out.println([MemoryDataCenter] 消息从队列中取出! messageId: curMessage.getMessageId());return curMessage;}} 获取指定队列中的消息个数 public int getMessageCount(String queueName){LinkedListMessage messages queueMessageMap.get(queueName);if(messages null){return 0;}synchronized (messages){return messages.size();}} 添加未确认的消息 public void addMessageWaitAck(String queueName,Message message){ConcurrentHashMapString,Message messageHashMap queueMessageWaitAckMap.computeIfAbsent(queueName,k - new ConcurrentHashMap());messageHashMap.put(message.getMessageId(),message);System.out.println([MemoryDataCenter] 消息进入待确认队列! messageId: message.getMessageId());} 删除已经确认的消息 public void removeMessageWaitAck(String queueName, String messageId){ConcurrentHashMapString,Message messageHashMap queueMessageWaitAckMap.get(queueName);if(messageHashMap null){return;}messageHashMap.remove(messageId);System.out.println([MemoryDataCenter] 消息从待确认队列删除! messageId: messageId);} 获取未确认的消息 public Message getMessageWaitAck(String queueName, String messageId){ConcurrentHashMapString,Message messageHashMap queueMessageWaitAckMap.get(queueName);if(messageHashMap null){return null;}return messageHashMap.get(messageId);} 从硬盘读取数据恢复到内存中 public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {// 1. 恢复所有的交换机数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();ListExchange exchanges diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges){exchangeMap.put(exchange.getName(), exchange);}// 2. 恢复所有的队列数据ListMSGQueue queues diskDataCenter.selectAllQueues();for(MSGQueue queue : queues){queueMap.put(queue.getName(), queue);}// 3. 恢复所有的绑定数据ListBinding bindings diskDataCenter.selectAllBindings();for(Binding binding : bindings){ConcurrentHashMapString,Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(),k - new ConcurrentHashMap());bindingMap.put(binding.getQueueName(),binding);}// 4. 恢复所有的消息数据// 遍历所有的队列 根据每个队列的名字获取到所有的消息for(MSGQueue queue : queues){LinkedListMessage messages diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(), messages);for(Message message : messages){messageMap.put(message.getMessageId(), message);}}// 针对未确认的消息不需要从硬盘上恢复,一旦在等待 ack 的过程中服务器重启,此时被恢复成未被取走的消息} 十、虚拟机 VirtualHost 每个虚拟机下都管理着自己的交换机、队列、绑定、消息这些数据同时供上层API进行调用本项目目前只支持单个交换机。 public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter new MemoryDataCenter();private DiskDataCenter diskDataCenter new DiskDataCenter();private Router router new Router();private ConsumerManager consumerManager new ConsumerManager(this);private final Object exchangeLocker new Object();// 操作交换机的锁对象private final Object queueLocker new Object(); // 操作队列的锁对象public VirtualHost(String name){this.virtualHostName name;//对于 MemoryDataCenter 来说, 不需要初始化//针对 DiskDataCenter 来说,需要进行初始化操作,建库建表和初始数据的设定//此外,针对硬盘的数据,恢复到内存中diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();System.out.println([VirtualHost] 恢复内存数据失败!);}}public String getVirtualHostName() {return virtualHostName;}public MemoryDataCenter getMemoryDataCenter() {return memoryDataCenter;}public DiskDataCenter getDiskDataCenter() {return diskDataCenter;} 其中Router 类规定了交换机转发的规则 public class Router {/*** bindingKey 的构造规则* 1.数字、字母、下划线* 2.使用 . 分割成若干部分* 3.允许存在 * 和 # 作为通配符,但是通配符只能作为一个独立的分段* param bindingKey* return*/public boolean checkBindingKey(String bindingKey){}/*** routingKey 的构造规则:* 1.数字、字母、下划线* 2.使用 . 分割成若干部分*/public boolean checkRoutingKey(String routingKey){}/*** 判定该消息是否可以转发给这个绑定对应的队列* param exchangeType* param binding* param message* return*/public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException{}/*** 校验 bindingKey 和 routingKey 是否匹配* param binding* param message* return*/private boolean routeTopic(Binding binding,Message message){} } 10.1 创建交换机 public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString,Object arguments){//把交换机的名字加上虚拟主机的名字exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker){//判定该交换机是否已经存在,直接通过内存查询Exchange existsExchange memoryDataCenter.getExchange(exchangeName);if(existsExchange ! null){//该交换机已经存在System.out.println([VirtualHost] 交换机已经存在! exchangeName: exchangeName);return true;}Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);if(durable){diskDataCenter.insertExchange(exchange);}memoryDataCenter.insertExchange(exchange);System.out.println([VirtualHost] 交换机创建完成! exchangeName: exchangeName);}return true;}catch (Exception e){System.out.println([VirtualHost] 交换机创建失败! exchangeName: exchangeName);e.printStackTrace();return false;}} 10.2 删除交换机 public boolean exchangeDelete(String exchangeName){exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker){Exchange toDelete memoryDataCenter.getExchange(exchangeName);if(toDelete null){throw new MqException([VirtualHost] 交换机不存在,无法删除!);}if(toDelete.isDurable()){diskDataCenter.deleteExchange(exchangeName);}memoryDataCenter.deleteExchange(exchangeName);System.out.println([VirtualHost] 交换机删除成功! exchangeName: exchangeName);}return true;}catch (Exception e){System.out.println([VirtualHost] 交换机删除失败! exchangeName: exchangeName);e.printStackTrace();return false;}} 10.3 创建队列 public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,MapString,Object arguments){queueName virtualHostName queueName;try {synchronized (queueLocker){//1.判断队列是否存在MSGQueue existsQueue memoryDataCenter.getQueue(queueName);if(existsQueue ! null){System.out.println([VirtualHost] 队列已经存在! queueName: queueName);return false;}//2.构造队列对象MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);//3.插入硬盘if(queue.isDurable()){diskDataCenter.insertQueue(queue);}//4.插入内存memoryDataCenter.insertQueue(queue);System.out.println([VirtualHost] 队列创建成功! queueName: queueName);}return true;}catch (Exception e){System.out.println([VirtualHost] 创建队列失败! queueName: queueName);e.printStackTrace();return false;}} 10.4 删除队列 public boolean queueDelete(String queueName){queueName virtualHostName queueName;try {synchronized (queueLocker){MSGQueue toDelete memoryDataCenter.getQueue(queueName);if (toDelete null){throw new MqException([VirtualHost] 队列不存在,无法删除!);}if(toDelete.isDurable()){diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);System.out.println([VirtualHost] 队列删除成功! queueName: queueName);}return true;}catch (Exception e){System.out.println([VirtualHost] 队列删除失败! queueName: queueName);e.printStackTrace();return false;}} 10.5 创建绑定 public boolean queueBind(String queueName,String exchangeName,String bindingKey){queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker){synchronized (queueLocker){//1.判断绑定是否存在Binding existsBinding memoryDataCenter.getBinding(exchangeName,queueName);if(existsBinding ! null){throw new MqException([VirtualHost] 绑定已经存在! queueName: queueName , exchangeName: exchangeName);}//2.判断绑定是否合法if(!router.checkBindingKey(bindingKey)){throw new MqException([VirtualHost] bindingKey 不合法! bindingKey: bindingKey);}//3.创建 Binding 对象Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//4.获取到对应的交换机和队列,如果对应的交换机和队列不存在,这样的绑定是无法创建的MSGQueue queue memoryDataCenter.getQueue(queueName);if(queue null){throw new MqException([VirtualHost] 该绑定对应的队列不存在! queueName: queueName);}Exchange exchange memoryDataCenter.getExchange(exchangeName);if(exchange null){throw new MqException([VirtualHost] 该绑定对应的交换机不存在! exchangeName: exchangeName);}if(queue.isDurable() exchange.isDurable()){diskDataCenter.insertBinding(binding);}memoryDataCenter.insertBinding(binding);System.out.println([VirtualHost] 创建绑定成功! queueName: queueName , exchangeName: exchangeName);}}return true;}catch (Exception e){System.out.println([VirtualHost] 绑定创建失败! queueName: queueName , exchangeName: exchangeName);e.printStackTrace();return false;}} 10.6 删除绑定 public boolean queueUnbind(String queueName, String exchangeName){queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;try {synchronized (exchangeLocker){synchronized (queueLocker){//1.判断绑定是否存在Binding toDelete memoryDataCenter.getBinding(exchangeName,queueName);if(toDelete null){throw new MqException([VirtualHost] 绑定不存在,无法删除! queueName: queueName , exchangeName: exchangeName);}//2.无论绑定是否持久化,都尝试在硬盘上删一下,就算不存在,这个删除操作也没有副作用diskDataCenter.deleteBinding(toDelete);memoryDataCenter.deleteBinding(toDelete);System.out.println([VirtualHost] 删除绑定成功! queueName: queueName , exchangeName: exchangeName);}}return true;}catch (Exception e){System.out.println([VirtualHost] 删除绑定失败!);return false;}} 10.7 发送消息 流程图 实现步骤 检查 routingKey 是否合法判断交换机是否存在判断交换机的类型根据不同的类型决定如何进行后续转发 public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body){try {//1.转换交换机的名字exchangeName virtualHostName exchangeName;//2.检查 routingKey 是否合法if(!router.checkRoutingKey(routingKey)){throw new MqException([virtualHost] routingKey 非法! routingKey: routingKey);}//3.查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if(exchange null){throw new MqException([virtualHost] 交换机不存在! exchangeName: exchangeName);}//4.判定交换机的类型if(exchange.getType() ExchangeType.DIRECT){//按照直接交换机的方式转发消息//以 routingKey 作为队列的名字,直接把消息写入指定的队列中//此时,可以无视绑定关系String queueName virtualHostName routingKey;//5.构造消息对象Message message Message.createMessageWithId(routingKey,basicProperties,body);//6.查找队列名对应的对象MSGQueue queue memoryDataCenter.getQueue(queueName);if(queue null){throw new MqException([virtualHost] 队列不存在! queueName: queueName);}//7.队列存在,给队列中写入消息sendMessage(queue,message);}else{//按照 fanout 和 topic 的方式来转发//5.找到该交换机关联的所有绑定ConcurrentHashMapString,Binding bindingsMap memoryDataCenter.getBindings(exchangeName);for(Map.EntryString,Binding entry : bindingsMap.entrySet()){// 1) 获取到绑定队列,判定对应的队列是否存在Binding binding entry.getValue();MSGQueue queue memoryDataCenter.getQueue(binding.getQueueName());if(queue null){System.out.println([virtualHost] basicPublish 发送消息时,发现队列不存在! queueName: binding.getQueueName());continue;}// 2) 构造消息对象Message message Message.createMessageWithId(routingKey,basicProperties,body);// 3) 判定这个消息是否能转发给该队列// 如果是 fanout, 所有绑定的队列都要进行转发// 如果是 topic, 需要判定 bindingKey 和 routingKey 是否匹配if(!router.route(exchange.getType(),binding,message)){continue;}// 4) 真正转发消息给队列sendMessage(queue,message);}}return true;}catch (Exception e){System.out.println([virtualHost] 消息发送失败!);e.printStackTrace();return false;}} 上述过程中涉及到的被调用的 API private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {//此处发送消息,就是调用之前封装好的 api, 写到内容和硬盘上int deliverMode message.getDeliverMode();//deliverMode 为 1 不持久化//deliverMode 为 2 要持久化if(deliverMode 2){diskDataCenter.sendMessage(queue,message);}//写入内存memoryDataCenter.sendMessage(queue,message);//通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());} 10.8 订阅消息 流程图 /*** 订阅消息* 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者* param consumerTag 消费者的身份标识* param queueName* param autoAck 消息被消费完成后,应答的方式 true:自动应答; false:手动应答* param consumer 是一个回调函数,此处类型设定为函数式接口,后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 表达式* return*/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){//构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 ConsumerEnv 对象添加到该队列中queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println([virtualHost] basicConsume 成功! queueName: queueName);return true;}catch (Exception e){System.out.println([virtualHost] basicConsume 失败! queueName: queueName);e.printStackTrace();return false;}} 上述过程涉及到了 ConsumerManager 类 public class ConsumerManager {//持有上层 VirtualHost 对象的引用,用来操作数据private VirtualHost parent;//指定一个线程池,执行具体的回调任务private ExecutorService workerPool Executors.newFixedThreadPool(4);//存放令牌的队列private BlockingDequeString tokenQueue new LinkedBlockingDeque();//扫描线程private Thread scannerThread null;public ConsumerManager(VirtualHost p){this.parent p;scannerThread new Thread(() -{while (true){try {//1.拿到令牌String queueName tokenQueue.take();//2.根据令牌找到队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if(queue null){throw new MqException([ConsumerManager] 取令牌时队列名不存在! queueName: queueName);}//3.从队列中消费一个消息synchronized (queue){consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});//线程设为后台线程scannerThread.setDaemon(true);scannerThread.start();}/*** 调用时机:发送消息的时候* param queueName* throws InterruptedException*/public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//找到对应的队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if(queue null){throw new MqException([ConsumerManager] 队列不存在! queueName: queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);int n parent.getMemoryDataCenter().getMessageCount(queueName);for(int i 0; i n; i){//调用一次就消费一条消息consumeMessage(queue);}}}/*** 消费一个消息* param queue*/private void consumeMessage(MSGQueue queue) {// 1.按照轮询的方式找个消费者出来ConsumerEnv lucyDog queue.chooseConsumer();if(lucyDog null){return;//当前队列没有消费者,暂时不消费}//2.从队列中取出一个消息Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if(message null){return;//当前队列中没有消息,也不需要消费}//3.把消息带入到消费者的回调方法中,让线程池去执行workerPool.submit(() -{try {// 1) 先把消息放到待确认的集合中parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);// 2) 执行回调lucyDog.getConsumer().handleDelivery(lucyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());// 3) 如果当前是 自动应答,直接把消息删除// 如果当前是手动应答,交给后续消费者调用 basicAck 来处理if(lucyDog.isAutoAck()){//删除硬盘上的消息if(message.getDeliverMode() 2){//当前这个消息是持久化存储,需要删除硬盘上的消息parent.getDiskDataCenter().deleteMessage(queue,message);}//删除等待应答的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());//删除内存中的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager] 消息被成功消费! queueName: queue.getName());}}catch (Exception e){e.printStackTrace();}});} } ConsumeMessage方法流程图 10.9 消息确认 该方法只是手动应答的时候才会使用应答成功, 则把消息删除掉。 public boolean basicAck(String queueName, String messageId){queueName virtualHostName queueName;try {Message message memoryDataCenter.getMessage(messageId);if(message null){//要确认的消息不存在throw new MqException([VirtualHost] 要确认的消息不存在! messageId: messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if(queue null){throw new MqException([VirtualHost] 要确认的队列不存在! queueName: queueName);}//1.删除硬盘上的数据if(message.getDeliverMode() 2){diskDataCenter.deleteMessage(queue,message);}//2.删除消息中心中的数据memoryDataCenter.removeMessage(messageId);//3.删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);System.out.println([VirtualHost] basicAck 成功! 消息被成功确认! queueName: queueName , messageId: messageId);return true;}catch (Exception e){System.out.println([VirtualHost] basicAck 失败! 消息被确认失败! queueName: queueName , messageId: messageId);e.printStackTrace();return false;}} 十一、网络通信协议设计 生产者和消费者都是客户端程序并且需要通过网络远程调用 BrokerServer 提供的 API 这里我使用 TCP 作为底层协议在这个基础上自定义应用层协议简单来说就是约定一下生产者以及消费者和 BrokerServer 之间交互的规范或者是传输数据的格式。 客户端要调用的功能有以下几个部分 创建 channel 关闭 channel 创建 exchange 删除 exchange 创建 queue 删除 queue 创建 binding 删除 binding 发送 message 订阅 message 发送 ack 返回 message (服务器 - 客户端) 11.1 设计应用层协议  因为 Message 本身就是二进制数据因此这里同样使用二进制的方式设定协议。 其中 type 表示请求响应不同的功能取值如下 0x1 创建 channel 0x2关闭 channel0x3 创建 exchange 0x4 销毁 exchange 0x5 创建 queue 0x6 销毁 queue 0x7 创建 binding 0x8 销毁 binding 0x9 发送 message 0xa 订阅 message 0xb 返回 ack 0xc 服务器给客户端推送的消息 payload 部分, 会根据不同的 type, 存在不同的格式 对于请求来说, payload 表示这次方法调用的各种参数信息对于响应来说, payload 表示这次方法调用的返回值 11.2 定义Request/Response /*** description:表示一个请求对象,按照自定义协议的格式展开* created by 清风 on 2024/7/30 21:21*/ Data public class Request {private int type;private int length;private byte[] payload; } /*** description:这是一个响应,也是根据自定义应用层协议来的* created by 清风 on 2024/7/30 21:22*/ Data public class Response {private int type;private int length;private byte[] payload; } 11.3 定义参数父类 /*** description:使用这个类来表示方法的公共参数/辅助的字段* 后续每个方法就会有不同的参数,不同的参数使用不同的子类表示* created by 清风 on 2024/7/30 21:24*/ Data public class BasicArguments implements Serializable{// 表示一次请求和一次响应的身份标识,可以把请求和响应对上protected String rid;//本次通信使用的 channel 的身份标识protected String channelId; } 11.4 定义返回值父类 /*** description:表示各个远程的调用的方法的返回值的公共信息* created by 清风 on 2024/7/30 21:28*/ Data public class BasicReturns implements Serializable {//用来标识唯一的请求和响应protected String rid;//用来标识一个 channelprotected String channelId;//表示远程调用方法的返回值protected boolean ok; } 11.5 定义其他参数类 针对每个 VirtualHost 提供的方法, 都需要有⼀个类表⽰对应的参数。这里写一个创建交换机的请求参数类其他的相关请求参数都大同小异想看的可以去源码链接看。 /*** description:这个类是创建交换机的请求参数的类* created by 清风 on 2024/7/30 21:33*/ Data public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private MapString,Object arguments; } ⼀个创建交换机的请求, 形如: 可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图片的结构按照 length 长度读取出 payload, 就可以把读到的⼆进制数据转换成 ExchangeDeclareArguments 对象 十二、实现 BrokerServer 类 public class BrokerServer {private ServerSocket serverSocket null;//一个 BrokerServer 上只有一个虚拟主机private VirtualHost virtualHost new VirtualHost(default);//存储当前的所有会话,也就是有哪些客户端正在和服务器进行通信//此处的 key 是 channelId; value 为对应的 socket 对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMap();//引入线程池来处理多个客户端的请求private ExecutorService executorService null;//引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable true; } 12.1 启动/停止服务器 public BrokerServer(int port) throws IOException {serverSocket new ServerSocket(port);}public void start() throws IOException {System.out.println([BrokerServer] 启动!);executorService Executors.newCachedThreadPool();try {while (runnable){Socket clientSocket serverSocket.accept();//把处理连接的逻辑丢给线程池executorService.submit(() -{processConnection(clientSocket);});}}catch (SocketException e){System.out.println([BrokerServer] 服务器停止运行!);}}/*** 停止服务器*/public void stop() throws IOException {runnable false;executorService.shutdownNow();//把线程池中的任务都放弃,让线程都销毁serverSocket.close();} 12.2 实现处理连接 /*** 通过这个方法来处理一个客户端的连接* 在这个连接中,可能会涉及到多个请求和响应* param clientSocket*/private void processConnection(Socket clientSocket) {try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()){//这里需要按照特定格式读取并解析,此时需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)){while (true){//1.读取请求并解析Request request readRequest(dataInputStream);//2.根据请求计算响应Response response process(request,clientSocket);//3.把响应写回给客户端writeResponse(dataOutputStream,response);}}catch (EOFException | SocketException e){//对于当前代码, DataInputStream 如果读到 EOF, 就会抛出一个 EOFException 异常//需要借助这个异常来结束循环System.out.println([BrokerServer] 连接关闭! 客户端地址: clientSocket.getInetAddress().toString() : clientSocket.getPort());}}catch (IOException | ClassNotFoundException | MqException e){System.out.println([BrokerServer] Connection 出现异常!);e.printStackTrace();}finally {try {//关闭 socketclientSocket.close();//一个 TCP 连接中,可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 清理掉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}} 12.3 实现 readRequest / writeResponse private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if(n ! request.getLength()){throw new IOException(读取请求格式出错!);}request.setPayload(payload);return request;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());dataOutputStream.flush();} 12.4 实现处理请求 先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid 再根据不同的 type, 分别处理不同的逻辑(主要是调⽤ virtualHost 中不同的方法) 针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客户端最后构造成统⼀的响应 private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//1.把 request 中的 payload 做一个初步的解析BasicArguments basicArguments (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println([Request] rid: basicArguments.getRid() , channelId: basicArguments.getChannelId() , type: request.getType() , length: request.getLength());//2.根据 type 的值来进一步区分接下来这次请求要干什么boolean ok true;if(request.getType() 0x1){//创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);System.out.println([BrokerServer] 创建 channel 完成! channelId: basicArguments.getChannelId());}else if(request.getType() 0x2){//销毁 channelsessions.remove(basicArguments.getChannelId());System.out.println([BrokerServer] 销毁 channel 完成! channelId: basicArguments.getChannelId());}else if(request.getType() 0x3){//创建一个交换机//此时 payload 就是 ExchangeDeclareArguments 对象ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());}else if(request.getType() 0x4){//销毁交换机ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() 0x5) {//创建队列QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(),arguments.isAutoDelete(),arguments.getArguments());}else if(request.getType() 0x6){//销毁队列QueueDeleteArguments arguments (QueueDeleteArguments) basicArguments;ok virtualHost.queueDelete(arguments.getQueueName());}else if(request.getType() 0x7){//创建 BindingQueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey());}else if(request.getType() 0x8){//删除 BindingQueueUnbindArguments arguments (QueueUnbindArguments) basicArguments;ok virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());}else if(request.getType() 0x9){//发送消息BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(),arguments.getBody());}else if(request.getType() 0xa){//订阅消息BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//这个回调函数要做的工作:把服务器收到的消息直接推送给对应的客户端//先知道当前这个收到的消息要发给哪个客户端//此处 consumerTag 其实是 channelId//1.根据 channel 找到 Socket 对象Socket clientSocket sessions.get(consumerTag);if(clientSocket null || clientSocket.isClosed()){throw new MqException([BrokerServer] 订阅消息的客户端已经关闭!);}//2.构造响应数据//此处 response 的 payload 就是 SubScribeReturnsSubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid();subScribeReturns.setOk(true);subScribeReturns.setBody(body);byte[] payload BinaryTool.toBytes(subScribeReturns);Response response new Response();response.setType(0xc);//0xc 表示服务器给消费者客户端推送的数据response.setLength(payload.length);response.setPayload(payload);//3.把数据写回给客户端DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream,response);}});}else if(request.getType() 0xb){//调用 basicAck 来确认消息BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());}else {//当前的 type 是非法的throw new MqException([BrokerServer] 未知的 type: request.getType());}//3.构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println([Response] rid: basicReturns.getRid() , channelId: basicReturns.getChannelId() , type: response.getType() , length: response.getLength());return response;} 12.5 实现清理过期的会话 /*** 清理过期的会话* param clientSocket*/private void clearClosedSession(Socket clientSocket) {//遍历 sessions 哈希表ListString toDeleteChannelId new ArrayList();for(Map.EntryString,Socket entry : sessions.entrySet()){if(entry.getValue() clientSocket){toDeleteChannelId.add(entry.getKey());}}for(String channelId : toDeleteChannelId){sessions.remove(channelId);}System.out.println([BrokerServer] 清理 session 完成! 被清理的 channelId: toDeleteChannelId);} 十三、实现客户端 13.1 创建 ConnectionFactory 用来创建连接的工厂类 Data public class ConnectionFactory {// broker server 的 ip 地址private String host;// broker server 的端口号private int port;// 访问 broker server 的哪个虚拟主机.// 下列几个属性暂时不搞了. // private String virtualHostName; // private String username; // private String password;public Connection newConnection() throws IOException {Connection connection new Connection(host, port);return connection;} } 13.2 Connection 和 Channel 定义 一个客户端可以创建多个 Connection一个 Connection 对应一个 Socket一个TCP 连接一个 Connection 可以包含多个 Channel Connection 定义 这个类中其他的方法在我的项目源码中自行观看主要包括处理响应、统一封装写请求和读取响应以及创建 Channel Data public class Connection {private Socket socket null;// 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来.private ConcurrentHashMapString, Channel channelMap new ConcurrentHashMap();private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;private ExecutorService callbackPool null; } Socket 是客户端持有的套接字InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接口channelMap 用来管理该连接中所有的 ChannelcallbackPool 是用来在客户端这边执行用户回调的线程池 Channel 定义这个类中主要包括的方法就是构造请求参数和服务器交互进行相关的操作也就是远程调用服务器提供的 API可在项目源码自行观看 Data public class Channel {private String channelId;// 当前这个 channel 属于哪个连接.private Connection connection;// 用来存储后续客户端收到的服务器的响应.private ConcurrentHashMapString, BasicReturns basicReturnsMap new ConcurrentHashMap();// 如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调.// 此处约定一个 Channel 中只能有一个回调.private Consumer consumer null;public Channel(String channelId, Connection connection) {this.channelId channelId;this.connection connection;} }十四、样例演示 生产者  /*** description:这个类用来表示一个生产者* 通常这是一个单独的服务器程序* created by 清风 on 2024/8/3 19:38*/ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println(启动生产者!);ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);Connection connection factory.newConnection();Channel channel connection.createChannel();//创建交换机和队列channel.exchangeDeclare(testExchange, ExchangeType.DIRECT,true,false,null);channel.queueDeclare(testQueue,true,false,false,null);//创建一个消息并发送byte[] body hello.getBytes();boolean ok channel.basicPublish(testExchange,testQueue,null,body);System.out.println(消息投递完成! ok : ok);Thread.sleep(500);channel.close();connection.close();} } 消费者 /*** description:这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* created by 清风 on 2024/8/3 19:39*/ public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println(启动消费者!);ConnectionFactory factory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(testExchange, ExchangeType.DIRECT,true,false,null);channel.queueDeclare(testQueue,true,false,false,null);channel.basicConsume(testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println([消费数据] 开始!);System.out.println(consumerTag: consumerTag);System.out.println(basicProperties: basicProperties);String bodyString new String(body,0, body.length);System.out.println(body: bodyString);System.out.println([消费数据] 结束!);}});//模拟一直等待消费while (true){Thread.sleep(500);}} } 启动项目之后再先后启动消费者和生产者  启动项目建库建表 启动消费者 启动生产者  查看消费者端控制台  至此一个简易版本的MQ实现。文章篇幅太长可能过于繁琐还请各位读者有不满意的地方多多指教
http://www.dnsts.com.cn/news/82731.html

相关文章:

  • 德州极速网站建设源代码查看WordPress文件
  • 宁波网站建设七米网上给别人做网站
  • 做外贸建网站多少钱工信部域名备案管理系统
  • 巴中+网站建设桂林网站设计制作
  • 突泉建设局三务公开网站实惠福步外贸论坛
  • 2019年做网站怎么看一个网站用什么程序做的
  • 企业网站是什么微商怎么做
  • 金华网站建设价格wordpress动态订单
  • 手机网站建设市场塘沽网站开发
  • 南充房产信息网站优化网站建设
  • 网站 分站深圳app定制开发外包公司
  • 网站推广途径及要点网站更改备案信息吗
  • 网站 跳出率 多少沧州营销软件
  • 网站开发需要配置哪些人员增城新塘网站建设
  • 淘宝网站是谁做的好处网站建设开题报告数据库建立
  • 江门网站建设推广策划设计好的装修公司
  • html网站免费模板胶南网站建设
  • 福建省建设厅网站林瑞良北京网页设计公司
  • wordpress读取父分类列表企业网站改版seo
  • 网站建设维护资质做水果蔬菜生意网站
  • c 网站开发社区网站制作
  • 招远网站建设招聘手机网站作用
  • 2003访问网站提示输入用户名密码wordpress 上传附件按钮美化
  • 西安建网站的公司wordpress 清除cookie
  • 建设银行贷款官方网站电子商务网站特色
  • 网站建设模板个人泰州seo
  • 闵行网站设计有限责任公司成立条件
  • 做网站注意的问题做网站找哪家好要钱吗
  • 家装行业网站建设网站建设网站营销网站托管一体化
  • 深圳傻瓜式网站建设公司好吗中国建设银行青岛分行网站