上图,刚开始 offset 指向了 bucket[1]
,经过了两个 span
之后,bucket[2]
和 bucket[3]
会被清空,同时,新的 offset 会指向 bucket[3]
,新添加的数据会写入到 bucket[3]
。
再来看看数据统计,也就是窗口内的有效数据量是多少。
// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
rw.lock.RLock()
defer rw.lock.RUnlock()
var diff int
span := rw.span()
// ignore current bucket, because of partial data
if span == 0 && rw.ignoreCurrent {
diff = rw.size - 1
} else {
diff = rw.size - span
}
// 需要统计的 bucket 数量,窗口大小减去 span 数量
if diff > 0 {
// 获取统计的起始位置,span 是已经被重置的 bucket
offset := (rw.offset + span + 1) % rw.size
rw.win.reduce(offset, diff, fn)
}
}
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
for i := 0; i < count; i++ {
// 自定义统计函数
fn(w.buckets[(start+i)%w.size])
}
}
统计出窗口数据之后,就可以判断是否需要熔断了。
执行熔断
接下来就是执行熔断了,主要就是看看自适应熔断是如何实现的。
// core/breaker/googlebreaker.go
const (
// 250ms for bucket duration
window = time.Second * 10
buckets = 40
k = 1.5
protection = 5
)
窗口的定义部分,整个窗口是 10s,然后分成 40 个 bucket,每个 bucket 就是 250ms。
// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
k float64
stat *collection.RollingWindow
proba *mathx.Proba
}
func (b *googleBreaker) accept() error {
// 获取最近一段时间的统计数据
accepts, total := b.history()
// 根据上文提到的算法来计算一个概率
weightedAccepts := b.k * float64(accepts)
// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
// 如果小于等于 0 直接通过,不熔断
if dropRatio <= 0 {
return nil
}
// 随机产生 0.0-1.0 之间的随机数与上面计算出来的熔断概率相比较
// 如果随机数比熔断概率小则进行熔断
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
func (b *googleBreaker) history() (accepts, total int64) {
b.stat.Reduce(func(b *collection.Bucket) {
accepts += int64(b.Sum)
total += b.Count
})
return
}
以上就是自适应熔断的逻辑,通过概率的比较来随机淘汰掉部分请求,然后随着服务恢复,淘汰的请求会逐渐变少,直至不淘汰。
func (b *googleBreaker) allow() (internalPromise, error) {
if err := b.accept(); err != nil {
return nil, err
}
// 返回一个 promise 异步回调对象,可由开发者自行决定是否上报结果到熔断器
return googlePromise{
b: b,
}, nil
}
// req - 熔断对象方法
// fallback - 自定义快速失败函数,可对熔断产生的err进行包装后返回
// acceptable - 对本次未熔断时执行请求的结果进行自定义的判定,比如可以针对http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err := b.accept(); err != nil {
// 熔断中,如果有自定义的fallback则执行
if fallback != nil {
return fallback(err)
}
return err
}
defer func() {
// 如果执行req()过程发生了panic,依然判定本次执行失败上报至熔断器
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
err := req()
// 上报结果
if acceptable(err) {
b.markSuccess()
} else {
b.markFailure()
}
return err
}
熔断器对外暴露两种类型的方法:
1、简单场景直接判断对象是否被熔断,执行请求后必须需手动上报执行结果至熔断器。
func (b *googleBreaker) allow() (internalPromise, error)
2、复杂场景下支持自定义快速失败,自定义判定请求是否成功的熔断方法,自动上报执行结果至熔断器。
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
个人感觉,熔断这部分代码,相较于前几篇文章,理解起来是更困难的。但其中的一些设计思想,和底层的实现原理也是非常值得学习的,希望这篇文章能够对大家有帮助。
以上就是本文的全部内容,如果觉得还不错的话欢迎点赞,转发和关注,感谢支持。
参考文章: