当前位置: 首页 > news >正文

江苏省建设招标网站最新创建的网站

江苏省建设招标网站,最新创建的网站,网站基本常识,免费关键词挖掘工具RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中#xff0c;分析到Broker接收到消息#xff0c;最终会调用CommitLong#putMessage方法存储消息。 本篇来分析CommitLong#putMessage存储消息的流程。 1. C… RocketMQ源码阅读-Message消息存储 1. CommitLog的作用2. CommitLog 存储消息3. 时序图4. 小结 在Broker消息接收一篇中分析到Broker接收到消息最终会调用CommitLong#putMessage方法存储消息。 本篇来分析CommitLong#putMessage存储消息的流程。 1. CommitLog的作用 Store all metadata downtime for recovery, data protection reliability 翻译为存储元数据提供在停机时恢复和数据保护的能力 CommitLog是针对 MappedFileQueue 的封装使用。 其中有一个属性 protected final MappedFileQueue mappedFileQueue;MappedFileQueue是用来存储消息的文件队列对上层提供可无限使用的文件容量有一个属性 private final CopyOnWriteArrayListMappedFile mappedFiles new CopyOnWriteArrayListMappedFile();是实际存储消息的链表MappedFile是消息文件实体每个 MappedFile 统一文件大小。 他们三个的关系是他们的比例为CommitLog : MappedFileQueue : MappedFile 1 : 1 : N CommitLog 存储在 MappedFile 有两种内容类型 MESSAGE 消息。BLANK 文件不足以存储消息时的空白占位。 2. CommitLog 存储消息 CommitLog存储消息的入口方法为CommitLog#putMessage(…) public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate setting// on the client)msg.setBodyCRC(UtilAll.crc32(msg.getBody()));// Back to ResultsAppendMessageResult result null;StoreStatsService storeStatsService this.defaultMessageStore.getStoreStatsService();String topic msg.getTopic();int queueId msg.getQueueId();// 事务相关final int tranType MessageSysFlag.getTransactionValue(msg.getSysFlag());if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Deliveryif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}topic ScheduleMessageService.SCHEDULE_TOPIC;queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}}long eclipseTimeInLock 0;// 获取写入映射文件MappedFile unlockMappedFile null;MappedFile mappedFile this.mappedFileQueue.getLastMappedFile();// 获取写入锁putMessageLock.lock(); //spin or ReentrantLock ,depending on store configtry {long beginLockTimestamp this.defaultMessageStore.getSystemClock().now();this.beginTimeInLock beginLockTimestamp;// Here settings are stored timestamp, in order to ensure an orderly// globalmsg.setStoreTimestamp(beginLockTimestamp);// 当不存在映射文件时进行创建if (null mappedFile || mappedFile.isFull()) {mappedFile this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise}if (null mappedFile) {log.error(create mapped file1 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);}// 存储消息result mappedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// 当文件尾时获取新的映射文件并进行插入unlockMappedFile mappedFile;// Create a new file, re-write the messagemappedFile this.mappedFileQueue.getLastMappedFile(0);if (null mappedFile) {// XXX: warn and notify melog.error(create mapped file2 error, topic: msg.getTopic() clientAddr: msg.getBornHostString());beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);}result mappedFile.appendMessage(msg, this.appendMessageCallback);break;case MESSAGE_SIZE_EXCEEDED:case PROPERTIES_SIZE_EXCEEDED:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);case UNKNOWN_ERROR:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);default:beginTimeInLock 0;return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);}eclipseTimeInLock this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;beginTimeInLock 0;} finally {// 释放写入锁putMessageLock.unlock();}if (eclipseTimeInLock 500) {log.warn([NOTIFYME]putMessage in lock cost time(ms){}, bodyLength{} AppendMessageResult{}, eclipseTimeInLock, msg.getBody().length, result);}if (null ! unlockMappedFile this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {this.defaultMessageStore.unlockMappedFile(unlockMappedFile);}PutMessageResult putMessageResult new PutMessageResult(PutMessageStatus.PUT_OK, result);// StatisticsstoreStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());// 刷新磁盘handleDiskFlush(result, putMessageResult, msg);// 数据同步主节点同步到从节点可以看到handleHA(result, putMessageResult, msg);return putMessageResult; }可以看到此方法获取到映射文件然后将消息写入文件并刷入磁盘其中会使用到写入锁进行并发控制。最后进行数据同步将主节点的数据同步到从节点。刷新磁盘的方法为CommitLog#handleDiskFlush public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 进行同步||异步 flush||commit// Synchronization flushif (FlushDiskType.SYNC_FLUSH this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {final GroupCommitService service (GroupCommitService) this.flushCommitLogService;if (messageExt.isWaitStoreMsgOK()) {GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes());service.putRequest(request);boolean flushOK request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error(do groupcommit, wait for flush failed, topic: messageExt.getTopic() tags: messageExt.getTags() client address: messageExt.getBornHostString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);}} else {service.wakeup();}}// Asynchronous flushelse {if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {flushCommitLogService.wakeup();} else {commitLogService.wakeup();}} }主从数据同步的方法为CommitLog#handleHA public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {// 如果是Master节点同步到从节点if (BrokerRole.SYNC_MASTER this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {HAService service this.defaultMessageStore.getHaService();if (messageExt.isWaitStoreMsgOK()) {// Determine whether to waitif (service.isSlaveOK(result.getWroteOffset() result.getWroteBytes())) {GroupCommitRequest request new GroupCommitRequest(result.getWroteOffset() result.getWroteBytes());service.putRequest(request);service.getWaitNotifyObject().wakeupAll();boolean flushOK request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());if (!flushOK) {log.error(do sync transfer other node, wait return, but failed, topic: messageExt.getTopic() tags: messageExt.getTags() client address: messageExt.getBornHostNameString());putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}}// Slave problemelse {// Tell the producer, slave not availableputMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);}}}}对于获取文件和文件刷新落盘先不做深究后面再深入分析。 3. 时序图 4. 小结 本篇主要分析了CommitLog的作用以及其进行消息存储的流程CommitLog主要是对MappedFileQueue的封装使用MappedFileQueue是对消息文件的封装。存储流程主要是 获取映射文件获取写锁写文件释放写锁刷新文件到磁盘分为同步刷新和异步刷新如果是Master节点需要同步数据到从节点 Master同步数据到从节点的过程放在下一篇《RocketMQ源码阅读-Master数据同步从节点》进行分析。
http://www.dnsts.com.cn/news/97698.html

相关文章:

  • 做网页的软件做网站设计网络网站有哪些功能
  • 互动平台网站建设郑州做网站
  • django网站开发逻辑设计素材网站的素材可以商用吗
  • 做pc端网站如何滨州的网站开发
  • 网站系统免费封面上的网站怎么做的
  • 常用网站推广方法优化营商环境 提升服务效能
  • 浙江做网站大同市建设工程质量监督站网站
  • 现在网站一般都是什么语言做的湖南省郴州市宜章县
  • 哪个网站做电商门槛最低网站怎样做推广
  • 给公司做网站 图片倾权设计师网站 pins
  • 淄博网站建设报价北京企业响应式网站建设
  • 游戏网站开发协议建设网站0基础需要学什么
  • 做侵权网站用哪里的服务器稳郑州小程序网站开发
  • 中间商可以做网站吗赣州做网站的
  • 郑州网站开发培训班华为网站的建设建议书
  • 建设网站包维护学生账号登录平台登录入口
  • 怎样在公司的网站服务器上更新网站内容东莞网站如何制作
  • wordpress 全站不刷新中卫网站设计公司
  • 国产手机做系统下载网站wordpress 仪表盘命名
  • 怎么改版网站音乐网站如何建立
  • 做公司永久免费网站什么好河南开元建设有限公司网站
  • 响应式环保网站北京建网站的公司哪个比较好
  • 网站开发服务费算无形资产吗蜘蛛云建网站怎样
  • 服装网站建设方案重庆企业年报网上申报入口
  • 做网站的技术要求高吗wordpress站群的作用
  • 营销型企业网站优化的作用黄山春节旅游攻略
  • 常州网站推云服务器做网站要备案吗
  • 南宁最高端网站建设科技公司名称大全简单大气
  • 做高端企业网站建设公司公司如何做网站不发钱
  • 深圳网站建设选哪家多视频网站建设