高新区网站建设的建议,wordpress4.0.6 漏洞,定制网站前准备,网站竞价 英文Nacos客户端本地缓存及故障转移
在Nacos本地缓存的时候有的时候必然会出现一些故障#xff0c;这些故障就需要进行处理#xff0c;涉及到的核心类为ServiceInfoHolder和FailoverReactor。 本地缓存有两方面#xff0c;第一方面是从注册中心获得实例信息会缓存在内存当中这些故障就需要进行处理涉及到的核心类为ServiceInfoHolder和FailoverReactor。 本地缓存有两方面第一方面是从注册中心获得实例信息会缓存在内存当中也就是通过Map的形式承载这样查询操作都方便。第二方面便是通过磁盘文件的形式定时缓存起来以备不时之需。
故障转移也分两方面第一方面是故障转移的开关是通过文件来标记的第二方面是当开启故障转移之后当发生故障时可以从故障转移备份的文件中来获得服务实例信息。
ServiceInfoHolder功能概述
ServiceInfoHolder类顾名思义服务信息的持有者。每次客户端从注册中心获取新的服务信息时都会调用该类其中processServiceInfo方法来进行本地化处理包括更新缓存服务、发布事件、更新本地文件等。 除了这些核心功能以外该类在实例化的时候还做了本地缓存目录初始化、故障转移初始化等操作下面我们来分析。
ServiceInfo的本地内存缓存
ServiceInfo注册服务的信息其中包含了服务名称、分组名称、集群信息、实例列表信息上次更新时间等所以我们由此得出客户端从服务端注册中心获得到的信息在本地都以ServiceInfo作为承载者。 而ServiceInfoHolder类又持有了ServiceInfo通过一个ConcurrentMap来储存.
// ServiceInfoHolder
private final ConcurrentMapString, ServiceInfo serviceInfoMap;这就是Nacos客户端对服务端获取到的注册信息的第一层缓存并且之前的课程中我们分析processServiceInfo方法时我们已经看到当服务信息变更时会第一时间更新ServiceInfoMap中的信息.
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {....//缓存服务信息serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 判断注册的实例信息是否更改boolean changed isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}....return serviceInfo;
}serviceInfoMap的使用就是这样当变动实例向其中put最新数据即可。当使用实例时根据key进行get操作即可。
serviceInfoMap在ServiceInfoHolder的构造方法中进行初始化默认创建一个空的ConcurrentMap。但当配置了启动时从缓存文件读取信息时则会从本地缓存进行加载。
public ServiceInfoHolder(String namespace, Properties properties) {initCacheDir(namespace, properties);// 启动时是否从缓存目录读取信息默认false。if (isLoadCacheAtStart(properties)) {this.serviceInfoMap new ConcurrentHashMapString, ServiceInfo(DiskCache.read(this.cacheDir));} else {this.serviceInfoMap new ConcurrentHashMapString, ServiceInfo(16);}this.failoverReactor new FailoverReactor(this, cacheDir);this.pushEmptyProtection isPushEmptyProtect(properties);
}这里我们要注意一下涉及到了本地缓存目录在我们上节课的学习中我们知道processServiceInfo方法中当服务实例变更时会看到通过DiskCache#write方法向该目录写入ServiceInfo信息。
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {.....// 服务实例已变更if (changed) {NAMING_LOGGER.info(current ips:({}) service: {} - {}, serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 添加实例变更事件InstancesChangeEvent订阅者NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 记录Service本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}本地缓存目录
本地缓存目录cacheDir是ServiceInfoHolder的一个属性用于指定本地缓存的根目录和故障转移的根目录。 在ServiceInfoHolder的构造方法中初始化并且生成缓存目录 这个initCacheDir就不用了细看了就是生成缓存目录的操作默认路径${user.home}/nacos/naming/public也可以自定义通过System.setProperty(“JM.SNAPSHOT.PATH”)自定义. 这里初始化完目录之后故障转移信息也存储在该目录下。
private void initCacheDir(String namespace, Properties properties) {String jmSnapshotPath System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);String namingCacheRegistryDir ;if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) ! null) {namingCacheRegistryDir File.separator properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);}if (!StringUtils.isBlank(jmSnapshotPath)) {cacheDir jmSnapshotPath File.separator FILE_PATH_NACOS namingCacheRegistryDir File.separator FILE_PATH_NAMING File.separator namespace;} else {cacheDir System.getProperty(USER_HOME_PROPERTY) File.separator FILE_PATH_NACOS namingCacheRegistryDir File.separator FILE_PATH_NAMING File.separator namespace;}
}故障转移
在ServiceInfoHolder的构造方法中还会初始化一个FailoverReactor类同样是ServiceInfoHolder的成员变量。FailoverReactor的作用便是用来处理故障转移的。
public ServiceInfoHolder(String namespace, Properties properties) {....// this为ServiceHolder当前对象这里可以立即为两者相互持有对方的引用this.failoverReactor new FailoverReactor(this, cacheDir);.....
}我们来看一下FailoverReactor的构造方法FailoverReactor的构造方法基本上把它的功能都展示出来了
1. 持有ServiceInfoHolder的引用
2. 拼接故障目录${user.home}/nacos/naming/public/failover其中public也有可能是其他的自定义命名空间
3. 初始化executorService执行者服务
4. init方法通过executorService开启多个定时任务执行public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {// 持有ServiceInfoHolder的引用this.serviceInfoHolder serviceInfoHolder;// 拼接故障目录${user.home}/nacos/naming/public/failoverthis.failoverDir cacheDir FAILOVER_DIR;// 初始化executorServicethis.executorService new ScheduledThreadPoolExecutor(1, new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);// 守护线程模式运行thread.setDaemon(true);thread.setName(com.alibaba.nacos.naming.failover);return thread;}});// 其他初始化操作通过executorService开启多个定时任务执行this.init();
}init方法执行
在这个方法中开启了三个定时任务这三个任务其实都是FailoverReactor的内部类
1. 初始化立即执行执行间隔5秒执行任务SwitchRefresher
2. 初始化延迟30分钟执行执行间隔24小时执行任务DiskFileWriter
3. 初始化立即执行执行间隔10秒执行核心操作为DiskFileWriterpublic void init() {// 初始化立即执行执行间隔5秒执行任务SwitchRefresherexecutorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);// 初始化延迟30分钟执行执行间隔24小时执行任务DiskFileWriterexecutorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);// backup file on startup if failover directory is empty.// 如果故障目录为空启动时立即执行立即备份文件// 初始化立即执行执行间隔10秒执行核心操作为DiskFileWriterexecutorService.schedule(new Runnable() {Overridepublic void run() {try {File cacheDir new File(failoverDir);if (!cacheDir.exists() !cacheDir.mkdirs()) {throw new IllegalStateException(failed to create cache dir: failoverDir);}File[] files cacheDir.listFiles();if (files null || files.length 0) {new DiskFileWriter().run();}} catch (Throwable e) {NAMING_LOGGER.error([NA] failed to backup file on startup., e);}}}, 10000L, TimeUnit.MILLISECONDS);
}这里我们先看DiskFileWriter这里的逻辑不难就是获取ServiceInfo中缓存的ServiceInfo判断是否满足写入磁盘如果条件满足就将其写入拼接的故障目录因为后两个定时任务执行的都是DiskFileWriter但是第三个定时任务是有前置判断的只要文件不存在就会立即执行把文件写入到本地磁盘中。
class DiskFileWriter extends TimerTask {Overridepublic void run() {MapString, ServiceInfo map serviceInfoHolder.getServiceInfoMap();for (Map.EntryString, ServiceInfo entry : map.entrySet()) {ServiceInfo serviceInfo entry.getValue();if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {continue;}// 将缓存写入磁盘DiskCache.write(serviceInfo, failoverDir);}}
}接下来我们再来看第一个定时任务SwitchRefresher的核心实现具体逻辑如下
1. 如果故障转移文件不存在则直接返回文件开关
2. 比较文件修改时间如果已经修改则获取故障转移文件中的内容。
3. 故障转移文件中存储了0和1标识。0表示关闭1表示开启。
4. 当为开启状态时执行线程FailoverFileReader。class SwitchRefresher implements Runnable {long lastModifiedMillis 0L;Overridepublic void run() {try {File switchFile new File(failoverDir UtilAndComs.FAILOVER_SWITCH);// 文件不存在则退出if (!switchFile.exists()) {switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());NAMING_LOGGER.debug(failover switch is not found, {}, switchFile.getName());return;}long modified switchFile.lastModified();if (lastModifiedMillis modified) {lastModifiedMillis modified;// 获取故障转移文件内容String failover ConcurrentDiskUtil.getFileContent(failoverDir UtilAndComs.FAILOVER_SWITCH,Charset.defaultCharset().toString());if (!StringUtils.isEmpty(failover)) {String[] lines failover.split(DiskCache.getLineSeparator());for (String line : lines) {String line1 line.trim();// 1 表示开启故障转移模式if (IS_FAILOVER_MODE.equals(line1)) {switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());NAMING_LOGGER.info(failover-mode is on);new FailoverFileReader().run();// 0 表示关闭故障转移模式} else if (NO_FAILOVER_MODE.equals(line1)) {switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());NAMING_LOGGER.info(failover-mode is off);}}} else {switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());}}} catch (Throwable e) {NAMING_LOGGER.error([NA] failed to read failover switch., e);}}
}FailoverFileReader
顾名思义故障转移文件读取基本操作就是读取failover目录存储的备份服务信息文件内容然后转换成ServiceInfo并且将所有的ServiceInfo储存在FailoverReactor的ServiceMap属性中。
流程如下
1. 读取failover目录下的所有文件进行遍历处理
2. 如果文件不存在跳过
3. 如果文件是故障转移开关标志文件跳过
4. 读取文件中的备份内容转换为ServiceInfo对象
5. 将ServiceInfo对象放入到domMap中
6. 最后判断domMap不为空赋值给serviceMapclass FailoverFileReader implements Runnable {Overridepublic void run() {MapString, ServiceInfo domMap new HashMapString, ServiceInfo(16);BufferedReader reader null;try {File cacheDir new File(failoverDir);if (!cacheDir.exists() !cacheDir.mkdirs()) {throw new IllegalStateException(failed to create cache dir: failoverDir);}File[] files cacheDir.listFiles();if (files null) {return;}for (File file : files) {if (!file.isFile()) {continue;}// 如果是故障转移标志文件则跳过if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {continue;}ServiceInfo dom new ServiceInfo(file.getName());try {String dataString ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString());reader new BufferedReader(new StringReader(dataString));String json;if ((json reader.readLine()) ! null) {try {dom JacksonUtils.toObj(json, ServiceInfo.class);} catch (Exception e) {NAMING_LOGGER.error([NA] error while parsing cached dom : {}, json, e);}}} catch (Exception e) {NAMING_LOGGER.error([NA] failed to read cache for dom: {}, file.getName(), e);} finally {try {if (reader ! null) {reader.close();}} catch (Exception e) {//ignore}}if (!CollectionUtils.isEmpty(dom.getHosts())) {domMap.put(dom.getKey(), dom);}}} catch (Exception e) {NAMING_LOGGER.error([NA] failed to read cache file, e);}// 读入缓存if (domMap.size() 0) {serviceMap domMap;}}
}但是这里还有一个问题就是serviceMap是哪里用到的这个其实是我们之前读取实例时候用到的getServiceInfo方法。
其实这里就是一旦开启故障转移就会先调用failoverReactor.getService方法此方法便是从serviceMap中获取ServiceInfo
public ServiceInfo getService(String key) {ServiceInfo serviceInfo serviceMap.get(key);if (serviceInfo null) {serviceInfo new ServiceInfo();serviceInfo.setName(key);}return serviceInfo;
}调用serviceMap方法getServiceInfo方法就在ServiceInfoHolder中
// ServiceInfoHolder
public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug(failover-mode: {}, failoverReactor.isFailoverSwitch());String groupedServiceName NamingUtils.getGroupedName(serviceName, groupName);String key ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}return serviceInfoMap.get(key);
}