当前位置: 首页 > news >正文

南昌网站建设优化公司排名宣传片拍摄费用

南昌网站建设优化公司排名,宣传片拍摄费用,网站建设需要保存什么,做带支付平台的网站RocketMQ的Broker分为Master和Slave两个角色#xff0c;为了保证高可用性#xff0c;Master角色的机器接收到消息后#xff0c;要把内容同步到Slave机器上#xff0c;这样一旦Master宕机#xff0c;Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实…        RocketMQ的Broker分为Master和Slave两个角色为了保证高可用性Master角色的机器接收到消息后要把内容同步到Slave机器上这样一旦Master宕机Slave机器依然可以提供服务。下面分析Master和Slave角色机器间同步功能实现的源码。 1 同步属性信息 Slave需要和Master同步的不只是消息本身一些元数据信息也需要同步比如TopicConfig信息、ConsumerOffset信息、DelayOffset和SubscriptionGroupConfig信息。Broker在启动的时候判断自己的角色是否是Slave是的话就启动定时同步任务如代码清单12-1所示。 代码清单12-1 Slave角色定时同步元数据信息 if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {     if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig.getHaMasterAddress().length() 6) {         this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());         this.updateMasterHAServerAddrPeriodically false;     } else {         this.updateMasterHAServerAddrPeriodically true;     }     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         Override         public void run() {             try {                 BrokerController.this.slaveSynchronize.syncAll();             } catch (Throwable e) {                 log.error(ScheduledTask syncAll slave exception, e);             }         }     }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); }   在syncAll函数里调用syncTopicConfig、getAllConsumerOffset、syncDelayOffset和syncSubscriptionGroupConfig进行元数据同步。我们以syncConsumerOffset为例来看看底层的具体实现如代码清单12-2所示。 代码清单12-2 getAllConsumerOffset具体实现 public ConsumerOffsetSerializeWrapper getAllConsumerOffset(     final String addr) throws InterruptedException, RemotingTimeoutException,     RemotingSendRequestException, RemotingConnectException, MQBroker-Exception {     RemotingCommand request RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);     RemotingCommand response this.remotingClient.invokeSync(addr, request, 3000);     assert response ! null;     switch (response.getCode()) {         case ResponseCode.SUCCESS: {             return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);         }         default:             break;     }     throw new MQBrokerException(response.getCode(), response.getRemark()); }   getAllConsumerOffset的基本逻辑是组装一个RemotingCommand底层通过Netty将消息发送到Master角色的Broker然后获取Offset信息。 2 同步消息体 下面介绍Master和Slave之间同步消息体内容的方法也就是同步CommitLog内容的方法。CommitLog和元数据信息不同首先CommitLog的数据量比元数据要大其次对实时性和可靠性要求也不一样。元数据信息是定时同步的在两次同步的时间差里如果出现异常可能会造成Master上的元数据内容和Slave上的元数据内容不一致不过这种情况还可以补救手动调整Offset重启Consumer等。CommitLog在高可靠性场景下如果没有及时同步一旦Master机器出故障消息就彻底丢失了。所以有专门的代码来实现Master和Slave之间消息体内容的同步。 主要的实现代码在Broker模块的org.apache.rocketmq.store.ha包中里面包括HAService、HAConnection和WaitNotifyObject这三个类。 HAService是实现commitLog同步的主体它在Master机器和Slave机器上执行的逻辑不同默认是在Master机器上执行见代码清单12-3。 代码清单12-3 根据Broker角色确定是否设置HaMasterAddress if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {     if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig         .getHaMasterAddress().length() 6) {         this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());         this.updateMasterHAServerAddrPeriodically false;     } else {         this.updateMasterHAServerAddrPeriodically true;     }   当Broker角色是Slave的时候MasterAddr的值会被正确设置这样HAService在启动的时候在HAClient这个内部类中connectMaster会被正确执行如代码清单12-4所示。 代码清单12-4 Slave角色连接Master private boolean connectMaster() throws ClosedChannelException {     if (null socketChannel) {         String addr this.masterAddress.get();         if (addr ! null) {             SocketAddress socketAddress RemotingUtil.string2SocketAddress(addr);             if (socketAddress ! null) {                 this.socketChannel RemotingUtil.connect(socketAddress);                 if (this.socketChannel ! null) {                     this.socketChannel.register(this.selector, SelectionKey.OP_READ);                 }             }         }         this.currentReportedOffset HAService.this.defaultMessageStore.getMaxPhyOffset();         this.lastWriteTimestamp System.currentTimeMillis();     }     return this.socketChannel ! null; }   从代码中可以看出HAClient试图通过Java NIO函数去连接Master角色的Broker。Master角色有相应的监听代码如代码清单12-5所示。 代码清单12-5 监听Slave的HA连接 public void beginAccept() throws Exception {     this.serverSocketChannel ServerSocketChannel.open();     this.selector RemotingUtil.openSelector();     this.serverSocketChannel.socket().setReuseAddress(true);     this.serverSocketChannel.socket().bind(this.socketAddressListen);     this.serverSocketChannel.configureBlocking(false);     this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } CommitLog的同步不是经过netty command的方式而是直接进行TCP连接这样效率更高。连接成功以后通过对比Master和Slave的Offset不断进行同步。 3 sync_master和async_master sync_master和async_master是写在Broker配置文件里的配置参数这个参数影响的是主从同步的方式。从字面意思理解sync_master是同步方式也就是Master角色Broker中的消息要立刻同步过去async_master是异步方式也就是Master角色Broker中的消息是通过异步处理的方式同步到Slave角色的机器上的。下面结合代码来分析sync_master下的消息同步如代码清单12-6所示。 代码清单12-6 sync_master下的消息同步 public void handleHA(AppendMessageResult result,     PutMessageResult putMessageResult, MessageExt messageExt) {     if (BrokerRole.SYNC_MASTER this.defaultMessageStore         .getMessageStoreConfig().getBrokerRole()) {         HAService service this.defaultMessageStore.getHaService();         if (messageExt.isWaitStoreMsgOK()) {             // Determine whether to wait             if (service.isSlaveOK(result.getWroteOffset() result                 .getWroteBytes())) {                 GroupCommitRequest request new GroupCommitRequest                     (result.getWroteOffset() result                     .getWroteBytes());                 service.putRequest(request);                 service.getWaitNotifyObject().wakeupAll();                 boolean flushOK                     request.waitForFlush(this.defaultMessageStore                         .getMessageStoreConfig().getSyncFlushTimeout());                 if (!flushOK) {                     log.error(do sync transfer other node, wait return,                         but failed, topic: messageExt                         .getTopic() tags:                         messageExt.getTags() client address:                         messageExt.getBornHostNameString());                     putMessageResult.setPutMessageStatus(PutMessageStatus                         .FLUSH_SLAVE_TIMEOUT);                 }             }             // Slave problem             else {                 // Tell the producer, slave not available                 putMessageResult.setPutMessageStatus(PutMessageStatus                     .SLAVE_NOT_AVAILABLE);             }         }     } }   在CommitLog类的putMessage函数末尾调用handleHA函数。代码中的关键词是wakeupAll和waitForFlush在同步方式下Master每次写消息的时候都会等待向Slave同步消息的过程同步完成后再返回如代码清单12-7所示。putMessage函数比较长仅列出关键的代码。 代码清单12-7 putMessage中调用handleHA public PutMessageResult putMessage(final MessageExtBrokerInner msg) {     // Set the storage time     msg.setStoreTimestamp(System.currentTimeMillis());     // Set the message body BODY CRC (consider the most appropriate setting     // on the client)     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));     // Back to Results     AppendMessageResult result null;     StoreStatsService storeStatsService this.defaultMessageStore         .getStoreStatsService();     String topic msg.getTopic();     int queueId msg.getQueueId();   ……     handleDiskFlush(result, putMessageResult, msg);     handleHA(result, putMessageResult, msg);     return putMessageResult; }
http://www.dnsts.com.cn/news/254103.html

相关文章:

  • 网站域名打不开的原因网站推广应该怎么做
  • 公司flash网站模板河北互联网公司
  • 天津西青区离哪个火车站近网站策划书1000字
  • 浙江平台网站建设哪家有湖北百度seo排名
  • 番禺 网站建设百度网页安全警告怎么解除
  • 有哪些企业可以做招聘的网站有哪些网站建设的流程与思路
  • 网站制作网站建设做报名统计的网站
  • 电子商务网站建设与管理 教案南京百度seo
  • wordpress %3c 3.6.1汕头seo推广
  • 适合平面设计师的网站上海自助建站软件
  • 阿里企业的网站建设江门网站制作开发
  • 网站建设招标方案模板免费的网页游戏
  • 通过平台建网站wsp网站开发
  • 小学网站模板西安做网站哪家好
  • 做网站必须要买空间wordpress视频手机版
  • 学做标书的网站网站 建设在作用是什么意思
  • 淘宝做链接的网站大站网站建设
  • 做近代史纲要题的网站c asp.net 做网站
  • 做网站要学多久wordpress win主机
  • 张东敏 上海 科技 网站建设优化大师官方下载
  • 免费建网站软件系统公司建网站制作平台
  • 湛江宇锋网站建设公司简介怎么写模板
  • 阜阳市城乡建设网站绍兴网站公司网站制作
  • 哪些网站可以做淘宝客成都网站建设服务平台
  • 英特尔nuc做网站服务器郑州商城app制作
  • 怎么做优惠卷网站网站建设管理风险
  • 建设一个企业网站需要多少钱百度 官网
  • 军博网站建设公司如何用front怕个做网站
  • 百度广告一级代理seo实战密码百度云
  • 黑龙江省建设厅网站站长胶南建网站