会用wordpress建站,河南今天发生的重大新闻,铜川网站建设哪家好,高端女装有哪些品牌文章目录 原因限流对象限流后的做法怎么确定限流阈值观测业务性能数据压测借鉴链路上的其他服务手动计算 四种静态限流算法令牌桶漏桶固定窗口与滑动窗口 手写限流算法令牌桶漏桶固定窗口滑动窗口 分布式限流的具体实现 原因
尽管云原生网关里有统一入口的限流#xff08;根据… 文章目录 原因限流对象限流后的做法怎么确定限流阈值观测业务性能数据压测借鉴链路上的其他服务手动计算 四种静态限流算法令牌桶漏桶固定窗口与滑动窗口 手写限流算法令牌桶漏桶固定窗口滑动窗口 分布式限流的具体实现 原因
尽管云原生网关里有统一入口的限流根据ip、userID来控制但是有的微服务需要有自己的限流策略比如根据不同的算法任务、不同的子产品来限制所以封装了一个限流器公共包可以在多个微服务中复用这个功能。直接原因是有一次某个子功能流量激增大量任务失败。
关键步骤
实现限流策略例如基于令牌桶或漏桶配置和初始化微服务启动时加载配置和初始化限流器
限流对象
针对ip限流例如1s限制50个请求考虑到公共ip的情况 针对某个算法任务不限制vip用户普通用户1s限制创建10个创建任务的请求。
限流后的做法 目前的做法是限流了就直接拒绝抛出错误提示还有其他的做法:
同步阻塞等待一段时间。如果是偶发性地触发了限流那么稍微阻塞等待一会儿后面就有极大的概率能得到处理。比如说限流设置为一秒钟 100 个请求恰好来了 101 个请求。多出来的一个请求只需要等一秒钟下一秒钟就会被处理。但是要注意控制住超时也就是说你不能让人无限期地等待下去。同步转异步。这里我们又一次看到了这个手段它是指如果一个请求没被限流那就直接同步处理而如果被限流了那么这个请求就会被存储起来等到业务低峰期的时候再处理。这个其实跟降级差不多。TODO 研究到降级时再过来看一下调整负载均衡算法用redis的话似乎跟负载均衡没关系如果是在网关里做限流是可以调整负载均衡器的。如果某个请求被限流了那么就相当于告诉负载均衡器应该尽可能少给这个节点发送请求。我在熔断里面给你讲过类似的方案。不过在熔断里面是负载均衡器后续不再发请求而在限流这里还是会发送请求只是会降低转发请求到该节点的概率。调整节点的权重就能达成这种效果。
怎么确定限流阈值
观测业务性能数据
我们公司有完善的监控所以我可以通过观测到的性能数据来确定阈值。比如说观察线上的数据如果在业务高峰期整个集群的 QPS 都没超过 1000那么就可以考虑将阈值设定在 1200多出来的 200 就是余量。 不过这种方式有一个要求就是服务必须先上线有了线上的观测数据才能确定阈值。并且整个阈值很有可能是偏低的。因为业务巅峰并不意味着是集群性能的瓶颈。如果集群本身可以承受每秒 3000 个请求但是因为业务量不够每秒只有 1000 个请求那么我这里预估出来的阈值是显著低于集群真实瓶颈 QPS 的。
压测
不过我个人觉得最好的方式应该是在线上执行全链路压测测试出瓶颈。即便不能做全链路压测也可以考虑模拟线上环境进行压测再差也应该在测试环境做一个压力测试。
借鉴链路上的其他服务
不过如果真的做不了或者来不及或者没资源那么还可以考虑参考类似服务的阈值。比如说如果 A、B 服务是紧密相关的也就是通常调用了 A 服务就会调用 B 服务那么可以用 A 已经确定的阈值作为 B 的阈值。又或者 A 服务到 B 服务之间有一个转化关系。比如说创建订单到支付会有一个转化率假如说是 90%如果创建订单的接口阈值是 100那么支付的接口就可以设置为 90。
手动计算
实在没办法了就只能手动计算了。也就是沿着整条调用链路统计出现了多少次数据库查询、多少次微服务调用、多少次第三方中间件访问如 RedisKafka 等。举一个最简单的例子假如说一个非常简单的服务整个链路只有一次数据库查询这是一个会回表的数据库查询根据公司的平均数据这一次查询会耗时 10ms那么再增加 10 ms 作为 CPU 计算耗时。也就是说这一个接口预期的响应时间是 20ms。如果一个实例是 4 核那么就可以简单用 1000ms÷20ms×4200 得到阈值。
四种静态限流算法
令牌桶
系统会以一个恒定的速率产生令牌这些令牌会放到一个桶里面每个请求只有拿到了令牌才会被执行。每当一个请求过来的时候就需要尝试从桶里面拿一个令牌。如果拿到了令牌那么请求就会被处理如果没有拿到那么这个请求就被限流了。当令牌桶已满时新生成的令牌会被丢弃不会增加桶中的令牌数量。 你需要注意本身令牌桶是可以积攒一定数量的令牌的。比如说桶的容量是 100也就是这里面最多积攒 100 个令牌。那么当某一时刻突然来了 100 个请求它们都能拿到令牌。
漏桶
漏桶是指当请求以不均匀的速度到达服务器之后限流器会以固定的速率转交给业务逻辑。 某种程度上你可以将漏桶算法看作是令牌桶算法的一种特殊形态。你将令牌桶中桶的容量设想为 0就是漏桶了。 所以你可以看到在漏桶里面令牌产生之后你就需要取走没取走的话也不会积攒下来。因此漏桶是绝对均匀的而令牌桶不是绝对均匀的。
固定窗口与滑动窗口
固定窗口是指在一个固定时间段只允许执行固定数量的请求。比如说在一秒钟之内只能执行 100 个请求。滑动窗口类似于固定窗口也是指在一个固定时间段内只允许执行固定数量的请求。区别就在于滑动窗口是平滑地挪动窗口而不像固定窗口那样突然地挪动窗口。假设窗口大小是一分钟。此时时间是 t1那么窗口的起始位置是 t1-1 分钟。过了 2 秒之后窗口大小依旧是 1 分钟但是窗口的起始位置也向后挪动了 2 秒变成了 t1 - 1 分钟 2 秒。这也就是滑动的含义。
手写限流算法
参考 https://blog.csdn.net/z3551906947/article/details/140477024并且里面阐述了各个算法的优缺点漏桶是可以用来处理突发流量的。
令牌桶
package limiterimport (synctime
)// TokenBucketLimiter 令牌桶限流器
type TokenBucketLimiter struct {capacity int // 容量currentTokens int // 令牌数量rate int // 发放令牌速率/秒lastTime time.Time // 上次发放令牌时间mutex sync.Mutex // 避免并发问题
}// NewTokenBucketLimiter 创建一个新的令牌桶限流器实例。
func NewTokenBucketLimiter(capacity, rate int) *TokenBucketLimiter {return TokenBucketLimiter{capacity: capacity,rate: rate,lastTime: time.Now(),currentTokens: 0, // 初始化时桶中没有令牌}
}// TryAcquire 尝试从令牌桶中获取一个令牌。
func (l *TokenBucketLimiter) TryAcquire() bool {l.mutex.Lock()defer l.mutex.Unlock()now : time.Now()interval : now.Sub(l.lastTime) // 计算时间间隔// 如果距离上次发放令牌超过 1/rate 秒则发放新的令牌if float64(interval) float64(time.Second)/float64(l.rate) {// 计算应该发放的令牌数量但不超过桶的容量newTokens : int(float64(interval)/float64(time.Second)* l.rate) l.currentTokens minInt(l.capacity, l.currentTokensnewTokens)// 更新上次发放令牌的时间l.lastTime now}// 如果桶中没有令牌则请求失败if l.currentTokens 0 {return false}// 桶中有令牌消费一个令牌l.currentTokens--return true
}// minInt 返回两个整数中的较小值。
func minInt(a, b int) int {if a b {return a}return b
}func TestName(t *testing.T) {tokenBucket : NewTokenBucketLimiter(5, 10)for i : 0; i 10; i {fmt.Println(tokenBucket.TryAcquire())}time.Sleep(100 * time.Millisecond)fmt.Println(tokenBucket.TryAcquire())
}漏桶
package limiterimport (fmtmathsynctestingtime
)// LeakyBucketLimiter 漏桶限流器
type LeakyBucketLimiter struct {capacity int // 桶容量currentLevel int // 当前水位rate int // 水流速度/秒lastTime time.Time // 上次放水时间mutex sync.Mutex // 避免并发问题
}// NewLeakyBucketLimiter 初始化漏桶限流器
func NewLeakyBucketLimiter(capacity, rate int) *LeakyBucketLimiter {return LeakyBucketLimiter{capacity: capacity,currentLevel: 0, // 初始化时水位为0rate: rate,lastTime: time.Now(),}
}// TryAcquire 尝试获取处理请求的权限
func (l *LeakyBucketLimiter) TryAcquire() bool {l.mutex.Lock() // 直接获取写锁defer l.mutex.Unlock()// 如果上次放水时间距今不到 1/rate 秒不需要放水now : time.Now()interval : now.Sub(l.lastTime)// 计算放水后的水位if float64(interval) float64(time.Second)/float64(l.rate) {l.currentLevel int(math.Max(0, float64(l.currentLevel)-float64(interval)/float64(time.Second)*float64(l.rate)))l.lastTime now}// 尝试增加水位if l.currentLevel l.capacity {l.currentLevelreturn true}return false
}func TestName(t *testing.T) {tokenBucket : NewLeakyBucketLimiter(5, 10)for i : 0; i 10; i {fmt.Println(tokenBucket.TryAcquire())}time.Sleep(100 * time.Millisecond)fmt.Println(tokenBucket.TryAcquire())
}固定窗口
package mainimport (synctime
)// FixedWindowRateLimiter 定义固定窗口限流器
type FixedWindowRateLimiter struct {mu sync.MutexmaxRequests intrequestCount intwindow time.Time // 窗口的起始点每个窗口长度1s
}// NewFixedWindowRateLimiter 创建一个新的固定窗口限流器实例
func NewFixedWindowRateLimiter(maxRequests int) *FixedWindowRateLimiter {return FixedWindowRateLimiter{maxRequests: maxRequests,window: time.Now().Truncate(time.Second),}
}// TryAcquire 尝试获取请求许可
func (f *FixedWindowRateLimiter) TryAcquire() bool {f.mu.Lock()defer f.mu.Unlock()// 检查是否需要重置窗口if time.Now().After(f.window.Add(time.Second)) {f.requestCount 0f.window time.Now().Truncate(time.Second)}// 检查是否达到最大请求次数if f.requestCount f.maxRequests {return false}// 请求成功递增计数器f.requestCountreturn true
}func main() {limiter : NewFixedWindowRateLimiter(5)for i : 0; i 10; i {if limiter.TryAcquire() {fmt.Println(请求通过)} else {fmt.Println(请求被拒绝)}time.Sleep(100 * time.Millisecond)}
}滑动窗口
package mainimport (synctime
)// SlidingWindowRateLimiter 定义滑动窗口限流器
type SlidingWindowRateLimiter struct {mu sync.MutexmaxRequests intwindowSize time.Duration // 窗口长度windows []intwindowIndex intcurrentTime time.Time // 上个滑窗的起始点
}// NewSlidingWindowRateLimiter 创建一个新的滑动窗口限流器实例
func NewSlidingWindowRateLimiter(maxRequests int, windowSize time.Duration) *SlidingWindowRateLimiter {numWindows : int(windowSize.Seconds())return SlidingWindowRateLimiter{maxRequests: maxRequests,windowSize: windowSize,windows: make([]int, numWindows),currentTime: time.Now().Truncate(time.Second),windowIndex: 0,}
}// TryAcquire 尝试获取请求许可
func (s *SlidingWindowRateLimiter) TryAcquire() bool {s.mu.Lock()defer s.mu.Unlock()// 更新当前时间currentTime : time.Now().Truncate(time.Second)// 检查是否需要更新窗口if currentTime.After(s.currentTime.Add(s.windowSize)) {s.currentTime currentTimes.windowIndex 0} else if currentTime.After(s.currentTime.Add(time.Second)) {s.windowIndex (s.windowIndex 1) % len(s.windows)}// 清除过期窗口for i : range s.windows {if currentTime.Before(s.currentTime.Add(time.Duration(i1)*time.Second)) {break}s.windows[i] 0}// 检查是否达到最大请求次数totalRequests : 0for _, count : range s.windows {totalRequests count}if totalRequests s.maxRequests {return false}// 请求成功递增计数器s.windows[s.windowIndex]return true
}func main() {limiter : NewSlidingWindowRateLimiter(5, 10*time.Second)for i : 0; i 10; i {if limiter.TryAcquire() {fmt.Println(请求通过)} else {fmt.Println(请求被拒绝)}time.Sleep(100 * time.Millisecond)}
}分布式限流的具体实现
从单机或者集群的角度看可以分为单机限流或者集群限流。集群限流一般需要借助 Redis 之类的中间件来记录流量和阈值。换句话说就是你需要用 Redis 等工具来实现前面提到的限流算法。当然如果是利用网关来实现集群限流那么可以摆脱 Redis。