当前位置: 首页 > news >正文

苏州建设造价信息网站韩国美食做视频网站有哪些

苏州建设造价信息网站,韩国美食做视频网站有哪些,广州做门户网站,正规的关键词优化软件目录 一、主从同步工作原理 1. 主从配置 2. 启动HA 二、主从同步实现机制 1. 从Broker发送连接事件 2. 主Broker接收连接事件 3. 从Broker反馈复制进度 4. ReadSocketService线程读取从Broker复制进度 5. WriteSocketService传输同步消息 6. GroupTransferService线程…目录 一、主从同步工作原理 1. 主从配置 2. 启动HA 二、主从同步实现机制 1. 从Broker发送连接事件 2. 主Broker接收连接事件 3. 从Broker反馈复制进度 4. ReadSocketService线程读取从Broker复制进度 5. WriteSocketService传输同步消息 6. GroupTransferService线程通知HA结果 1)待需要HA的消息集合 2)通知消息发送者线程 三、读写分离机制 四、参考资料 一、主从同步工作原理 为了提高消息消费的高可用性避免Broker发生单点故障引起存储在Broker上的消息无法及时消费 RocketMQ引入Broker主备机制即消息消费到达主服务器后需要将消息同步到消息从服务器如果主服务器Broker宕机后消息消费者可以从从服务器拉取消息。 下图所示是Broker的HA交互机制流程图及类图。主从同步模式分为同步、异步。 step1主服务器启动并在特定端口上监听从服务器的连接step2从服务器主动连接主服务器主服务器接收客户端的连接并建立相关TCP连接step3从服务器主动向主服务器发送待拉取消息偏移量主服务器解析请求并返回消息给从服务器step4从服务器保存消息并继续发送新的消息同步请求。1. 主从配置 参考rocketmq-distribution项目的conf目录下有2主2从异步HA配置2m-2s-async、2主2从同步HA配置2m-2s-sync。以下1主1从异步HA配置实例如下。 主Broker配置 brokerClusterName DefaultCluster brokerName broker-a brokerId 0 deleteWhen 04 fileReservedTime 48 brokerRole ASYNC_MASTER flushDiskType ASYNC_FLUSH # 启动SQL过滤 enablePropertyFilter truenamesrvAddr192.168.1.55:9876;172.17.0.3:9876 brokerIP1192.168.1.55 从Broker配置 注意brokerName与主机相同brokerId 0时则为从0时则为主brokerRole角色为SLAVE从刷盘类型为ASYNC_FLUSH异步刷盘。 brokerClusterName DefaultCluster brokerName broker-a brokerId 1 deleteWhen 04 fileReservedTime 48 brokerRole SLAVE flushDiskType ASYNC_FLUSH # 启动SQL过滤 enablePropertyFilter truenamesrvAddr192.168.1.55:9876;172.17.0.3:9876 2. 启动HA org.apache.rocketmq.store.DefaultMessageStore#start是Broker启动方法如下图所示是其调用链及相关HA部分代码。  /*** broker启动时消息存储线程* BrokerController#startBasicService()* throws Exception*/ Override public void start() throws Exception {// 是否HA主从复制if (!messageStoreConfig.isEnableDLegerCommitLog() !this.messageStoreConfig.isDuplicationEnable()) {this.haService.init(this);}......if (this.haService ! null) {this.haService.start();}...... } org.apache.rocketmq.store.ha.DefaultHAService#init是HAService初始化方法如下代码所示。注意从Broker的broker.conf配置的brokerRole为SLAVE才能创建HAClient从Broker注册到主Broker。 Override public void init(final DefaultMessageStore defaultMessageStore) throws IOException {this.defaultMessageStore defaultMessageStore;this.acceptSocketService new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());this.groupTransferService new GroupTransferService(this, defaultMessageStore);if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() BrokerRole.SLAVE) {this.haClient new DefaultHAClient(this.defaultMessageStore);}this.haConnectionStateNotificationService new HAConnectionStateNotificationService(this, defaultMessageStore); } org.apache.rocketmq.store.ha.DefaultHAService#start是HAService启动方法。注意 org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService主Broker接收从Broker的连接事件org.apache.rocketmq.store.ha.GroupTransferService负责主Broker向从Broker发送同步数据org.apache.rocketmq.store.ha.HAClient从Broker向主Broker发送连接事件Broker启动时根据配置brokerRole配置ASYNC_MASTER、SYNC_MASTER、SLAVE判定Broker是主还是从。若是Slave角色在broker配置文件中获取haMasterAddress并更新至masterAddress但是haMasterAddress配置为空则启动成功但是不会执行HA。 /*** 启动HAService* step1{link AcceptSocketService}接收从Broker的注册事件方法是{link AcceptSocketService#beginAccept()}* step2启动{link AcceptSocketService}线程监听从Broker发送心跳* step3同步数据{link GroupTransferService}线程启动主Broker向从Broker发送数据* step4启动从Broker{link HAClient}发送心跳到主Broker*/ Override public void start() throws Exception {// 主接收从Broker的连接事件SelectionKey.OP_ACCEPT连接事件this.acceptSocketService.beginAccept();// 启动主Broker线程this.acceptSocketService.start();// 主Broker同步数据线程启动this.groupTransferService.start();this.haConnectionStateNotificationService.start();// 启动从Broker{link HAClient}发送心跳到主Brokerif (haClient ! null) {this.haClient.start();} } 二、主从同步实现机制 1. 从Broker发送连接事件 org.apache.rocketmq.store.ha.DefaultHAClient是从Broker向主Broker的发送连接事件的核心类是个线程。其主要属性如下代码所示。 // Socket读缓存区大小4M private static final int READ_MAX_BUFFER_SIZE 1024 * 1024 * 4; // 主Broker地址 private final AtomicReferenceString masterHaAddress new AtomicReference(); private final AtomicReferenceString masterAddress new AtomicReference(); // 从Broker向主Broker发起HA的偏移量 private final ByteBuffer reportOffset ByteBuffer.allocate(8); // 网络传输通道 private SocketChannel socketChannel; // 事件选择器 private Selector selector; /*** 上次读取主Broker的时间戳* last time that slave reads date from master.*/ private long lastReadTimestamp System.currentTimeMillis(); /*** 上次写入主Broker的时间戳* last time that slave reports offset to master.*/ private long lastWriteTimestamp System.currentTimeMillis();// 反馈HA的复制进度从Broker的Commitlog文件的最大偏移量 private long currentReportedOffset 0; // 本次处理读缓存区的指针 private int dispatchPosition 0; // 读缓存区 private ByteBuffer byteBufferRead ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); // 读缓存区备份与byteBufferRead交换 private ByteBuffer byteBufferBackup ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); private DefaultMessageStore defaultMessageStore; // HA连接状态 private volatile HAConnectionState currentState HAConnectionState.READY; // 流监控 private FlowMonitor flowMonitor; org.apache.rocketmq.store.ha.DefaultHAClient#run是HAClient启动执行任务其调用链和代码如下。 DefaultHAClient#connectMaster()从Broker连接到主Broker。DefaultHAClient#transferFromMaster()向主发送HA进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中。  /*** 启动HAClient* {link DefaultHAClient#connectMaster()}从Broker连接到主Broker* {link DefaultHAClient#transferFromMaster()}向主发送HA进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中*/ Override public void run() {log.info(this.getServiceName() service started);this.flowMonitor.start();while (!this.isStopped()) {try {switch (this.currentState) {case SHUTDOWN:return;case READY:if (!this.connectMaster()) {log.warn(HAClient connect to master {} failed, this.masterHaAddress.get());this.waitForRunning(1000 * 5);}continue;case TRANSFER:// 向主发送HA进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中if (!transferFromMaster()) {// 没有可拉取消息时设置READY状态closeMasterAndWait();continue;}break;default:this.waitForRunning(1000 * 2);continue;}long interval this.defaultMessageStore.now() - this.lastReadTimestamp;if (interval this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {log.warn(AutoRecoverHAClient, housekeeping, found this connection[ this.masterHaAddress ] expired, interval);this.closeMaster();log.warn(AutoRecoverHAClient, master not response some time, so close connection);}} catch (Exception e) {log.warn(this.getServiceName() service has exception. , e);this.closeMasterAndWait();}}log.info(this.getServiceName() service end); } 注意一旦HAClient线程启动后在状态READY、TRANSFER来回变化READY状态下发送从Broker连接事件到主Broker开启Socket连接TRANSFER状态下主从发送相关数据信息如从向主发送HA复制进度currentReportedOffset即从Broker的Commitlog文件的最大偏移量主向从发送同步消息。 org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster是从Broker连接到主Broker的核心方法其代码如下。 /*** 从Broker连接到主Broker* 注意* a. Broker启动时若是Slave角色从broker配置文件中获取haMasterAddress并更新至masterAddress* b. 若是Slave角色但是haMasterAddress配置为空则启动成功但是不会执行HA* return true连接成功false连接失败*/ public boolean connectMaster() throws ClosedChannelException {if (null socketChannel) {// 获取主Broker地址String addr this.masterHaAddress.get();if (addr ! null) {// 根据地址创建SocketAddress对象SocketAddress socketAddress RemotingUtil.string2SocketAddress(addr);// 获取SocketChannelthis.socketChannel RemotingUtil.connect(socketAddress);if (this.socketChannel ! null) {// SocketChannel注册OP_READ网络读事件this.socketChannel.register(this.selector, SelectionKey.OP_READ);log.info(HAClient connect to master {}, addr);this.changeCurrentState(HAConnectionState.TRANSFER);}}// 获取Commitlog最大偏移量HA同步进度this.currentReportedOffset this.defaultMessageStore.getMaxPhyOffset();this.lastReadTimestamp System.currentTimeMillis();}return this.socketChannel ! null; } 2. 主Broker接收连接事件 org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService是主Broker接收从Broker连接事件的实现类是一个线程。其主要属性如下代码所示。 // 主Broker监听本地的Socket本地IP 端口号 private final SocketAddress socketAddressListen; // Socket通道基于NIO private ServerSocketChannel serverSocketChannel; // 事件选择器基于NIO private Selector selector; org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept方法定义了主Broker监听从Broker的连接事件。 /*** 启动监听从broker的连接* Starts listening to slave connections.** throws Exception If fails.*/ public void beginAccept() throws Exception {this.serverSocketChannel ServerSocketChannel.open();this.selector RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true); // TCP可重复使用this.serverSocketChannel.socket().bind(this.socketAddressListen); // 绑定监听端口if (0 messageStoreConfig.getHaListenPort()) {messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());log.info(OS picked up {} to listen for HA, messageStoreConfig.getHaListenPort());}this.serverSocketChannel.configureBlocking(false); // 非阻塞模式this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // 注册OP_ACCEPT连接事件 } org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run监听到从Broker连接事件的任务处理为每个连接事件创建org.apache.rocketmq.store.ha.HAConnection对象并启动负责M-S的数据同步逻辑。 /*** 标准的NIO连接处理方式* step1选择器每1s处理一次连接就绪事件* step2是否连接事件若是创建{link SocketChannel}* step3每一个连接创建{link HAConnection}负责M-S的数据同步逻辑*/ Override public void run() {log.info(this.getServiceName() service started);while (!this.isStopped()) {try {// 选择器每1s处理一次连接就绪事件this.selector.select(1000);SetSelectionKey selected this.selector.selectedKeys();if (selected ! null) {for (SelectionKey k : selected) {// 是否连接事件if ((k.readyOps() SelectionKey.OP_ACCEPT) ! 0) {// 若是连接事件时创建SocketChannelSocketChannel sc ((ServerSocketChannel) k.channel()).accept();if (sc ! null) {DefaultHAService.log.info(HAService receive new connection, sc.socket().getRemoteSocketAddress());try {// 每一个连接创建HAConnection并启动负责M-S的数据同步逻辑HAConnection conn createConnection(sc);conn.start();DefaultHAService.this.addConnection(conn);} catch (Exception e) {log.error(new HAConnection exception, e);sc.close();}}} else {log.warn(Unexpected ops in select k.readyOps());}}selected.clear();}} catch (Exception e) {log.error(this.getServiceName() service has exception., e);}}log.info(this.getServiceName() service end); } org.apache.rocketmq.store.ha.DefaultHAConnection创建并启动时启动读、写线程服务。其关键属性如下代码所示。 private WriteSocketService writeSocketService主Broker向从Broker写数据服务类private ReadSocketService readSocketService主Broker读取从Broker数据服务类 private final DefaultHAService haService; private final SocketChannel socketChannel; // HA客户端连接地址 private final String clientAddress; // 主Broker向从Broker写数据服务类 private WriteSocketService writeSocketService; // 主Broker读取从Broker数据服务类 private ReadSocketService readSocketService; private volatile HAConnectionState currentState HAConnectionState.TRANSFER; // 从Broker请求拉取的偏移量 private volatile long slaveRequestOffset -1; // 从Broker反馈已完成的偏移量 private volatile long slaveAckOffset -1; private FlowMonitor flowMonitor; 3. 从Broker反馈复制进度 org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster是从Broker与主Broker传输数据的核心方法代码如下所示该方法有两大功能 从Broker向主Broker反馈HA复制进度即currentReportedOffset从Broker的Commitlog文件的最大偏移量方法org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset执行。从Broker接收主BrokerHA同步消息内容方法org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent执行。 /*** 向主反馈HA复制进度处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中* {link DefaultHAClient#reportSlaveMaxOffset(long)}向主反馈HA复制进度即currentReportedOffset从Broker的Commitlog文件的最大偏移量* {link DefaultHAClient#processReadEvent()}处理从主Broker发送过来的消息并commit所有消息追加到Commitlog文件内存映射缓存中*/ private boolean transferFromMaster() throws IOException {boolean result;// 判断是否需要向主Broker反馈当前待拉取偏移量if (this.isTimeToReportOffset()) {log.info(Slave report current offset {}, this.currentReportedOffset);// 向主Broker反馈拉取偏移量result this.reportSlaveMaxOffset(this.currentReportedOffset);if (!result) {return false;}}this.selector.select(1000);// 处理主Broker发送过来的消息数据result this.processReadEvent();if (!result) {return false;}return reportSlaveMaxOffsetPlus(); } org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset向主Broker反馈HA复制进度代码如下。 /*** 向主Broker反馈拉取偏移量* 注意* a. 向主Broker反馈拉取偏移量maxOffset: 对于Slave端发送下次待拉取消息偏移量* 对于Master端本次请求拉取的偏移量也可以理解为同步ACK* b. 手动切换ByteBuffer的写模式/读模式* c. 通过{link Buffer#hasRemaining()}判断缓存内容是否完全写入SocketChannel基于NIO模式的写范例* param maxOffset HA待拉取偏移量* return ByteBuffer缓存的内容是否写完*/ private boolean reportSlaveMaxOffset(final long maxOffset) {// 偏移量写入ByteBufferthis.reportOffset.position(0); // 写缓存位置this.reportOffset.limit(8); // 写缓存字节长度this.reportOffset.putLong(maxOffset); // 偏移量写入ByteBuffer// 将ByteBuffer的写模式 转为 读模式this.reportOffset.position(0);this.reportOffset.limit(8);// 循环并判定ByteBuffer是否完全写入SocketChannelfor (int i 0; i 3 this.reportOffset.hasRemaining(); i) {try {this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName() reportSlaveMaxOffset this.socketChannel.write exception, e);return false;}}lastWriteTimestamp this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining(); } 4. ReadSocketService线程读取从Broker复制进度 org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent是主Broker读取从Broker拉取消息的请求获取内容是HA复制进度。其代码如下看出主Broker获取从Broker的HA复制进度后赋值给DefaultHAConnection#slaveRequestOffset属性后立即唤醒GroupTransferService线程执行消息同步。 /*** 主Broker读取从Broker拉取消息的请求* step1判定byteBufferRead是否有剩余空间没有则{link Buffer#flip()}* step2用剩余空间从SocketChannel读数据到缓存中读取到的内容是从Broker拉取消息的偏移量* step3通知等待同步HA复制结果的发送消息线程* return*/ private boolean processReadEvent() {int readSizeZeroTimes 0;/*byteBufferRead没有剩余空间时则position limit capacity调用flip()方法后则position 0, limit capacity加上processPosition 0说明从头开始处理*/if (!this.byteBufferRead.hasRemaining()) {this.byteBufferRead.flip(); // ByteBuffer重置处理this.processPosition 0;}// ByteBuffer有剩余空间循环至byteBufferRead没有剩余空间while (this.byteBufferRead.hasRemaining()) {try {// 从SocketChannel读数据到缓存中int readSize this.socketChannel.read(this.byteBufferRead);if (readSize 0) {readSizeZeroTimes 0; // 重置this.lastReadTimestamp DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();// 读取内容长度 8说明收到从Broker的拉取请求内容是offsetif ((this.byteBufferRead.position() - this.processPosition) 8) {int pos this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);long readOffset this.byteBufferRead.getLong(pos - 8);this.processPosition pos;// 从Broker反馈已完成的偏移量DefaultHAConnection.this.slaveAckOffset readOffset;// 更新从Broker请求拉取的偏移量if (DefaultHAConnection.this.slaveRequestOffset 0) {DefaultHAConnection.this.slaveRequestOffset readOffset;log.info(slave[ DefaultHAConnection.this.clientAddress ] request offset readOffset);}// 通知等待同步HA复制结果的发送消息线程DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);}} else if (readSize 0) {// 连续读取字节数0则终止本次读取处理if (readSizeZeroTimes 3) {break;}} else {log.error(read socket[ DefaultHAConnection.this.clientAddress ] 0);return false;}} catch (IOException e) {log.error(processReadEvent exception, e);return false;}}return true; } 5. WriteSocketService传输同步消息 ReadSocketService线程读取从Broker发送的HA复制进度由org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService根据DefaultHAConnection#slaveRequestOffset获取主Broker还没有同步的所有消息进行HA同步。其如下代码所示WriteSocketService#run方法是同步消息核心逻辑。 /*** 传输消息内容到HA客户端* step1slaveRequestOffset为-1时说明主Broker没有收到从Broker的拉取请求忽略本次写事件* step2nextTransferFromWhere为-1时说明初次传输计算nextTransferFromWhere待传输offset* step3判断上次是否传输完* 上次传输完当前时间 与 上次传输时间的间隔 发送心跳包时间间隔* 发送心跳包长度: nextTransferFromWhere size消息长度0避免长连接由空闲而关闭* 上次传输没有完成继续传输忽略本次写事件* step4根据从Broker待拉取消息offset查找之后的所有可读消息* step5待同步消息总大小 一次传输的大小默认32KB则截取此时一次传输不是完整的消息* step6传输消息内容*/ Override public void run() {log.info(this.getServiceName() service started);while (!this.isStopped()) {try {this.selector.select(1000);// slaveRequestOffset为-1时说明主Broker没有收到从Broker的拉取请求忽略本次写事件if (-1 DefaultHAConnection.this.slaveRequestOffset) {Thread.sleep(10);continue;}/*nextTransferFromWhere为-1时说明初次传输初次传输时计算nextTransferFromWhere待传输offset*/// 初次传输if (-1 this.nextTransferFromWhere) {// 0 时从Commitlog文件的最大偏移量传输if (0 DefaultHAConnection.this.slaveRequestOffset) {long masterOffset DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();masterOffset masterOffset- (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());if (masterOffset 0) {masterOffset 0;}this.nextTransferFromWhere masterOffset;}// !0 时从Broker请求的偏移量else {this.nextTransferFromWhere DefaultHAConnection.this.slaveRequestOffset;}log.info(master transfer data from this.nextTransferFromWhere to slave[ DefaultHAConnection.this.clientAddress ], and slave request DefaultHAConnection.this.slaveRequestOffset);}/*判断上次是否传输完上次传输完当前时间 与 上次传输时间的间隔 发送心跳包时间间隔发送心跳包长度: nextTransferFromWhere size消息长度0避免长连接由空闲而关闭上次传输没有完成继续传输忽略本次写事件*/// lastWriteOver为true则上次传输完if (this.lastWriteOver) {// 当前时间 与 上次传输时间的间隔long interval DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;// 时间间隔 发送心跳包时间间隔if (interval DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {// Build Header 发送心跳包长度: nextTransferFromWhere size消息长度0避免长连接由空闲而关闭this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(this.nextTransferFromWhere);this.byteBufferHeader.putInt(0);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver this.transferData();if (!this.lastWriteOver)continue;}}// lastWriteOver为false则上次传输没有完成则继续传输else {// 继续传输上次拉取请求还未完成则忽略本次写事件this.lastWriteOver this.transferData();if (!this.lastWriteOver)continue;}// 根据从Broker待拉取消息offset查找之后的所有可读消息SelectMappedBufferResult selectResult DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult ! null) {int size selectResult.getSize();// 待同步消息总大小 一次传输的大小默认32KBif (size DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {size DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();}int canTransferMaxBytes flowMonitor.canTransferMaxByteNum();if (size canTransferMaxBytes) {if (System.currentTimeMillis() - lastPrintTimestamp 1000) {log.warn(Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s,String.format(%.2f, flowMonitor.maxTransferByteInSecond() / 1024.0),String.format(%.2f, flowMonitor.getTransferredByteInSecond() / 1024.0));lastPrintTimestamp System.currentTimeMillis();}size canTransferMaxBytes;}long thisOffset this.nextTransferFromWhere;this.nextTransferFromWhere size; // 下一次写入的offsetselectResult.getByteBuffer().limit(size);this.selectMappedBufferResult selectResult;// Build Header 传输size大小的消息内容不一定是完整的消息this.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();// 传输数据this.lastWriteOver this.transferData();} else {DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);}} catch (Exception e) {DefaultHAConnection.log.error(this.getServiceName() service has exception., e);break;}}DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();if (this.selectMappedBufferResult ! null) {this.selectMappedBufferResult.release();}changeCurrentState(HAConnectionState.SHUTDOWN);this.makeStop();readSocketService.makeStop();haService.removeConnection(DefaultHAConnection.this);SelectionKey sk this.socketChannel.keyFor(this.selector);if (sk ! null) {sk.cancel();}try {this.selector.close();this.socketChannel.close();} catch (IOException e) {DefaultHAConnection.log.error(, e);}DefaultHAConnection.log.info(this.getServiceName() service end); } 6. GroupTransferService线程通知HA结果 org.apache.rocketmq.store.ha.GroupTransferService该类负责将主从同步复制结束后通知阻塞的消息发送者线程。同步主从Broker模式即消息刷磁盘后继续等待新消息被传输到从Broker等待传输结果并通知消息发送线程。 1)待需要HA的消息集合 org.apache.rocketmq.store.CommitLog#asyncPutMessage是消息生产者发送消息到Broker时执行存储消息参考《RocketMQ5.0.0消息存储二_消息存储流程》该方法会根据同步或异步模式默认来执行org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA方法完成刷盘和HA复制方法调用链如下。 生产者把消息发送到Broker完成commit操作消息提交到文件内存映射中 随后根据同步/异步模式完成刷盘和HA。HA操作时把消息提交请求添加到org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite是主Broker待需要HA的的集合。以下是org.apache.rocketmq.store.CommitLog#handleHA的代码。   private CompletableFuturePutMessageStatus handleHA(AppendMessageResult result, PutMessageResult putMessageResult,int needAckNums) {if (needAckNums 0 needAckNums 1) {return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);}HAService haService this.defaultMessageStore.getHaService();long nextOffset result.getWroteOffset() result.getWroteBytes();// Wait enough acks from different slavesGroupCommitRequest request new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);haService.putRequest(request);haService.getWaitNotifyObject().wakeupAll();return request.future(); } 2)通知消息发送者线程 HAService启动时会启动GroupTransferService线程。GroupTransferService#run执行任务如下代码所示。 Override public void run() {log.info(this.getServiceName() service started);while (!this.isStopped()) {try {// 间隔10sthis.waitForRunning(10);// 主从同步复制结束后通知阻塞的消息发送者线程this.doWaitTransfer();} catch (Exception e) {log.warn(this.getServiceName() service has exception. , e);}}log.info(this.getServiceName() service end); } 其中执行waitForRunning()方法时会去执行org.apache.rocketmq.store.ha.GroupTransferService#swapRequests方法使得requestsWrite与requestsRead两个集合对调 private volatile ListCommitLog.GroupCommitRequest requestsWrite主Broker待需要HA的消息集合private volatile ListCommitLog.GroupCommitRequest requestsRead主Broker正在执行的HA集合 private void swapRequests() {ListCommitLog.GroupCommitRequest tmp this.requestsWrite;this.requestsWrite this.requestsRead;this.requestsRead tmp; } org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer方法是主从同步复制结束后通知阻塞的消息发送者线程如下代码所示。 /*** 主从同步复制结束后通知阻塞的消息发送者线程* step1遍历消息提交请求内存提交到Commitlog文件的内存映射* step2判断主从同步成功从已经成功复制的最大偏移量 消息生产者发送消息后返回下一条消息的偏移量*/ private void doWaitTransfer() {// 加锁synchronized (this.requestsRead) {if (!this.requestsRead.isEmpty()) {// commit请求即内存提交到Commitlog文件的内存映射for (CommitLog.GroupCommitRequest req : this.requestsRead) {boolean transferOK false;long deadLine req.getDeadLine(); // 是否超时final boolean allAckInSyncStateSet req.getAckNums() MixAll.ALL_ACK_IN_SYNC_STATE_SET;for (int i 0; !transferOK deadLine - System.nanoTime() 0; i) { // 是否超时if (i 0) {// 等待1sthis.notifyTransferObject.waitForRunning(1000);}if (!allAckInSyncStateSet req.getAckNums() 1) {// 主从同步成功判断从已经成功复制的最大偏移量 消息生产者发送消息后返回下一条消息的偏移量transferOK haService.getPush2SlaveMaxOffset().get() req.getNextOffset();continue;}if (allAckInSyncStateSet this.haService instanceof AutoSwitchHAService) {// In this mode, we must wait for all replicas that in InSyncStateSet.final AutoSwitchHAService autoSwitchHAService (AutoSwitchHAService) this.haService;final SetString syncStateSet autoSwitchHAService.getSyncStateSet();if (syncStateSet.size() 1) {// Only mastertransferOK true;break;}// Include masterint ackNums 1;for (HAConnection conn : haService.getConnectionList()) {final AutoSwitchHAConnection autoSwitchHAConnection (AutoSwitchHAConnection) conn;if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) autoSwitchHAConnection.getSlaveAckOffset() req.getNextOffset()) {ackNums;}if (ackNums syncStateSet.size()) {transferOK true;break;}}} else {// Include masterint ackNums 1;for (HAConnection conn : haService.getConnectionList()) {// TODO: We must ensure every HAConnection represents a different slave// Solution: Consider assign a unique and fixed IP:ADDR for each different slaveif (conn.getSlaveAckOffset() req.getNextOffset()) {ackNums;}if (ackNums req.getAckNums()) {transferOK true;break;}}}}if (!transferOK) {log.warn(transfer message to slave timeout, offset : {}, request acks: {},req.getNextOffset(), req.getAckNums());}// 从完成复制后唤醒消息发送者线程req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);}this.requestsRead.clear();}} } 三、读写分离机制 RocketMQ读写分离与其他中间件的实现方式完全不同RocketMQ是消费者首先向主服务器发起拉取消息请求然后主服务器返回一批消息然后会根据主服务器负载压力与主从同步情况向从服务器建议下次消息拉取是从主服务器还是从从服务器拉取。 RocketMQ根据MessageQueu查找Broker地址的唯一依据是brokerName。Broker组织中根据brokerName获取一组Broker服务器M-S它们的brokerName相同但brokerId不同主服务器的brokerId为0从服务器的brokerId大于0。 其方法是org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe 详细消费拉取消息时实现读写分离机制见后续章节参考《RocketMQ5.0.0消息消费一 _ PUSH模式的消息拉取》。 四、参考资料 【RocketMQ】学习RocketMQ必须要知道的主从同步原理_午睡的猫…的博客-CSDN博客_rocketmq主从同步原理 【RocketMQ】主从同步实现原理 - shanml - 博客园 RocketMQ5.0.0消息存储二_消息存储流程_爱我所爱0505的博客-CSDN博客_rocketmq 消息写入流程RocketMQ5.0.0消息存储三_消息转发与恢复机制_爱我所爱0505的博客-CSDN博客  RocketMQ5.0.0消息存储四_刷盘机制_爱我所爱0505的博客-CSDN博客
http://www.dnsts.com.cn/news/272287.html

相关文章:

  • 个人想做企业网站备案网络营销代理
  • 南山企业网站建设自己做的网站 网站备案流程
  • 企业网站手机版企业网站建设情况说明
  • 广东地区建网站的公司大连工程信息招标网
  • wap网站使用微信登陆服务器网站备案
  • 国外免费网站可以直接做ppt的网站
  • 做网站架构需要注意什么建材团购网站建设方案
  • 企业网站建设公司 丰台谷歌外链工具
  • 爱站seo查询江门网站推广设计
  • 网站建设seo规范中国上海门户网站
  • 商务网站建设体会wordpress 魔客
  • 那个网站建设常州哪里做网站
  • 360网站页面的工具栏怎么做企业网站建设费用需要多少钱
  • 国内有wix做的好的网站检察 网站建设
  • 深圳住建设局网站四川微信网站建设推
  • 北京网站设计首选 新鸿儒多少钱算受贿
  • 科大讯飞哪些做教学资源的网站如何做公司简介介绍
  • app网站与普通网站的区别是什么上海的咨询公司排名
  • wordpress网站特别慢四川建设网站官网
  • html5公司网站欣赏物流企业网站建设
  • 想看外国的网站怎么做凌点视频素材网
  • 广东移动手机营业厅网站大方做网站
  • 怎么在网站上做抽奖网站电话改了子页怎么改
  • 苏州建设网站公司建设外贸网站案例
  • 做网站的流程与步骤网页前端开发框架
  • 成都电商网站建设百度风云榜各年度小说排行榜
  • 美丽乡村网站建设策划书建设网站的价格分析
  • 做网站需要填什么上海长宁区网站建设
  • 企业网站 域名注册wordpress 评论过滤
  • 小企业网站建设的服务机构wordpress去掉版权信息