wordpress做物流网站,seo研究所,重庆工程招投标交易信息网,客户管理系统网站模板下载前言
在 Spring Cloud 2020 版本以后#xff0c;移除了对 Netflix 的依赖#xff0c;也就移除了负载均衡器 Ribbon#xff0c;Spring Cloud 官方推荐使用 Loadbalancer 替换 Ribbon#xff0c;而在 LoadBalancer 之前 Spring Cloud 一直使用的是 Ribbon 来做负载[均衡器的…前言
在 Spring Cloud 2020 版本以后移除了对 Netflix 的依赖也就移除了负载均衡器 RibbonSpring Cloud 官方推荐使用 Loadbalancer 替换 Ribbon而在 LoadBalancer 之前 Spring Cloud 一直使用的是 Ribbon 来做负载[均衡器的而且 Ribbon 的负载均衡策略也比 Loadbalancer 更为丰富本篇分享一些关于 Ribbon 相关的源码。
Ribbon 的自动配置
老规矩我们在分析 Ribbon 的源码之前还是去看一下 spring-cloud-netflix-ribbon 的 META-INF 包下的 spring.factories 文件中的内容内容如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfigurationRibbon 的 META-INF 包下的 spring.factories 文件中的内容非常简单只有一个 RibbonAutoConfiguration 类我们来看下这个类都有什么逻辑。
RibbonAutoConfiguration 类源码分析
RibbonAutoConfiguration 是 Ribbon 的全局配置主要是加载 Ribbon 客户端工厂、负载均衡客户端、配置类工厂、重试机制工厂等RibbonAutoConfiguration 类在启动的时候就会被加载。
//标记为配置类
Configuration
//注入的条件
Conditional({RibbonAutoConfiguration.RibbonClassesConditions.class})
//为所有 RibbonClients 提供默认客户端配置
RibbonClients
//加载配置类后加载 EurekaClientAutoConfiguration
AutoConfigureAfter(name {org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration}
)
//加载配置类之前加载 LoadBalancerAutoConfiguration
AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
//使用 RibbonEagerLoadProperties ServerIntrospectorProperties
EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class})
public class RibbonAutoConfiguration {//注入 configurationsAutowired(required false)private ListRibbonClientSpecification configurations new ArrayList();//Ribbon 的饥饿加载模式配置类 Ribbon 在进行客户端负载均衡的时候并不是在服务启动时候就初始化好的 而是在调用的时候才会去创建相应的 client 开启饥饿模式可以提前加载 clientAutowiredprivate RibbonEagerLoadProperties ribbonEagerLoadProperties;public RibbonAutoConfiguration() {}Beanpublic HasFeatures ribbonFeature() {return HasFeatures.namedFeature(Ribbon, Ribbon.class);}//创建 Ribbon 客户端负载均衡器的工厂Beanpublic SpringClientFactory springClientFactory() {SpringClientFactory factory new SpringClientFactory();factory.setConfigurations(this.configurations);return factory;}//Ribbon 负载均衡器客户端 可以查询所有服务实例BeanConditionalOnMissingBean({LoadBalancerClient.class})public LoadBalancerClient loadBalancerClient() {return new RibbonLoadBalancerClient(this.springClientFactory());}//Ribbon 重试工厂BeanConditionalOnClass(name {org.springframework.retry.support.RetryTemplate})ConditionalOnMissingBeanpublic LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {return new RibbonLoadBalancedRetryFactory(clientFactory);}//获取配置的BeanConditionalOnMissingBeanpublic PropertiesFactory propertiesFactory() {return new PropertiesFactory();}//如果配置了饥饿加载 就初始化 RibbonApplicationContextInitializerBeanConditionalOnProperty({ribbon.eager-load.enabled})public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {return new RibbonApplicationContextInitializer(this.springClientFactory(), this.ribbonEagerLoadProperties.getClients());}static class RibbonClassesConditions extends AllNestedConditions {RibbonClassesConditions() {super(ConfigurationPhase.PARSE_CONFIGURATION);}ConditionalOnClass({Ribbon.class})static class RibbonPresent {RibbonPresent() {}}ConditionalOnClass({AsyncRestTemplate.class})static class AsyncRestTemplatePresent {AsyncRestTemplatePresent() {}}ConditionalOnClass({RestTemplate.class})static class RestTemplatePresent {RestTemplatePresent() {}}ConditionalOnClass({IClient.class})static class IClientPresent {IClientPresent() {}}}private static class OnRibbonRestClientCondition extends AnyNestedCondition {OnRibbonRestClientCondition() {super(ConfigurationPhase.REGISTER_BEAN);}ConditionalOnProperty({ribbon.restclient.enabled})static class RibbonProperty {RibbonProperty() {}}/** deprecated */DeprecatedConditionalOnProperty({ribbon.http.client.enabled})static class ZuulProperty {ZuulProperty() {}}}Target({ElementType.TYPE, ElementType.METHOD})Retention(RetentionPolicy.RUNTIME)DocumentedConditional({RibbonAutoConfiguration.OnRibbonRestClientCondition.class})interface ConditionalOnRibbonRestClient {}Configuration(proxyBeanMethods false)ConditionalOnClass({HttpRequest.class})RibbonAutoConfiguration.ConditionalOnRibbonRestClientprotected static class RibbonClientHttpRequestFactoryConfiguration {Autowiredprivate SpringClientFactory springClientFactory;protected RibbonClientHttpRequestFactoryConfiguration() {}Beanpublic RestTemplateCustomizer restTemplateCustomizer(final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {return (restTemplate) - {restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);};}Beanpublic RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {return new RibbonClientHttpRequestFactory(this.springClientFactory);}}
}
RibbonClientConfiguration 类源码分析
RibbonClientConfiguration 类在加载机制是在第一次执行 Feign 请求时候才会被加载RibbonClientConfiguration 加载时候会注入 Ribbon 客户端配置类、负载均衡算法、服务列表、负载均衡器、服务列表过滤器、负载均衡器上下文、重试处理器、服务拦截器、预处理器等。
Configuration(proxyBeanMethods false
)
EnableConfigurationProperties
Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {//默认连接超时时间 1 秒public static final int DEFAULT_CONNECT_TIMEOUT 1000;//默认读取超时时间 1 秒public static final int DEFAULT_READ_TIMEOUT 1000;//默认的 GZIP 负载public static final boolean DEFAULT_GZIP_PAYLOAD true;//Ribbon 客户端名称RibbonClientNameprivate String name client;//Ribbon 配置工厂Autowiredprivate PropertiesFactory propertiesFactory;public RibbonClientConfiguration() {}//Ribbon 客户端配置BeanConditionalOnMissingBeanpublic IClientConfig ribbonClientConfig() {//默认的配置类DefaultClientConfigImpl config new DefaultClientConfigImpl();//加载配置config.loadProperties(this.name);//设置超时时间config.set(CommonClientConfigKey.ConnectTimeout, 1000);//设置读取超时时间config.set(CommonClientConfigKey.ReadTimeout, 1000);//Gzip 负载config.set(CommonClientConfigKey.GZipPayload, true);return config;}//负载均衡算法BeanConditionalOnMissingBeanpublic IRule ribbonRule(IClientConfig config) {if (this.propertiesFactory.isSet(IRule.class, this.name)) {return (IRule)this.propertiesFactory.get(IRule.class, config, this.name);} else {//默认负载均衡算法(先过滤再轮训的负载均衡算法)ZoneAvoidanceRule rule new ZoneAvoidanceRule();rule.initWithNiwsConfig(config);return rule;}}//检测服务健康状态BeanConditionalOnMissingBeanpublic IPing ribbonPing(IClientConfig config) {return (IPing)(this.propertiesFactory.isSet(IPing.class, this.name) ? (IPing)this.propertiesFactory.get(IPing.class, config, this.name) : new DummyPing());}//服务列表BeanConditionalOnMissingBeanpublic ServerListServer ribbonServerList(IClientConfig config) {if (this.propertiesFactory.isSet(ServerList.class, this.name)) {return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.name);} else {ConfigurationBasedServerList serverList new ConfigurationBasedServerList();serverList.initWithNiwsConfig(config);return serverList;}}//服务更新列表BeanConditionalOnMissingBeanpublic ServerListUpdater ribbonServerListUpdater(IClientConfig config) {return new PollingServerListUpdater(config);}//负载均衡器 BeanConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerListServer serverList, ServerListFilterServer serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));}//Ribbon 服务列表过滤器BeanConditionalOnMissingBeanpublic ServerListFilterServer ribbonServerListFilter(IClientConfig config) {if (this.propertiesFactory.isSet(ServerListFilter.class, this.name)) {return (ServerListFilter)this.propertiesFactory.get(ServerListFilter.class, config, this.name);} else {ZonePreferenceServerListFilter filter new ZonePreferenceServerListFilter();filter.initWithNiwsConfig(config);return filter;}}//Ribbon 负载均衡器上下文BeanConditionalOnMissingBeanpublic RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler) {return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);}//重试处理器BeanConditionalOnMissingBeanpublic RetryHandler retryHandler(IClientConfig config) {return new DefaultLoadBalancerRetryHandler(config);}//服务拦截器BeanConditionalOnMissingBeanpublic ServerIntrospector serverIntrospector() {return new DefaultServerIntrospector();}//预处理器PostConstructpublic void preprocess() {RibbonUtils.setRibbonProperty(this.name, CommonClientConfigKey.DeploymentContextBasedVipAddresses.key(), this.name);}static class OverrideRestClient extends RestClient {private IClientConfig config;private ServerIntrospector serverIntrospector;protected OverrideRestClient(IClientConfig config, ServerIntrospector serverIntrospector) {this.config config;this.serverIntrospector serverIntrospector;this.initWithNiwsConfig(this.config);}public URI reconstructURIWithServer(Server server, URI original) {URI uri RibbonUtils.updateToSecureConnectionIfNeeded(original, this.config, this.serverIntrospector, server);return super.reconstructURIWithServer(server, uri);}protected Client apacheHttpClientSpecificInitialization() {ApacheHttpClient4 apache (ApacheHttpClient4)super.apacheHttpClientSpecificInitialization();apache.getClientHandler().getHttpClient().getParams().setParameter(http.protocol.cookie-policy, ignoreCookies);return apache;}}
}
Ribbion 负载均衡算法分析
Ribbon 作为一个负载均衡器其具备丰富的负载均衡策略这也是其相比 Spring Cloud LoadBalancer 的优势LoadBalancer 只提供了随机和轮训两种负载均衡算法下面我们来分析一下 Ribbon 复杂均衡算法IRule 是 Ribbon 负载均衡算法的接口。
IRule 接口源码分析
IRule 是 Ribbon 负载均衡算法的接口接口定义了三个方法如下
choose(Object var1)根据指定的算法中从服务列表中选取一个要访问的可用服务。setLoadBalancer(ILoadBalancer var1)设置负载均衡器。getLoadBalancer()获取负载均衡器。
package com.netflix.loadbalancer;public interface IRule {//根据指定的算法中从服务列表中选取一个要访问的可用服务Server choose(Object var1);//设置负载均衡器void setLoadBalancer(ILoadBalancer var1);//获取负载均衡器ILoadBalancer getLoadBalancer();
}
IRule 的实现类如下
RandomRule随机算法统计服务个数使用服务个数生成一个随机数下标通过生成的随机数返回一个可用的服务。RoundRobinRule轮训算法依次执行。WeightedResponseTimeRule响应时间权重轮询算法继承自RoundRobinRule是对轮询算法进行的扩展加入了权重计算。RetryRule重试算法实际调用的也是轮询算法只是在没有获取到服务时会进行循环重试操作超过设置的时间限制则会退出默认为 500ms。BestAvailableRule最小并发算法会遍历所有的服务提供者选择并发量最小的那个服务。AvailabilityFilteringRule可用性断言过滤器算法。ZoneAvoidanceRule 先过滤后轮训的算法也是默认的负载均衡算法。
RandomRule 随机算法源码分析
RandomRule#choose 是 Ribbom 负载均衡随机算法的实现主要逻辑如下
ILoadBalancer 为空判断如果 ILoadBalancer 为空则直接返回服务为 null。ILoadBalancer 不为空且 Server 也不为空则直接返回 Server 。ILoadBalancer 不为空且 Server 为空获取所有服务列表使用服务列表数量获取一个随机数使用这个随机数在可用服务列表中获取服务。如果上一步获取的服务为空则当前线程让出 CPU 资源再次重新循环获取出现这种情况可能是服务出现瞬时状态如果获取的服务不为空则判断服务是否还存活着服务存活直接返回服务负责则会让出 CPU 资源再次重新循环获取服务。
//com.netflix.loadbalancer.RandomRule#choose(com.netflix.loadbalancer.ILoadBalancer, java.lang.Object)
public Server choose(ILoadBalancer lb, Object key) {//负载均衡器为空判断if (lb null) {//负载均衡器为空 则直接返回空return null;} else {//服务Server server null;//开启while 循环while(server null) {//判断当前线程是否被中断if (Thread.interrupted()) {//如果当前线程被中断 直接返回 nullreturn null;}//获取所有可以访问的服务实例列表ListServer upList lb.getReachableServers();//获取所有的服务实例列表ListServer allList lb.getAllServers();int serverCount allList.size();//判断所有服务列表的个数是否为0if (serverCount 0) {//所有服务列表的个数为 0 返回 nullreturn null;}//使用所有服务列表来获取一个随机数int index this.chooseRandomInt(serverCount);//在可用服务列表中 获取随机数下标的服务server (Server)upList.get(index);//判断服务服务是否为 nulif (server null) {//如果服务为空 则当前线程让出 CPU 资源 再次重新循环获取 出现这种情况可能是服务出现瞬时状态Thread.yield();} else {//判断服务是否还存活着if (server.isAlive()) {//服务存活 直接返回服务return server;}//有服务 服务又没有存活 则会让出 CPU 资源 再继续循环获取server null;Thread.yield();}}//server 不为空 直接返回return server;}
}
RoundRobinRule 轮训算法源码分析
RoundRobinRule#choose 方法是 Ribbon 负载均衡轮训算法的实现具体逻辑如下
ILoadBalancer 为空判断如果 ILoadBalancer 为空则直接返回服务为 null。ILoadBalancer 不为空执行 while 循环获取 Server。判断 Server 是否为空且 count 是否小于10满足条件则判断所有服务数量和所有可用服务数量是否都不等于0如果都不等于0则使用轮训算法获取服务对服务为空及可用性判断后返回否则直接返回 null。轮训算法的核心就是 next (current 1) % modulo;也就是当前服务的请求总数1然后和服务总数取模得到下标也就是需要返回的服务对象。
//com.netflix.loadbalancer.RoundRobinRule#choose(com.netflix.loadbalancer.ILoadBalancer, java.lang.Object)
public Server choose(ILoadBalancer lb, Object key) {//判断负载均衡器if (lb null) {log.warn(no load balancer);//负载均衡器为空 返回 nullreturn null;} else {//负载均衡器不为空Server server null;//计数int count 0;//while 循环while(true) {//服务是否为空 count是否小于10if (server null count 10) {//满足条件 获取所有的可用服务列表ListServer reachableServers lb.getReachableServers();//获取所有的服务列表ListServer allServers lb.getAllServers();//可用服务列表数量int upCount reachableServers.size();//所有服务列表数量int serverCount allServers.size();//可用服务列表和所有服务列表是否都不等于0if (upCount ! 0 serverCount ! 0) {//获取服务索引 每次都会 int nextServerIndex this.incrementAndGetModulo(serverCount);//从所用服务列表中根据索引获取服务server (Server)allServers.get(nextServerIndex);//服务为空判断if (server null) {//服务为空 然后 cpu 资源 等待下一次循环Thread.yield();} else {//服务不为空 进行服务存活判断 和服务是否准备就绪if (server.isAlive() server.isReadyToServe()) {//满足条件 返回服务return server;}//否则给服务复制为 nullserver null;}//跳出循环continue;}log.warn(No up servers available from load balancer: lb);return null;}//跳出while 循环的条件if (count 10) {log.warn(No available alive servers after 10 tries from load balancer: lb);}return server;}}
}//com.netflix.loadbalancer.RoundRobinRule#incrementAndGetModulo
private int incrementAndGetModulo(int modulo) {//当前Server请求总数int current;//下一席int next;do {current this.nextServerCyclicCounter.get();//modulo 服务总数next (current 1) % modulo;} while(!this.nextServerCyclicCounter.compareAndSet(current, next));return next;
}
WeightedResponseTimeRule 类源码分析
WeightedResponseTimeRule 是 Ribbon 负载均衡响应时间权重的实现该类继承了 RoundRobinRule对轮询算法进行了扩展加入了权重计算可以为每个服务分配动态权重然后然后加权循环权重高的则会优先执行我们先来看看该来的构造方法。
//com.netflix.loadbalancer.ResponseTimeWeightedRule#ResponseTimeWeightedRule()
public ResponseTimeWeightedRule() {
}//com.netflix.loadbalancer.ResponseTimeWeightedRule#ResponseTimeWeightedRule()
public ResponseTimeWeightedRule(ILoadBalancer lb) {//调用了父类的构造方法 也就是 RoundRobinRule#RoundRobinRulesuper(lb);
}
WeightedResponseTimeRule 提供了一个无参构造方法和一个有参构造方法负载均衡需要使用 ILoadBalancer无参构造方法我们就不深究了我们重点看一下有参构造方法有参构造方法中调用了 super(lb)也就是调用了父类 RoundRobinRule 的构造方法我们来看一下 RoundRobinRule 的构造方法。 //com.netflix.loadbalancer.RoundRobinRule#RoundRobinRule(com.netflix.loadbalancer.ILoadBalancer)
public RoundRobinRule(ILoadBalancer lb) {this();//调用了 WeightedResponseTimeRule#setLoadBalancer 方法this.setLoadBalancer(lb);
}
RoundRobinRule#RoundRobinRule 方法中又调用了 this.setLoadBalancer(lb) 方法也就是调用了 WeightedResponseTimeRule#setLoadBalancer 方法我们接着往下看。
//com.netflix.loadbalancer.WeightedResponseTimeRule#setLoadBalancer
public void setLoadBalancer(ILoadBalancer lb) {//还是先调用了父类的 setLoadBalancer 方法 com.netflix.loadbalancer.AbstractLoadBalancerRule#setLoadBalancersuper.setLoadBalancer(lb);//判断 负载均衡器的类型 是否是 BaseLoadBalancerif (lb instanceof BaseLoadBalancer) {//是 获取负载均衡器的 namethis.name ((BaseLoadBalancer)lb).getName();}//调用 ResponseTimeWeightedRule#initialize 方法this.initialize(lb);
}
WeightedResponseTimeRule#setLoadBalancer 方法中先调用了 父类的 AbstractLoadBalancerRule#setLoadBalancer 方法RoundRobinRule 类继承了 AbstractLoadBalancerRule接着判断了 ILoadBalancer 的类型最后调用了 ResponseTimeWeightedRule#initialize 方法我们接着看。
ResponseTimeWeightedRule#initialize 方法源码分析
ResponseTimeWeightedRule#initialize 方法首先会判断服务权重计时器是否为空不为空则清除服务权重计时器然后创建服务权重计时器并立刻开启定时任务执行权重计算默认每30秒执行一次开启任务后也会先执行一次权重计算最后会在 JVM 关闭时候把权重计算清除。
//com.netflix.loadbalancer.ResponseTimeWeightedRule#initialize
void initialize(ILoadBalancer lb) {//判断服务权重计时器是否为空if (this.serverWeightTimer ! null) {//服务权重计数器不为空 则清除服务权重计时器this.serverWeightTimer.cancel();}//创建服务权重计时器this.serverWeightTimer new Timer(NFLoadBalancer-serverWeightTimer- this.name, true);//执行定时任务 0秒后执行任务 定时执行 间隔时间默认30秒 private int serverWeightTaskTimerInterval 30000;this.serverWeightTimer.schedule(new ResponseTimeWeightedRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval);//创建服务权重ResponseTimeWeightedRule.ServerWeight sw new ResponseTimeWeightedRule.ServerWeight();//计算权重sw.maintainWeights();//jvm关闭时 通知定时任务 权重清空Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {public void run() {ResponseTimeWeightedRule.logger.info(Stopping NFLoadBalancer-serverWeightTimer-{}, ResponseTimeWeightedRule.this.name);ResponseTimeWeightedRule.this.serverWeightTimer.cancel();}}));
}
DynamicServerWeightTask 源码分析
DynamicServerWeightTask 是执行服务权重分配的任务类该类的主要逻辑在于调用了 ServerWeight#maintainWeights 方法我们重点关注该方法即可。
class DynamicServerWeightTask extends TimerTask {DynamicServerWeightTask() {}//定时任务的 run 方法public void run() {//创建一个服务权重ResponseTimeWeightedRule.ServerWeight serverWeight ResponseTimeWeightedRule.this.new ServerWeight();try {//调用权重计算方法 ServerWeight#maintainWeightsserverWeight.maintainWeights();} catch (Exception var3) {ResponseTimeWeightedRule.logger.error(Error running DynamicServerWeightTask for {}, ResponseTimeWeightedRule.this.name, var3);}}
}
ServerWeight#maintainWeights 方法源码分析
ServerWeight#maintainWeights 方法主要是用来维持服务权重的主要逻辑如下
获取负载均衡器负载均衡器为空判断如果负载均衡器为空不予处理。使用 CAS 设置当前服务是否正在进行权重分配中设置成功继续执行业务逻辑否则不予处理。对负载均衡器统计数据为空判断不为空继续执行业务逻辑为空则不予处理。循环遍历所有服务得到总响应时间总响应时间等于每个服务的平均响应时间求和。循环服务列表设置服务的权重服务权重总响应时间-服务的平均响应时间这样服务的响应时间越长其服务权重则越低。
public void maintainWeights() {//获取负载均衡器ILoadBalancer lb ResponseTimeWeightedRule.this.getLoadBalancer();//负载均衡器为空判断if (lb ! null) {//负载均衡器不为空//使用 CAS 设置当前服务正在进行权重分配中if (ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {try {ResponseTimeWeightedRule.logger.info(Weight adjusting job started);//强转之后获取负载均衡器的统计数据AbstractLoadBalancer nlb (AbstractLoadBalancer)lb;LoadBalancerStats stats nlb.getLoadBalancerStats();//负载均衡器的统计数据为空判断if (stats ! null) {//不为空 初始化响应时间double totalResponseTime 0.0D;//服务统计数据ServerStats ss;//迭代遍历所有服务 总响应时间 totalResponseTime 等于每个服务的平均响应时间求和 totalResponseTime ss.getResponseTimeAvg()for(Iterator var6 nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime ss.getResponseTimeAvg()) {//得到服务Server server (Server)var6.next();//获取单个服务统计信息ss stats.getSingleServerStat(server);}//当前权重Double weightSoFar 0.0D;//权重列表ListDouble finalWeights new ArrayList();//获取所有服务Iterator var20 nlb.getAllServers().iterator();//迭代遍历所有服务while(var20.hasNext()) {//获取服务Server serverx (Server)var20.next();//获取服务的统计信息ServerStats ssx stats.getSingleServerStat(serverx);//服务权重总响应时间-服务的平均响应时间 这样服务的响应时间越长 其服务权重则越低double weight totalResponseTime - ssx.getResponseTimeAvg();weightSoFar weightSoFar weight;//设置权重finalWeights.add(weightSoFar);}//设置权重列表ResponseTimeWeightedRule.this.setWeights(finalWeights);return;}} catch (Exception var16) {ResponseTimeWeightedRule.logger.error(Error calculating server weights, var16);return;} finally {ResponseTimeWeightedRule.this.serverWeightAssignmentInProgress.set(false);}}}
}
WeightedResponseTimeRule#choose 方法源码分析
WeightedResponseTimeRule#choose 方法是响应时间权重轮训负载均衡规则的实现具体逻辑如下
ILoadBalancer 为空判断如果 ILoadBalancer 为空则直接返回服务为 null。ILoadBalancer 不为空执行 while 循环获取 Server。判断当前线程是否被中断如果被中断则直接返回 null。获取所有的服务列表如果服务为空则直接返回 null。找出最大的权重。判断是否有进行权重初始化如果没有进行权重初始化则调用父类 RoundRobinRule 的随机算法得到 server如果进行了权重初始化则使用最大权重生成一个大于或等于 0.0 且小于 1.0 的随机浮点数也就是权重然后遍历权重列表从权重列表中找到大于计算出来的权重的索引根据得到的索引去服务列表中获取服务。
WeightedResponseTimeRule 响应时间权重轮训规则的实现方式就是使用定时任务去计算权重定时任务中会判断服务的平均响应时间平均响应时间越大权重越小在进行负载均衡时会使用权重成一个随机数然后循环服务列表找到权重大于这个随机数服务。
//com.netflix.loadbalancer.WeightedResponseTimeRule#choose
public Server choose(ILoadBalancer lb, Object key) {//负载均衡器为空判断if (lb null) {//负载均衡器为空 直接返回 nullreturn null;} else {//定义 ServeriServer server null;//while 循环while(server null) {//获取服务权重集合ListDouble currentWeights this.accumulatedWeights;//当前线程是否被中断if (Thread.interrupted()) {//当前线程被中断 直接返回 nullreturn null;}//获取所有服务列表ListServer allList lb.getAllServers();//服务列表的 size 也就是服务的个数int serverCount allList.size();//服务个数为0 判断if (serverCount 0) {//服务个数为0 直接返回 nullreturn null;}//服务索引int serverIndex 0;//找出最大的权重double maxTotalWeight currentWeights.size() 0 ? 0.0D : (Double)currentWeights.get(currentWeights.size() - 1);//最大权重小于 0.001 且 服务数等于权重数 if (!(maxTotalWeight 0.001D) serverCount currentWeights.size()) {//最大权重小于 0.001 且 服务数等于权重数 取非 表示采用权重轮训算法//返回一个大于或等于 0.0 且小于 1.0 的随机浮点数 也就是权重double randomWeight this.random.nextDouble() * maxTotalWeight;int n 0;//迭代遍历权重列表for(Iterator var13 currentWeights.iterator(); var13.hasNext(); n) {Double d (Double)var13.next();//权重列表中的权重大于计算出来的权重 也就找到了 server 索引下标if (d randomWeight) {serverIndex n;break;}}//根据索引去服务列表中获取服务server (Server)allList.get(serverIndex);} else {//表示没有初始化权重 使用父类 RoundRobinRule 的随机算法 得到serverserver super.choose(this.getLoadBalancer(), key);//server 为空判断if (server null) {//不为空直接返回serverreturn server;}}//server 为空判断if (server null) {//server 为空 线程休眠Thread.yield();} else {//服务是否是活跃的if (server.isAlive()) {//是返回 server return server;}//否则给server 赋值为 nullserver null;}}//server 不为空 直接返回serverreturn server;}
}
RetryRule#choose 方法源码分析
RetryRule#choose 方法是重试负载均衡规则的实现具体逻辑如下
获取当前请求时间计算出限制时间允许请求的最大时间。使用轮训负载负载均衡规则获取一个服务。如果获取到的服务不为空 || 服务不是活跃的 当前时间小于限制时间则创建一个中断线程的任务中断时间为 deadline - System.currentTimeMillis()。开启 while 循环使用轮训的负载均衡规则获取服务如果服务不为空是活跃的或者当前时间大于限制时间则结束循环返回 Server。
可以看出 RetryRule 实际调用的是轮询算法在没有获取到服务时会进行循环重试操作超过设置的时间限制默认为500 毫秒则会退出。
//com.netflix.loadbalancer.RetryRule#choose(com.netflix.loadbalancer.ILoadBalancer, java.lang.Object)
public Server choose(ILoadBalancer lb, Object key) {//获取当前请求时间long requestTime System.currentTimeMillis();//限制时间允许请求的最大时间当前时间500毫秒 long maxRetryMillis 500L;long deadline requestTime this.maxRetryMillis;//应答的 serverServer answer null;//IRule subRule new RoundRobinRule();//使用轮询算法获取一个服务answer this.subRule.choose(key);if ((answer null || !answer.isAlive()) System.currentTimeMillis() deadline) {//服务不为空//服务不是活跃的//当前时间小于限制时间//创建一个中断线程的任务 deadline - System.currentTimeMillis() 后打断线程InterruptTask task new InterruptTask(deadline - System.currentTimeMillis());//开启 while 循环 只要当前线程没有被中断 就一直循环执行while(!Thread.interrupted()) {//使用轮询算法获取一个服务answer this.subRule.choose(key);if (answer ! null answer.isAlive() || System.currentTimeMillis() deadline) {//服务不为空 服务是活跃的 || 当前时间已经大于限制时间 则退出 while 循环break;}//线程让出 cpu 资源Thread.yield();}//while 循环结束 清除任务task.cancel();}//返回服务return answer ! null answer.isAlive() ? answer : null;
}
BestAvailableRule#choose 方法源码分析
BestAvailableRule 继承了ClientConfigEnabledRoundRobinRule主要逻辑如下
判断负载均衡器统计信息是否为空如果为空则调用父类的负载均衡算法也就是轮询算法规则ClientConfigEnabledRoundRobinRule#choose。负载均衡器统计信息不为空获取所有服务判断当前时间服务断路器是否打开短路打开不做处理断路器如果没有打开则判断并发连接数是否小于最小并发连接数小于则完成赋值返回 Server。会遍历所有的服务提供者选择并发量最小的那个服务这个算法可以将请求转发到压力最小的服务器但是如果副本数太多每次都要循环计算出最小并发还是比较好资源的。最后会再次对 Server 为空进行判断如果 Server 为空则调用父类的负载均衡算法也就是轮询算法规则ClientConfigEnabledRoundRobinRule#choose。
//com.netflix.loadbalancer.BestAvailableRule#choose
public Server choose(Object key) {//负载均衡器统计信息为空判断if (this.loadBalancerStats null) {//如果统计信息为空 调用父类的负载均衡规则 也就是轮询算法规则 ClientConfigEnabledRoundRobinRule#choosereturn super.choose(key);} else {//不为空 获取所有的 serverListServer serverList this.getLoadBalancer().getAllServers();//最小并发连接数int minimalConcurrentConnections 2147483647;//当前时间long currentTime System.currentTimeMillis();Server chosen null;//迭代遍历 服务列表Iterator var7 serverList.iterator();while(var7.hasNext()) {Server server (Server)var7.next();//获取服务统计信息ServerStats serverStats this.loadBalancerStats.getSingleServerStat(server);//服务断路器是否打开if (!serverStats.isCircuitBreakerTripped(currentTime)) {//服务断路器没有打开//获取并发连接数int concurrentConnections serverStats.getActiveRequestsCount(currentTime);//判断并发连接数是否小于最小并发连接数if (concurrentConnections minimalConcurrentConnections) {//小于 则把当前并发连接数赋值给 最小并发连接数minimalConcurrentConnections concurrentConnections;//赋值 serverchosen server;}}}//server 为 null 判断if (chosen null) {//调用父类的负载均衡算法 其实就是轮询的算法 ClientConfigEnabledRoundRobinRule#choosereturn super.choose(key);} else {return chosen;}}
}//com.netflix.loadbalancer.ClientConfigEnabledRoundRobinRule#choose
public Server choose(Object key) {//轮训规则为空判断if (this.roundRobinRule ! null) {//轮训规则不为空 执行轮训规则算法return this.roundRobinRule.choose(key);} else {throw new IllegalArgumentException(This class has not been initialized with the RoundRobinRule class);}
}AvailabilityFilteringRule#choose 方法源码分析
可用性过滤规则 AvailabilityFilteringRule 继承自 PredicateBasedRule它和其他规则不一样它将服务委托给 AbstractServerPredicate 过滤掉一些连接失败且高并发的 Server。
//com.netflix.loadbalancer.AvailabilityFilteringRule#choose
public Server choose(Object key) {int count 0;//RoundRobinRule roundRobinRule new RoundRobinRule();//循环使用轮训算法获取服务 最大循环10次for(Server server this.roundRobinRule.choose(key); count 10; server this.roundRobinRule.choose(key)) {//断言规则判断 sever 是否符合规则 符合规则返回 不符合规则重新选择if (this.predicate.apply(new PredicateKey(server))) {return server;}}//如果循环了10次还没有找到 server 就调用父类的方法 PredicateBasedRule#choosereturn super.choose(key);
}//com.netflix.loadbalancer.PredicateBasedRule#choose
public Server choose(Object key) {//获取负载均衡器ILoadBalancer lb this.getLoadBalancer();//从断言中循环过滤后返回 serverOptionalServer server this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);//返回 serverreturn server.isPresent() ? (Server)server.get() : null;
}ZoneAvoidanceRule 源码分析
ZoneAvoidanceRule 继承了 PredicateBasedRule 类ZoneAvoidanceRule 的构造方法中传入了 ZoneAvoidancePredicate 和 AvailabilityPredicateZoneAvoidancePredicate 判断一个 Zone 的运行性能是否可用剔除不可用的 Zone ServerAvailabilityPredicate 用于过滤掉连接数过多的 Server该规则会先过滤然后使用轮询算法选出可用的服务Ribbon 默认使用的也是这个负载均衡策略。
//com.netflix.loadbalancer.ZoneAvoidanceRule#ZoneAvoidanceRule
public ZoneAvoidanceRule() {//区域断言ZoneAvoidancePredicate zonePredicate new ZoneAvoidancePredicate(this);//可用性断言AvailabilityPredicate availabilityPredicate new AvailabilityPredicate(this);this.compositePredicate this.createCompositePredicate(zonePredicate, availabilityPredicate);
}//com.netflix.loadbalancer.PredicateBasedRule#choose
public Server choose(Object key) {//获取负载均衡器ILoadBalancer lb this.getLoadBalancer();//从断言中循环过滤后返回 serverOptionalServer server this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);//返回 serverreturn server.isPresent() ? (Server)server.get() : null;
}如有不正确的地方请各位指出纠正。