班级建设网站设计方案,百度网站排名优化工具,品牌查询官网,手机ppt免费制作软件消息储存服务加载
入口:org.apache.rocketmq.store.DefaultMessageStore#load 在创建brokerContriller时会调用初始化方法初始化brokerController,在初始化方法中会进行消息储存服务的加载 this.messageStore.load(); 加载方法主要是加载一些必要的参数和数据#xff0c;如配…消息储存服务加载
入口:org.apache.rocketmq.store.DefaultMessageStore#load 在创建brokerContriller时会调用初始化方法初始化brokerController,在初始化方法中会进行消息储存服务的加载 this.messageStore.load(); 加载方法主要是加载一些必要的参数和数据如配置文件、消费进度、commitlog文件、consumequeu文件等 加载commitlog等文件实际上就是创建MappedFile对象MappedFile会创建与物理文件之间的映射buffer与通道channel 除此之外, 还会恢复commitlog文件、consumequeue文件, 恢复是指, 检索文件中的每一条消息, 查看是否有效: commitlog中的消息是否符合规范 , consumeQueue中的数据是否符合规范每条是否为20字节。因为消息、数据都是从文件最后开始添加的故只需文件尾部不符合规范的数据删除掉即可。 在broker启动的时候会在broker储存目录下创建一个abort文件, 正常退出时则会删除abort文件,broker启动的时候会判断是否存在这个文件来判断是否是正常退出,还是意外宕机。 是否正常退出, 只关系到broker恢复commitlog文件的过程: 若为正常退出则从倒数第三个文件开始向后检索若不足三个则从第一个开始向后遍历。找到第一条错误数据删除其之后的数据并重新设置刷盘点 若为异常退出则从最后一个文件开始向前根据文件魔数等找到一个正常的文件从该文件开始向后遍历正常的消息重新发送到consumequeque和indexFile。若为错误数据则删除其之后的数据并重新设置刷盘点 同时两种恢复路径的最后都会以恢复的commitlog文件的最大有效消息的偏移量为准来删除consumequeue那边的无效数据 public boolean load() {boolean result true;try {// 通过判断 abort 文件是否存在判断是否上次为正常退出。不存在 - 正常boolean lastExitOK !this.isTempFileExist();log.info(last shutdown {}, lastExitOK ? normally : abnormally);// 加载延时队列延时消息, 包括延时等级、配置文件if (null ! scheduleMessageService) {result result this.scheduleMessageService.load();}// load Commit Log// 加载commitlog mappedFile(commitlog文件夹下的)result result this.commitLog.load();// load Consume Queue// 加载consumeQueue mappedFile, (store\consumequeue目录下的)result result this.loadConsumeQueue();if (result) {this.storeCheckpoint new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));this.indexService.load(lastExitOK);// 恢复文件this.recover(lastExitOK);log.info(load over, and the max phy offset {}, this.getMaxPhyOffset());}} catch (Exception e) {log.error(load exception, e);result false;}if (!result) {this.allocateMappedFileService.shutdown();}return result;
}加载commitlog文件
加载commitlog文件 org.apache.rocketmq.store.CommitLog#load
public boolean load() {// commitlog文件在代码是一个MappedFile通过MappedFileQueue进行管理// consumequeue、indexFile文件同样也是MappedFileboolean result this.mappedFileQueue.load();log.info(load commit log (result ? OK : Failed));return result;
}public boolean load() {// 遍历文件夹下面的文件File dir new File(this.storePath);File[] files dir.listFiles();if (files ! null) {// ascending order// 按文件名从大到小排序rocketmq种的文件都是以数字、文件的最小偏移量命名Arrays.sort(files);for (File file : files) {if (file.length() ! this.mappedFileSize) {log.warn(file \t file.length() length not matched message store config value, please check it manually);return false;}try {// 实例化mappedFileMappedFile mappedFile new MappedFile(file.getPath(), mappedFileSize);mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);this.mappedFiles.add(mappedFile);log.info(load file.getPath() OK);} catch (IOException e) {log.error(load file file error, e);return false;}}}return true;
}构造MappedFile
public MappedFile(final String fileName, final int fileSize) throws IOException {// 根据文件名和文件大小初始化mappedFileinit(fileName, fileSize);
}javaprivate void init(final String fileName, final int fileSize) throws IOException {this.fileName fileName;this.fileSize fileSize;this.file new File(fileName);this.fileFromOffset Long.parseLong(this.file.getName());boolean ok false;ensureDirOK(this.file.getParent());try {// 根据文件创建channelthis.fileChannel new RandomAccessFile(this.file, rw).getChannel();this.mappedByteBuffer this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 文件总内存计数增加TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);// 文件总数计数增加TOTAL_MAPPED_FILES.incrementAndGet();ok true;} catch (FileNotFoundException e) {log.error(Failed to create file this.fileName, e);throw e;} catch (IOException e) {log.error(Failed to map file this.fileName, e);throw e;} finally {if (!ok this.fileChannel ! null) {this.fileChannel.close();}}
}恢复文件
恢复文件 org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {/*** maxPhyOffsetOfConsumeQueue 最大的有效的commitlog消息物理偏移量就是指的最后一个有效条目中保存的commitlog文件中的物理偏移量** 文件组自身的最大有效数据偏移量processOffset就是指的最后一个有效条目在自身文件组中的偏移量* (比如:queueID文件夹下的连续的文件,或commitlog目录下的连续commitlog文件)。*/// 恢复ConsumeQueue文件,并返回consumeQueue最大有效commitlog文件偏移量long maxPhyOffsetOfConsumeQueue this.recoverConsumeQueue();if (lastExitOK) {// 正常退出,恢复文件this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);} else {// 异常退出,恢复文件this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);}// 恢复topicQueueTablethis.recoverTopicQueueTable();
}恢复consumequeue文件
org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue 恢复consumequeue文件会遍历每个topic然后遍历topic下的所有queue的文件夹然后以consumequeue文件夹为维度恢复 从倒数第三个文件开始恢复依次检查每条数据是否符合8字节偏移量、4字节消息大小、8字节tag的规则找到异常数据则删除其之后的数据。 private long recoverConsumeQueue() {long maxPhysicOffset -1;// 遍历topicfor (ConcurrentMapInteger, ConsumeQueue maps : this.consumeQueueTable.values()) {// 遍历topic下的queuefor (ConsumeQueue logic : maps.values()) {// 恢复这个queueId下的文件logic.recover();if (logic.getMaxPhysicOffset() maxPhysicOffset) {maxPhysicOffset logic.getMaxPhysicOffset();}}}// 返回所有queue中消息,在commitLog文件中的最大的偏移量// 也就是,commitLog同步到queue中的最后一条消息的偏移量return maxPhysicOffset;
}public void recover() {final ListMappedFile mappedFiles this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// 从倒数第三个开始,如果少于三个就从第一个开始int index mappedFiles.size() - 3;if (index 0)index 0;int mappedFileSizeLogics this.mappedFileSize;MappedFile mappedFile mappedFiles.get(index);ByteBuffer byteBuffer mappedFile.sliceByteBuffer();long processOffset mappedFile.getFileFromOffset();long mappedFileOffset 0;long maxExtAddr 1;while (true) {// 循环每一条消息索引,大小为 20 字节for (int i 0; i mappedFileSizeLogics; i CQ_STORE_UNIT_SIZE) {// 这条在commitLog文件中的的偏移量long offset byteBuffer.getLong();// 大小int size byteBuffer.getInt();long tagsCode byteBuffer.getLong();// 如果offset和size都大于0则表示当前条目有效if (offset 0 size 0) {mappedFileOffset i CQ_STORE_UNIT_SIZE;this.maxPhysicOffset offset size;if (isExtAddr(tagsCode)) {maxExtAddr tagsCode;}} else {log.info(recover current consume queue file over, mappedFile.getFileName() offset size tagsCode);break;}}// 如果当前ConsumeQueue文件中的有效数据偏移量和文件大小一样则表示该ConsumeQueue文件的所有条目都是有效的if (mappedFileOffset mappedFileSizeLogics) {// 下一个文件index;// 如果是最后一个了,就退出if (index mappedFiles.size()) {log.info(recover last consume queue file over, last mapped file mappedFile.getFileName());break;} else {// 获取下一个文件的数据mappedFile mappedFiles.get(index);byteBuffer mappedFile.sliceByteBuffer();processOffset mappedFile.getFileFromOffset();mappedFileOffset 0;log.info(recover next consume queue file, mappedFile.getFileName());}} else {log.info(recover current consume queue queue over mappedFile.getFileName() (processOffset mappedFileOffset));break;}}// 文件名偏移量 这个文件最大有效的数据的偏移量, 就是这个queue最后一条消息的整体的偏移量processOffset mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);// 删除这个偏移量之后的数据this.mappedFileQueue.truncateDirtyFiles(processOffset);if (isExtReadEnable()) {this.consumeQueueExt.recover();log.info(Truncate consume queue extend file by max {}, maxExtAddr);this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}}
}正常恢复commitlog文件
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {boolean checkCRCOnRecover this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final ListMappedFile mappedFiles this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Began to recover from the last third fileint index mappedFiles.size() - 3;if (index 0)index 0;MappedFile mappedFile mappedFiles.get(index);ByteBuffer byteBuffer mappedFile.sliceByteBuffer();long processOffset mappedFile.getFileFromOffset();long mappedFileOffset 0;while (true) {// 生成DispatchRequest验证本条消息是否合法DispatchRequest dispatchRequest this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size dispatchRequest.getMsgSize();// Normal data// 正常,且size0if (dispatchRequest.isSuccess() size 0) {// 更新mappedFileOffset的值加上本条消息长度mappedFileOffset size;}// Come the end of the file, switch to the next file Since the// return 0 representatives met last hole,// this can not be included in truncate offset// 正常,size0,说明到了文件末尾else if (dispatchRequest.isSuccess() size 0) {index;if (index mappedFiles.size()) {// Current branch can not happenlog.info(recover last 3 physics file over, last mapped file mappedFile.getFileName());break;} else {mappedFile mappedFiles.get(index);byteBuffer mappedFile.sliceByteBuffer();processOffset mappedFile.getFileFromOffset();mappedFileOffset 0;log.info(recover next physics file, mappedFile.getFileName());}}// Intermediate file read error// 消息错误else if (!dispatchRequest.isSuccess()) {log.info(recover physics file end, mappedFile.getFileName());break;}}// 即最后一条消息的尾部位置processOffset mappedFileOffset;// 设置刷盘点this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);/** 删除文件最大有效数据偏移量processOffset之后的所有数据*/this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data// 如果consumequeue那边得到的commitlog最大的有效消息物理偏移量 大于等于 commitlog这边自己得出的自身的最大有效消息物理偏移量// 则以commitlog文件的有效数据为准再次清除consumequeue文件中的脏数据if (maxPhyOffsetOfConsumeQueue processOffset) {log.warn(maxPhyOffsetOfConsumeQueue({}) processOffset({}), truncate dirty logic files, maxPhyOffsetOfConsumeQueue, processOffset);this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}} else {// Commitlog case files are deleted//如果不存在commitlog文件//那么重置刷盘最新位置提交的最新位置并且清除所有的consumequeue索引文件log.warn(The commitlog files are deleted, and delete the consume queue files);this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}异常恢复commitlog文件
Deprecated
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {// recover by the minimum time stampboolean checkCRCOnRecover this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final ListMappedFile mappedFiles this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which fileint index mappedFiles.size() - 1;MappedFile mappedFile null;// 从最后一个文件开始从后往前检查、恢复for (; index 0; index--) {mappedFile mappedFiles.get(index);// isMappedFileMatchedRecover 检查index对应的mappedFile文件是否正常if (this.isMappedFileMatchedRecover(mappedFile)) {log.info(recover from this mapped file mappedFile.getFileName());break;}}if (index 0) {index 0;mappedFile mappedFiles.get(index);}ByteBuffer byteBuffer mappedFile.sliceByteBuffer();long processOffset mappedFile.getFileFromOffset();long mappedFileOffset 0;/*** 从最后一个正常的文件开始遍历*/while (true) {DispatchRequest dispatchRequest this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {// Normal dataif (size 0) {mappedFileOffset size;// 如果消息允许重复复制默认为 falseif (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {// 如果 消息物理偏移量 小于 CommitLog的提交指针// 则调用CommitLogDispatcher重新构建当前消息的indexfile索引和consumequeue索引if (dispatchRequest.getCommitLogOffset() this.defaultMessageStore.getConfirmOffset()) {this.defaultMessageStore.doDispatch(dispatchRequest);}} else {this.defaultMessageStore.doDispatch(dispatchRequest);}}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offset// 如果消息正常但是size为0表示到了文件的末尾则尝试跳到下一个commitlog文件进行检测else if (size 0) {index;if (index mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info(recover physics file over, last mapped file mappedFile.getFileName());break;} else {mappedFile mappedFiles.get(index);byteBuffer mappedFile.sliceByteBuffer();processOffset mappedFile.getFileFromOffset();mappedFileOffset 0;log.info(recover next physics file, mappedFile.getFileName());}}} else {log.info(recover physics file end, mappedFile.getFileName() pos byteBuffer.position());break;}}processOffset mappedFileOffset;this.mappedFileQueue.setFlushedWhere(processOffset);this.mappedFileQueue.setCommittedWhere(processOffset);/** 删除文件最大有效数据偏移量processOffset之后的所有数据*/this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data// 如果consumequeue那边得到的commitlog最大的有效消息物理偏移量 大于等于 commitlog这边自己得出的自身的最大有效消息物理偏移量// 则以commitlog文件的有效数据为准再次清除consumequeue文件中的脏数据if (maxPhyOffsetOfConsumeQueue processOffset) {log.warn(maxPhyOffsetOfConsumeQueue({}) processOffset({}), truncate dirty logic files, maxPhyOffsetOfConsumeQueue, processOffset);this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}}// Commitlog case files are deletedelse {log.warn(The commitlog files are deleted, and delete the consume queue files);this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);this.defaultMessageStore.destroyLogics();}
}