设为首页 加入收藏

TOP

微服务架构|go-zero 的自适应熔断器(四)
2023-09-09 10:25:25 】 浏览:333
Tags:go-zero 应熔断
上图,刚开始 offset 指向了 bucket[1],经过了两个 span 之后,bucket[2]bucket[3] 会被清空,同时,新的 offset 会指向 bucket[3],新添加的数据会写入到 bucket[3]

再来看看数据统计,也就是窗口内的有效数据量是多少。

// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
    rw.lock.RLock()
    defer rw.lock.RUnlock()

    var diff int
    span := rw.span()
    // ignore current bucket, because of partial data
    if span == 0 && rw.ignoreCurrent {
        diff = rw.size - 1
    } else {
        diff = rw.size - span
    }
    // 需要统计的 bucket 数量,窗口大小减去 span 数量
    if diff > 0 {
        // 获取统计的起始位置,span 是已经被重置的 bucket
        offset := (rw.offset + span + 1) % rw.size
        rw.win.reduce(offset, diff, fn)
    }
}

func (w *window) reduce(start, count int, fn func(b *Bucket)) {
    for i := 0; i < count; i++ {
        // 自定义统计函数
        fn(w.buckets[(start+i)%w.size])
    }
}

统计出窗口数据之后,就可以判断是否需要熔断了。

执行熔断

接下来就是执行熔断了,主要就是看看自适应熔断是如何实现的。

// core/breaker/googlebreaker.go

const (
    // 250ms for bucket duration
    window     = time.Second * 10
    buckets    = 40
    k          = 1.5
    protection = 5
)

窗口的定义部分,整个窗口是 10s,然后分成 40 个 bucket,每个 bucket 就是 250ms。

// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
    k     float64
    stat  *collection.RollingWindow
    proba *mathx.Proba
}

func (b *googleBreaker) accept() error {
    // 获取最近一段时间的统计数据
    accepts, total := b.history()
    // 根据上文提到的算法来计算一个概率
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    // 如果小于等于 0 直接通过,不熔断
    if dropRatio <= 0 {
        return nil
    }

    // 随机产生 0.0-1.0 之间的随机数与上面计算出来的熔断概率相比较
    // 如果随机数比熔断概率小则进行熔断
    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}

func (b *googleBreaker) history() (accepts, total int64) {
    b.stat.Reduce(func(b *collection.Bucket) {
        accepts += int64(b.Sum)
        total += b.Count
    })

    return
}

以上就是自适应熔断的逻辑,通过概率的比较来随机淘汰掉部分请求,然后随着服务恢复,淘汰的请求会逐渐变少,直至不淘汰。

func (b *googleBreaker) allow() (internalPromise, error) {
    if err := b.accept(); err != nil {
        return nil, err
    }

    // 返回一个 promise 异步回调对象,可由开发者自行决定是否上报结果到熔断器
    return googlePromise{
        b: b,
    }, nil
}

// req - 熔断对象方法
// fallback - 自定义快速失败函数,可对熔断产生的err进行包装后返回
// acceptable - 对本次未熔断时执行请求的结果进行自定义的判定,比如可以针对http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    if err := b.accept(); err != nil {
        // 熔断中,如果有自定义的fallback则执行
        if fallback != nil {
            return fallback(err)
        }

        return err
    }

    defer func() {
        // 如果执行req()过程发生了panic,依然判定本次执行失败上报至熔断器
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()

    err := req()
    // 上报结果
    if acceptable(err) {
        b.markSuccess()
    } else {
        b.markFailure()
    }

    return err
}

熔断器对外暴露两种类型的方法:

1、简单场景直接判断对象是否被熔断,执行请求后必须需手动上报执行结果至熔断器。

func (b *googleBreaker) allow() (internalPromise, error)

2、复杂场景下支持自定义快速失败,自定义判定请求是否成功的熔断方法,自动上报执行结果至熔断器。

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error

个人感觉,熔断这部分代码,相较于前几篇文章,理解起来是更困难的。但其中的一些设计思想,和底层的实现原理也是非常值得学习的,希望这篇文章能够对大家有帮助。

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

首页 上一页 1 2 3 4 下一页 尾页 4/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇完全可复制、经过验证的 Go 工具链 下一篇一文了解Validator库

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目