电影网站如何优化,专属头像制作免费,建筑网图集,网站内容更新外包RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中#xff0c;分析到Broker接收到消息#xff0c;最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。 1. C… RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中分析到Broker接收到消息最终会调用CommitLong#putMessage方法存储消息。
本篇来分析CommitLong#putMessage存储消息的流程。 1. CommitLog的作用 Store all metadata downtime for recovery, data protection reliability 翻译为存储元数据提供在停机时恢复和数据保护的能力
CommitLog是针对 MappedFileQueue 的封装使用。
其中有一个属性
protected final MappedFileQueue mappedFileQueue;MappedFileQueue是用来存储消息的文件队列对上层提供可无限使用的文件容量有一个属性
private final CopyOnWriteArrayListMappedFile mappedFiles new CopyOnWriteArrayListMappedFile();是实际存储消息的链表MappedFile是消息文件实体每个 MappedFile 统一文件大小。
他们三个的关系是他们的比例为CommitLog : MappedFileQueue : MappedFile 1 : 1 : N
CommitLog 存储在 MappedFile 有两种内容类型
MESSAGE 消息。BLANK 文件不足以存储消息时的空白占位。 2. CommitLog 存储消息
CommitLog存储消息的入口方法为CommitLog#putMessage(…)
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result null;StoreStatsService storeStatsService this.defaultMessageStore.getStoreStatsService();String topic msg.getTopic();int queueId msg.getQueueId();// 事务相关final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic ScheduleMessageService.SCHEDULE_TOPIC;queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock 0;// 获取写入映射文件MappedFile unlockMappedFile null;MappedFile mappedFile this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时进行创建if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null mappedFile) {log.error(create mapped file1 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时获取新的映射文件并进行插入unlockMappedFile mappedFile;// Create a new file, re-write the messagemappedFile this.mappedFileQueue.getLastMappedFile(0);if (null mappedFile) {// XXX: warn and notify melog.error(create mapped file2 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock 500) {log.warn([NOTIFYME]putMessage in lock cost time(ms){}, bodyLength{} AppendMessageResult{}, eclipseTimeInLock, msg.getBody().length, result);}if (null ! unlockMappedFile this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult;
}可以看到此方法获取到映射文件然后将消息写入文件并刷入磁盘其中会使用到写入锁进行并发控制。最后进行数据同步将主节点的数据同步到从节点。刷新磁盘的方法为CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes());service.putRequest(request);boolean flushOK request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error(do groupcommit, wait for flush failed, topic: messageExt.getTopic() tags: messageExt.getTags() client address: messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}}
}主从数据同步的方法为CommitLog#handleHA
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点同步到从节点if (BrokerRole.SYNC_MASTER this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() result.getWroteBytes())) {GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error(do sync transfer other node, wait return, but failed, topic: messageExt.getTopic() tags: messageExt.getTags() client address: messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}对于获取文件和文件刷新落盘先不做深究后面再深入分析。
3. 时序图 4. 小结
本篇主要分析了CommitLog的作用以及其进行消息存储的流程CommitLog主要是对MappedFileQueue的封装使用MappedFileQueue是对消息文件的封装。存储流程主要是
获取映射文件获取写锁写文件释放写锁刷新文件到磁盘分为同步刷新和异步刷新如果是Master节点需要同步数据到从节点
Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。