pt := range opts {
opt(&b)
}
if len(b.name) == 0 {
b.name = stringx.Rand()
}
b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())
return &b
}
最终的熔断器又将功能代理给了 throttle
。
这就是它们之间的关系,如果感觉有点乱的话,就反复看,看的次数多了,就清晰了。
日志收集
上文介绍过了,loggedThrottle
是为了记录日志而设计的代理层,这部分内容来分析一下是如何记录日志的。
// core/breaker/breaker.go
type errorWindow struct {
// 记录日志的数组
reasons [numHistoryReasons]string
// 索引
index int
// 数组元素数量,小于等于 numHistoryReasons
count int
lock sync.Mutex
}
func (ew *errorWindow) add(reason string) {
ew.lock.Lock()
// 记录错误日志内容
ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
// 对 numHistoryReasons 进行取余来得到数组索引
ew.index = (ew.index + 1) % numHistoryReasons
ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
ew.lock.Unlock()
}
func (ew *errorWindow) String() string {
var reasons []string
ew.lock.Lock()
// reverse order
for i := ew.index - 1; i >= ew.index-ew.count; i-- {
reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
}
ew.lock.Unlock()
return strings.Join(reasons, "\n")
}
核心就是这里采用了一个环形数组,通过维护两个字段来实现,分别是 index
和 count
。
count
表示数组中元素的个数,最大值是数组的长度;index
是索引,每次 +1
,然后对数组长度取余得到新索引。
我之前有一次面试就让我设计一个环形数组,当时答的还不是很好,这次算是学会了。
滑动窗口
一般来说,想要判断是否需要触发熔断,那么首先要知道一段时间的请求数量,一段时间内的数量统计可以使用滑动窗口来实现。
首先看一下滑动窗口的定义:
// core/collection/rollingwindow.go
type RollingWindow struct {
lock sync.RWMutex
// 窗口大小
size int
// 窗口数据容器
win *window
// 时间间隔
interval time.Duration
// 游标,用于定位当前应该写入哪个 bucket
offset int
// 汇总数据时,是否忽略当前正在写入桶的数据
// 某些场景下因为当前正在写入的桶数据并没有经过完整的窗口时间间隔
// 可能导致当前桶的统计并不准确
ignoreCurrent bool
// 最后写入桶的时间
// 用于计算下一次写入数据间隔最后一次写入数据的之间
// 经过了多少个时间间隔
lastTime time.Duration // start time of the last bucket
}
再来看一下 window
的结构:
type Bucket struct {
// 桶内值的和
Sum float64
// 桶内 add 次数
Count int64
}
func (b *Bucket) add(v float64) {
b.Sum += v
b.Count++
}
func (b *Bucket) reset() {
b.Sum = 0
b.Count = 0
}
type window struct {
// 桶,一个桶就是一个时间间隔
buckets []*Bucket
// 窗口大小,也就是桶的数量
size int
}
有了这两个结构之后,我们就可以画出这个滑动窗口了,如图所示。
现在来看一下向窗口中添加数据,是怎样一个过程。
func (rw *RollingWindow) Add(v float64) {
rw.lock.Lock()
defer rw.lock.Unlock()
// 获取当前写入下标
rw.updateOffset()
// 向 bucket 中写入数据
rw.win.add(rw.offset, v)
}
func (rw *RollingWindow) span() int {
// 计算距离 lastTime 经过了多少个时间间隔,也就是多少个桶
offset := int(timex.Since(rw.lastTime) / rw.interval)
// 如果在窗口范围内,返回实际值,否则返回窗口大小
if 0 <= offset && offset < rw.size {
return offset
}
return rw.size
}
func (rw *RollingWindow) updateOffset() {
// 经过了多少个时间间隔,也就是多少个桶
span := rw.span()
// 还在同一单元时间内不需要更新
if span <= 0 {
return
}
offset := rw.offset
// reset expired buckets
// 这里是清除过期桶的数据
// 也是对数组大小进行取余的方式,类似上文介绍的环形数组
for i := 0; i < span; i++ {
rw.win.resetBucket((offset + i + 1) % rw.size)
}
// 更新游标
rw.offset = (offset + span) % rw.size
now := timex.Now()
// align to interval time boundary
// 这里应该是一个时间的对齐,保持在桶内指向位置是一致的
rw.lastTime = now - (now-rw.lastTime)%rw.interval
}
// 向桶内添加数据
func (w *window) add(offset int, v float64) {
// 根据 offset 对数组大小取余得到索引,然后添加数据
w.buckets[offset%w.size].add(v)
}
// 重置桶数据
func (w *window) resetBucket(offset int) {
w.buckets[offset%w.size].reset()
}
我画了一张图,来模拟整个滑动过程:
主要经历 4 个步骤:
- 计算当前时间距离上次添加时间经过了多少个时间间隔,也就是多少个 bucket
- 清理过期桶数据
- 更新 offset,更新 offset 的过程实际就是模拟窗口滑动的过程
- 添加数据
比如