当前位置: 首页 > news >正文

做水果网站平台wordpress 小工具 开发

做水果网站平台,wordpress 小工具 开发,石家庄房产网上备案查询,苏州制作网页找哪家文章目录 Nacos配置中心源码总流程图NacosClient源码分析获取配置注册监听器 NacosServer源码分析配置dump配置发布 Nacos配置中心源码 总流程图 Nacos2.1.0源码分析在线流程图 源码的版本为2.1.0 #xff0c;并在配置了下面两个启动参数#xff0c;一个表示单机启动#… 文章目录 Nacos配置中心源码总流程图NacosClient源码分析获取配置注册监听器 NacosServer源码分析配置dump配置发布 Nacos配置中心源码 总流程图 Nacos2.1.0源码分析在线流程图 源码的版本为2.1.0 并在配置了下面两个启动参数一个表示单机启动一个是指定nacos的工作目录其中会存放各种运行文件方便查看 -Dnacos.standalonetrue -Dnacos.homeD:\nacos-cluster\nacos2.1.0standaloneNacosClient源码分析 在NacosClient端服务注册中心核心的接口是NamingService而配置中心核心的接口是ConfigService 我们可以添加一个配置然后查看这里的实例代码 /* * Demo for Nacos * pom.xmldependencygroupIdcom.alibaba.nacos/groupIdartifactIdnacos-client/artifactIdversion${version}/version/dependency */ package com.alibaba.nacos.example;import java.util.Properties; import java.util.concurrent.Executor; import com.alibaba.nacos.api.NacosFactory; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.config.listener.Listener; import com.alibaba.nacos.api.exception.NacosException;/*** Config service example** author Nacos**/ public class ConfigExample {public static void main(String[] args) throws NacosException, InterruptedException {String serverAddr localhost;String dataId nacos-config-demo.yaml;String group DEFAULT_GROUP;Properties properties new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//获取配置服务ConfigService configService NacosFactory.createConfigService(properties);//获取配置String content configService.getConfig(dataId, group, 5000);System.out.println(content);//注册监听器configService.addListener(dataId, group, new Listener() {Overridepublic void receiveConfigInfo(String configInfo) {System.out.println(recieve: configInfo);}Overridepublic Executor getExecutor() {return null;}});//发布配置//boolean isPublishOk configService.publishConfig(dataId, group, content);//System.out.println(isPublishOk);//发送properties格式configService.publishConfig(dataId,group,common.age30, ConfigType.PROPERTIES.getType());Thread.sleep(3000);content configService.getConfig(dataId, group, 5000);System.out.println(content);/* boolean isRemoveOk configService.removeConfig(dataId, group);System.out.println(isRemoveOk);Thread.sleep(3000);content configService.getConfig(dataId, group, 5000);System.out.println(content);Thread.sleep(300000);*/} } 获取配置 总结 获取配置的主要方法是 NacosConfigService 类的 getConfig 方法通常情况下该方法直接从本地文件中取得配置的值如果本地文件不存在或者内容为空则再通过grpc从远端拉取配置并保存到本地快照中。 NacosServer端的处理是从磁盘读取配置文件./nacosHome/data/config-data/DEFAULT_GROUP/dataId然后将读取到的content返回 接下来的源码就是这里一块的流程它是如何调用到NacosConfigService 类的 getConfig ()方法 public interface ConfigService {String getConfig(String dataId, String group, long timeoutMs) throws NacosException;...... }还是一样从spring.factiries文件中起步 进入到NacosConfigBootstrapConfiguration自动配置类的这其中会创建一个NacosPropertySourceLocatorbean对象 Configuration(proxyBeanMethods false) ConditionalOnProperty(name {spring.cloud.nacos.config.enabled},matchIfMissing true ) public class NacosConfigBootstrapConfiguration {public NacosConfigBootstrapConfiguration() {}BeanConditionalOnMissingBeanpublic NacosConfigProperties nacosConfigProperties() {return new NacosConfigProperties();}BeanConditionalOnMissingBeanpublic NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {return new NacosConfigManager(nacosConfigProperties);}// 核心beanBeanpublic NacosPropertySourceLocator nacosPropertySourceLocator(NacosConfigManager nacosConfigManager) {return new NacosPropertySourceLocator(nacosConfigManager);}BeanConditionalOnMissingBean(search SearchStrategy.CURRENT)ConditionalOnNonDefaultBehaviorpublic ConfigurationPropertiesRebinder smartConfigurationPropertiesRebinder(ConfigurationPropertiesBeans beans) {return new SmartConfigurationPropertiesRebinder(beans);} }在NacosPropertySourceLocator这个bean中它的接口中的默认方法会调用locate()方法 加载共享配置文件也就是shared-configs配置项指定的数组 加载加载扩展的配置文件也就是extension-configs配置项指定的数组 加载和应用名相关的几个默认配置文件比如order-service-dev.yml 上面三个方法中都会各自调用到loadNacosDataIfPresent() -- loadNacosPropertySource(...) -- NacosPropertySourceBuilder.build() public class NacosPropertySourceLocator implements PropertySourceLocator {public PropertySource? locate(Environment env) {this.nacosConfigProperties.setEnvironment(env);// 获取配置中心服务ConfigServiceConfigService configService this.nacosConfigManager.getConfigService();if (null configService) {log.warn(no instance of config service found, cant load config from nacos);return null;} else {long timeout (long)this.nacosConfigProperties.getTimeout();this.nacosPropertySourceBuilder new NacosPropertySourceBuilder(configService, timeout);String name this.nacosConfigProperties.getName();String dataIdPrefix this.nacosConfigProperties.getPrefix();if (StringUtils.isEmpty(dataIdPrefix)) {dataIdPrefix name;}if (StringUtils.isEmpty(dataIdPrefix)) {dataIdPrefix env.getProperty(spring.application.name);}CompositePropertySource composite new CompositePropertySource(NACOS);// 加载共享配置文件this.loadSharedConfiguration(composite);// 加载扩展的配置文件this.loadExtConfiguration(composite);// 加载当前应用配置文件// 在该方法中会进行下面三行的逻辑/*dataIdPrefixdataIdPrefix . fileExtensiondataIdPrefix - profile . fileExtension*/this.loadApplicationConfiguration(composite, dataIdPrefix, this.nacosConfigProperties, env);return composite;}}// 可以详细看一下NacosClient启动时是怎么根据微服务名去取配置文件的private void loadApplicationConfiguration() {String fileExtension properties.getFileExtension();String nacosGroup properties.getGroup();// 最先使用微服务名 调用下面的loadNacosDataIfPresent()方法this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup, fileExtension, true);// 接下来是使用微服务名文件后缀名的方式 调用下面的loadNacosDataIfPresent()方法this.loadNacosDataIfPresent(compositePropertySource, dataIdPrefix . fileExtension, nacosGroup, fileExtension, true);String[] var7 environment.getActiveProfiles();int var8 var7.length;for(int var9 0; var9 var8; var9) {String profile var7[var9];// 第三次使用 微服务名profile 文件后缀名 调用下面的loadNacosDataIfPresent()方法String dataId dataIdPrefix - profile . fileExtension;this.loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup, fileExtension, true);}}// 在上面三个加载共享、扩展、当前应用名方法中最终都会调用到下面的loadNacosDataIfPresent(...) 方法中private void loadNacosDataIfPresent(...) {if (null ! dataId dataId.trim().length() 1) {if (null ! group group.trim().length() 1) {// 调用loadNacosPropertySource方法NacosPropertySource propertySource this.loadNacosPropertySource(dataId, group, fileExtension, isRefreshable);this.addFirstPropertySource(composite, propertySource, false);}}}// loadNacosDataIfPresent(...) --- loadNacosPropertySource(...)private NacosPropertySource loadNacosPropertySource(...) {return NacosContextRefresher.getRefreshCount() ! 0L !isRefreshable ? NacosPropertySourceRepository.getNacosPropertySource(dataId, group) : // 这里会进入到NacosPropertySourceBuilder类的build方法this.nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);}}NacosPropertySourceBuilder类的代码调用流程 这里就会调用到核心接口configService接口实现类的getConfig()方法 public class NacosPropertySourceBuilder {......NacosPropertySource build(String dataId, String group, String fileExtension, boolean isRefreshable) {// 这里会先调用loadNacosData方法ListPropertySource? propertySources this.loadNacosData(dataId, group, fileExtension);NacosPropertySource nacosPropertySource new NacosPropertySource(propertySources, group, dataId, new Date(), isRefreshable);NacosPropertySourceRepository.collectNacosPropertySource(nacosPropertySource);return nacosPropertySource;}// build(...) --- loadNacosData(...)private ListPropertySource? loadNacosData(String dataId, String group, String fileExtension) {String data null;try {// 这里进入到了configService接口实现类的getConfig()方法data this.configService.getConfig(dataId, group, this.timeout);if (StringUtils.isEmpty(data)) {log.warn(Ignore the empty nacos configuration and get it based on dataId[{}] group[{}], dataId, group);return Collections.emptyList();}if (log.isDebugEnabled()) {log.debug(String.format(Loading nacos data, dataId: %s, group: %s, data: %s, dataId, group, data));}return NacosDataParserHandler.getInstance().parseNacosData(dataId, data, fileExtension);} catch (NacosException var6) {log.error(get data from Nacos error,dataId:{} , dataId, var6);} catch (Exception var7) {log.error(parse data from Nacos error,dataId:{},data:{}, new Object[]{dataId, data, var7});}return Collections.emptyList();}}核心方法NacosClient向NacosServer发送请求拉取配置的方法。 前面的调用栈如果不会可以直接在下面getConfig()方法出打一个断点然后从debug中看调用栈。方法具体的实现 NacosClient端这里首先会读取本地文件本地是有一个缓存的如果本地缓存中没有我们需要的配置那么就需要从NacosServer端拉取配置了发送请求获取响应数据将数据在本地文件中缓存一份 // 只是方法调用 public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {return getConfigInner(namespace, dataId, group, timeoutMs); }private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {group blank2defaultGroup(group);ParamUtils.checkKeyParam(dataId, group);ConfigResponse cr new ConfigResponse();cr.setDataId(dataId);cr.setTenant(tenant);cr.setGroup(group);// use local config first// NacosClient端这里首先会读取本地文件本地是有一个缓存的String content LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);if (content ! null) {LOGGER.warn(..);cr.setContent(content);String encryptedDataKey LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content cr.getContent();return content;}// 如果本地缓存中没有我们需要的配置那么就需要从NacosServer端拉取配置了try {// 从服务端获取配置ConfigResponse response worker.getServerConfig(dataId, group, tenant, timeoutMs, false);cr.setContent(response.getContent());cr.setEncryptedDataKey(response.getEncryptedDataKey());configFilterChainManager.doFilter(null, cr);content cr.getContent();return content;} catch (NacosException ioe) {if (NacosException.NO_RIGHT ioe.getErrCode()) {throw ioe;}LOGGER.warn(..);}LOGGER.warn(..);// 再从本地文件缓存中找content LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);cr.setContent(content);String encryptedDataKey LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content cr.getContent();return content; }//-------------------------- public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify)throws NacosException {// 默认组名if (StringUtils.isBlank(group)) {group Constants.DEFAULT_GROUP;}// 从服务端查询配置return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify); }// 向NacosServer发送请求拉取配置数据并在本地文件中缓存一份 public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)throws NacosException {ConfigQueryRequest request ConfigQueryRequest.build(dataId, group, tenant);request.putHeader(NOTIFY_HEADER, String.valueOf(notify));RpcClient rpcClient getOneRunningClient();if (notify) {CacheData cacheData cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));if (cacheData ! null) {rpcClient ensureRpcClient(String.valueOf(cacheData.getTaskId()));}}// 发送请求获取响应数据ConfigQueryResponse response (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);ConfigResponse configResponse new ConfigResponse();if (response.isSuccess()) {// 将数据在本地文件中缓存一份LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());configResponse.setContent(response.getContent());String configType;if (StringUtils.isNotBlank(response.getContentType())) {configType response.getContentType();} else {configType ConfigType.TEXT.getType();}configResponse.setConfigType(configType);String encryptedDataKey response.getEncryptedDataKey();LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);configResponse.setEncryptedDataKey(encryptedDataKey);return configResponse;} else if (response.getErrorCode() ConfigQueryResponse.CONFIG_NOT_FOUND) {LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);return configResponse;} else if (response.getErrorCode() ConfigQueryResponse.CONFIG_QUERY_CONFLICT) {...} }注册监听器 结论 从spring.facotries文件中开始其中一个bean会监听spring容器启动完成的事件然后它会为当前应用添加监听器遍历每个dataId添加监听器。当nacosServer端更改了配置这里监听器中的方法就会运行这里都会发布一个RefreshEvent事件处理RefreshEvent事件的方法中会 刷新环境变量销毁RefreshScope注解修改的bean实例 NacosServer端如果修改了配置就会发布一个事件而在NacosClient端这边就会有一个EventListener去监听该事件并进行相应的处理。 在ConfigService接口中有三个和监听器相关的方法 public interface ConfigService {String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)throws NacosException;void addListener(String dataId, String group, Listener listener) throws NacosException;void removeListener(String dataId, String group, Listener listener);}接下来进入源码中入口是NacosConfigAutoConfiguration自动配置的NacosContextRefresherbean 对象 Configuration(proxyBeanMethods false) ConditionalOnProperty(name {spring.cloud.nacos.config.enabled},matchIfMissing true ) public class NacosConfigAutoConfiguration {...Beanpublic NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);}... }该类它监听了ApplicationReadyEvent事件 在spring容器启动完成后就会调用该类的onApplicationEvent()方法 给当前应用注册nacos监听器 为每个 dataId注册监听器 当某个dataId发生了更改这里都会发布一个RefreshEvent事件 public class NacosContextRefresher implements ApplicationListenerApplicationReadyEvent , ApplicationContextAware {// 在spring容器启动完成后就会调用该类的onApplicationEvent()方法public void onApplicationEvent(ApplicationReadyEvent event) {if (this.ready.compareAndSet(false, true)) {// 给当前应用注册nacos监听器this.registerNacosListenersForApplications();}}// 给当前应用注册nacos监听器private void registerNacosListenersForApplications() {// 是否刷新配置默认为trueif (this.isRefreshEnabled()) {Iterator var1 NacosPropertySourceRepository.getAll().iterator();// 遍历每个dataIdwhile(var1.hasNext()) {NacosPropertySource propertySource (NacosPropertySource)var1.next();if (propertySource.isRefreshable()) {String dataId propertySource.getDataId();// 为每个 dataId注册监听器this.registerNacosListener(propertySource.getGroup(), dataId);}}}}// 为每个 dataId注册监听器private void registerNacosListener(final String groupKey, final String dataKey) {String key NacosPropertySourceRepository.getMapKey(dataKey, groupKey);// 定义一个监听器Listener listener (Listener)this.listenerMap.computeIfAbsent(key, (lst) - {return new AbstractSharedListener() {public void innerReceive(String dataId, String group, String configInfo) {NacosContextRefresher.refreshCountIncrement();// 配置的历史记录NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);// 发布一个RefreshEvent事件会在处理该事件的位置真正进行刷新配置项NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, ...));}};});try {// 调用configService接口的addListener()添加监听器this.configService.addListener(dataKey, groupKey, listener);} catch (NacosException var6) {log.warn(String.format(register fail for nacos listener ,dataId[%s],group[%s], dataKey, groupKey), var6);}} }当NacosServer端某个配置文件改动后就会回调上面监听器的innerReceive()方法在该方法中就会发布RefreshEvent事件处理该事件的是RefreshEventListener类中的onApplicationEvent()方法 直接调用refresh()方法 public class RefreshEventListener implements SmartApplicationListener {...public void onApplicationEvent(ApplicationEvent event) {if (event instanceof ApplicationReadyEvent) {this.handle((ApplicationReadyEvent)event);} else if (event instanceof RefreshEvent) {// 处理RefreshEvent事件调用handler()方法this.handle((RefreshEvent)event);}}public void handle(ApplicationReadyEvent event) {this.ready.compareAndSet(false, true);}public void handle(RefreshEvent event) {if (this.ready.get()) {// 这里就会调用refresh()方法进行刷新SetString keys this.refresh.refresh();}} }接下来就进入到了ContextRefresher类的refresh()方法 刷新环境变量销毁RefreshScope注解修改的bean实例 public synchronized SetString refresh() {// 刷新环境变量SetString keys this.refreshEnvironment();// 销毁RefreshScope注解修改的bean实例this.scope.refreshAll();return keys; }NacosServer源码分析 配置dump 服务端启动时就会依赖 DumpService 的 init 方法从数据库中 load 配置存储在本地磁盘上并将一些重要的元信息例如 MD5 值缓存在内存中。服务端会根据心跳文件中保存的最后一次心跳时间来判断到底是从数据库 dump 全量配置数据还是部分增量配置数据如果机器上次心跳间隔是 6h 以内的话。 全量 dump 当然先清空磁盘缓存然后根据主键 ID 每次捞取一千条配置刷进磁盘和内存。增量 dump 就是捞取最近六小时的新增配置包括更新的和删除的先按照这批数据刷新一遍内存和文件再根据内存里所有的数据全量去比对一遍数据库如果有改变的再同步一次相比于全量 dump 的话会减少一定的数据库 IO 和磁盘 IO 次数。 配置发布 结论 更改数据库中的数据持久化信息到mysql 触发一个ConfigDataChangeEvent事件。至此请求结束。 接下来就处理上面的事件 遍历Nacos集群下的所有节点包括自己 生成一个http/rpc的任务对象去执行这里就直接看rpc任务对象的处理 判断是不是当前节点如果是就调用dump()方法去处理 将更改的数据保存至本地磁盘中 生成md5并通过一个key将md5存入cache中再发布一个LocalDataChangeEvent事件该事件存了key 处理上方事件的方法中会开启一个任务在任务的run()方法中会真正调用客户端发送grpc请求发送一个ConfigChangeNotifyRequest请求对象 如果不是当前节点就发送grpc请求为其他节点同步修改配置项 NacosClient端的处理 接收到ConfigChangeNotifyRequest请求对象然后就放入了一个阻塞队列中。客户端while死循环队列中有任务了/每隔5s 从队列中获取任务/null去执行配置监听器方法根据CacheData对象远程获取配置内容进行md5的比较如果有变化就通知监听器去处理这就回到了nacosClient端获取配置中的流程了 我们接下来分析在NacosServer端修改了配置点击发布配置NacosClient怎么就能接收到是哪一个dataId修改了嘞 发布配置官方接口文档 这里实际上是调用的NacosServer的/nacos/v2/cs/config接口处理该请求的是ConfigController.publishConfig()方法 在这一次请求中其实就是做了两件事将更新写入数据库中然后发布一个事件将事件添加进队列中此时请求就结束了。 在controller方法中有两行核心的方法 // 进入service层核心方法 // 持久化配置信息到数据库 persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);// 触发ConfigDataChangeEvent事件这是客户端能感知配置更新的根本原因 ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));持久化配置信息到数据库就没必须继续看下去了我们接下来看看notifyConfigChange()方法的实现 该方法就是单纯的一层一层方法调用 public class ConfigChangePublisher {public static void notifyConfigChange(ConfigDataChangeEvent event) {if (PropertyUtil.isEmbeddedStorage() !EnvUtil.getStandaloneMode()) {return;}// 该方法继续调用NotifyCenter.publishEvent(event);} }public static boolean publishEvent(final Event event) {try {// 该方法继续调用return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error(There was an exception to the message publishing : , ex);return false;} }private static boolean publishEvent(final Class? extends Event eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic ClassUtils.getCanonicalName(eventType);EventPublisher publisher INSTANCE.publisherMap.get(topic);if (publisher ! null) {// 该方法继续调用return publisher.publish(event);}return false; }这里就会进入到DefaultPublisher类的publish(event)方法中。该类非常重要Nacos很多功能都用的这统一的一套事件发布与订阅。 public boolean publish(Event event) {checkIsStart();// 如果队列中写满了那么就返回false下面就直接处理了// 该类的run()方法中会死循环从队列中取任务执行boolean success this.queue.offer(event);if (!success) {LOGGER.warn(Unable to plug in due to interruption, synchronize sending time, event : {}, event);// 处理事件receiveEvent(event);return true;}return true; }此时本次http请求就已经结束了这里将事件放入队列中后就会有其他的订阅者来异步处理事件。 这样的设计也实现了发布任务与处理任务之间的解耦 此时队列中有了任务在NacosServer中任务订阅者此时还需要做两件事 通知集群其他Nacos节点进行更新通知NacosClient端配置发生了更改 public void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug([NotifyCenter] the {} will received by {}, event, subscriber);// 订阅者需要去处理事件// 主要做两件事 通知集群其他Nacos节点进行更新、通知NacosClient端配置发生了更改final Runnable job () - subscriber.onEvent(event);final Executor executor subscriber.executor();if (executor ! null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error(Event callback exception: , e);}} }这里会进入到AsyncNotifyService的构造方法中 遍历集群环境下的所有节点创建任务添加进http/grpc的队列中从http/grpc的队列中取任务执行 public AsyncNotifyService(ServerMemberManager memberManager) {...// Register A Subscriber to subscribe ConfigDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {Overridepublic void onEvent(Event event) {// Generate ConfigDataChangeEvent concurrentlyif (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt (ConfigDataChangeEvent) event;long dumpTs evt.lastModifiedTs;String dataId evt.dataId;String group evt.group;String tenant evt.tenant;String tag evt.tag;// 获取nacos集群下的各个节点CollectionMember ipList memberManager.allMembers();// In fact, any type of queue here can beQueueNotifySingleTask httpQueue new LinkedListNotifySingleTask();QueueNotifySingleRpcTask rpcQueue new LinkedListNotifySingleRpcTask();for (Member member : ipList) {// 使用http/rpc的方式通知各节点具体的dataId被修改了// 这里先添加进队列下面的if中处理if (!MemberUtil.isSupportedLongCon(member)) {httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),evt.isBeta));} else {rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));}}// 处理队列中的任务if (!httpQueue.isEmpty()) {ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));}if (!rpcQueue.isEmpty()) {// 直接看AsyncRpcTask类中的run()方法ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));}}}......});}我们这里是nacos2.X的版本所以我这里就自己看AsyncRpcTask类的run()方法 调用dump()方法发送请求同步其他节点数据变化 public void run() {while (!queue.isEmpty()) {NotifySingleRpcTask task queue.poll();ConfigChangeClusterSyncRequest syncRequest new ConfigChangeClusterSyncRequest();syncRequest.setDataId(task.getDataId());syncRequest.setGroup(task.getGroup());syncRequest.setBeta(task.isBeta);syncRequest.setLastModified(task.getLastModified());syncRequest.setTag(task.tag);syncRequest.setTenant(task.getTenant());Member member task.member;// 判断member是不是当前节点if (memberManager.getSelf().equals(member)) {// 如果是当前节点就直接调用dump()方法// 这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中这里最终是会到DumpProcessor.process()方法if (syncRequest.isBeta()) {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getLastModified(), NetUtils.localIP(), true);} else {dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());}continue;}// 其他节点if (memberManager.hasMember(member.getAddress())) {boolean unHealthNeedDelay memberManager.isUnHealth(member.getAddress());if (unHealthNeedDelay) {...} else {if (!MemberUtil.isSupportedLongCon(member)) {asyncTaskExecute(new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,task.getLastModified(), member.getAddress(), task.isBeta));} else {try {// 为nacos集群中的其他节点进行同步配置变化configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));} catch (Exception e) {MetricsMonitor.getConfigNotifyException().increment();asyncTaskExecute(task);}}}} else {//No nothig if member has offline.}} }dump()这里又会经过服务注册那边当服务更改后为订阅者进行推送的流程中先将task存入一个队列中 -- 去队列中的任务 -- 各自的任务处理类去处理。这里最终是会到DumpProcessor.process()方法 方法调用process() -- configDump() —dump()将配置保存在磁盘文件中配置发生变化更新md5发布LocalDataChangeEvent事件 目的告诉NacosClient端配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent() public boolean process(NacosTask task) {...// 直接看这里最后的configDump()方法return DumpConfigHandler.configDump(build.build()); }public static boolean configDump(ConfigDumpEvent event){......if (StringUtils.isBlank(event.getTag())) {......boolean result;if (!event.isRemove()) {// 核心方法进入到这里// 写入磁盘result ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);if (result) {ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,content.length());}} else {......}return result;} else {.......}public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,String type, String encryptedDataKey) {......try {// 根据content配置内容生成一个md5。content中的内容有变化那么生成的md5也肯定是不一样的final String md5 MD5Utils.md5Hex(content, Constants.ENCODE);// 配置的最后一次更新时间if (lastModifiedTs ConfigCacheService.getLastModifiedTs(groupKey)) {DUMP_LOG.warn(...);return true;}if (md5.equals(ConfigCacheService.getContentMd5(groupKey)) DiskUtil.targetFile(dataId, group, tenant).exists()) {DUMP_LOG.warn(...);} else if (!PropertyUtil.isDirectRead()) {// 将配置保存在磁盘文件中DiskUtil.saveToDisk(dataId, group, tenant, content);}// 配置发生变化更新md5// 继续跟入该方法updateMd5(groupKey, md5, lastModifiedTs, encryptedDataKey);return true;} catch (IOException ioe) {......return false;} finally {releaseWriteLock(groupKey);}}public static void updateMd5(String groupKey, String md5, long lastModifiedTs, String encryptedDataKey) {// 根据groupKey将md5数据保存在缓存中CacheItem cache makeSure(groupKey, encryptedDataKey, false);if (cache.md5 null || !cache.md5.equals(md5)) {cache.md5 md5;cache.lastModifiedTs lastModifiedTs;// 发布LocalDataChangeEvent事件包含groupKey// 目的告诉NacosClient端配置发生了改变。处理该事件的RpcConfigChangeNotifier.onEvent()NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));}}接下来看看RpcConfigChangeNotifier.onEvent()的方法逻辑 遍历各个客户端发送grpc请求 Override public void onEvent(LocalDataChangeEvent event) {...configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);}public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,ListString betaIps, String tag) {// 其实就代表着Client集合SetString listeners configChangeListenContext.getListeners(groupKey);if (CollectionUtils.isEmpty(listeners)) {return;}int notifyClientCount 0;// 遍历各个客户端for (final String client : listeners) {Connection connection connectionManager.getConnection(client);if (connection null) {continue;}ConnectionMeta metaInfo connection.getMetaInfo();//一些检查校验String clientIp metaInfo.getClientIp();String clientTag metaInfo.getTag();if (isBeta betaIps ! null !betaIps.contains(clientIp)) {continue;}if (StringUtils.isNotBlank(tag) !tag.equals(clientTag)) {continue;}// 封装一个请求对象ConfigChangeNotifyRequestConfigChangeNotifyRequest notifyRequest ConfigChangeNotifyRequest.build(dataId, group, tenant);// 创建rpc推送任务在RpcPushTask.run()方法中推送客户端RpcPushTask rpcPushRetryTask new RpcPushTask(notifyRequest, 50, client, clientIp, metaInfo.getAppName());push(rpcPushRetryTask);notifyClientCount;}}public void run() {tryTimes;if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {push(this);} else {// 这里推送客户端客户端再进行refresh操作rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {Overridepublic void onSuccess() {tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);}Overridepublic void onFail(Throwable e) {tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);Loggers.REMOTE_PUSH.warn(Push fail, e);push(RpcPushTask.this);}}, ConfigExecutor.getClientConfigNotifierServiceExecutor());}}
http://www.dnsts.com.cn/news/263155.html

相关文章:

  • wordpress 图片存储宁波seo排名外包公司
  • 生产企业网站模板店面设计师岗位职责
  • 河北营销型网站方案企业工商注册信息查询系统官网
  • 优秀网页设计网站是docker wordpress v
  • 专门做牛肉的网站有什么做兼职的医疗网站
  • 花都移动网站建设如何保存个人网站
  • WordPress20w文章徐州关键词优化
  • 建设一个网站的过程wordpress pdf插件下载
  • 网站设计应该考虑的重要因素企业宣传片视频模板
  • 卫生网站建设方案新的网站的建设步骤
  • 常用网站图标中国科技成就作文
  • 单页网站模板wapzt16j门户网
  • 蓬莱建设局规划处网站百度快速排名用什
  • php仿博客园网站彬县网房屋出租
  • 网站设计_网站建设_手机网站建设珠海企业建站
  • 重庆自助企业建站模板网站建设类型报价表
  • 虚拟币网站开发制作怎么买wordpress
  • 网站没有流量怎么回事新浪网站制作
  • 唐山建设网站公司WordPress右侧导航菜单主题
  • 做网站怎么找客户联系方式前端开发入门视频教程
  • 网站空间管理地址企业为什么要增资
  • 静态网站结构如何更新宣传片制作标准
  • 网站建设几个要素上海企业网络营销推广服务
  • 南京网站设计公司兴田德润电话多少dedecms后台程序已经安装完了怎么把自己的网站加进去?
  • 一个网站同时做百度和360 百度商桥都可以接收客户信息吗怎么用ps做网站图片
  • 海口cms模板建站宝宝发烧反反复复什么原因导致的
  • 2017年做啥网站致富郑州关键词排名公司电话
  • 网站制作需要多少钱新闻微信网站开发多少钱
  • django 开放api 做网站手机网站 自适应屏幕
  • 全国物流网站有哪些平台怎么做游戏推广赚钱