fmt.Printf("请求 %s 开始....\n", url)
// 假设这里是发送请求,获取数据
if strings.Contains(url, "url2") {
// 假设请求 url2 时出现错误
time.Sleep(time.Second*2)
return "", errors.New("请求出错")
} else if strings.Contains(url, "url3") {
// 假设 请求 url3 需要1秒
select {
case <- errCtx.Done():
ret, err = "", errors.New("请求3被取消")
return
case <- time.After(time.Second*3):
fmt.Printf("请求 %s 结束....\n", url)
return "success3", nil
}
} else {
select {
case <- errCtx.Done():
ret, err = "", errors.New("请求1被取消")
return
case <- time.After(time.Second):
fmt.Printf("请求 %s 结束....\n", url)
return "success1", nil
}
}
}
执行结果:
请求 http://localhost/url2 开始....
请求 http://localhost/url3 开始....
请求 http://localhost/url1 开始....
请求 http://localhost/url1 结束....
eg.Wait error: 请求出错
2、errgroup源码分析
看了上面的例子,我们对errgroup
有了一定了解,接下来,我们一起看看errgroup
做了那些封装。
2.1 errgroup.Group
errgroup.Group
源码如下:
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero Group is valid, has no limit on the number of active goroutines,
// and does not cancel on error.
type Group struct {
// context 的 cancel 方法
cancel func()
wg sync.WaitGroup
//传递信号的通道,这里主要是用于控制并发创建 goroutine 的数量
//通过 SetLimit 设置过后,同时创建的goroutine 最大数量为n
sem chan token
// 保证只接受一次错误
errOnce sync.Once
// 最先返回的错误
err error
}
看结构体中的内容,发现比原生的sync.WaitGroup
多了下面的内容:
cancel func()
sem chan token
errOnce sync.Once
err error
2.2 WithContext 方法
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &Group{cancel: cancel}, ctx
}
方法逻辑还是比较简单的,主要做了两件事:
- 使用
context
的WithCancel()
方法创建一个可取消的Context
- 将
context.WithCancel(ctx)
创建的 cancel
赋值给 Group中的cancel
2.3 Go
1.2 最后一个例子说,不用手动去执行 cancel 的原因就在这里。
g.cancel() //这里就是为啥不用手动执行 cancel的原因
// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group's context, if the
// group was created by calling WithContext. The error will be returned by Wait.
func (g *Group) Go(f func() error) {
if g.sem != nil {
//往 sem 通道中发送空结构体,控制并发创建 goroutine 的数量
g.sem <- token{}
}
g.wg.Add(1)
go func() {
// done()函数的逻辑就是当 f 执行完后,从 sem 取一条数据,并且 g.wg.Done()
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() { // 这里就是确保 g.err 只被赋值一次
g.err = err
if g.cancel != nil {
g.cancel() //这里就是为啥不用手动执行 cancel的原因
}
})
}
}()
}
2.4 TryGo
看注释,知道此函数的逻辑是:当正在执行的goroutine数量小于通过SetLimit()
设置的数量时,可以启动成功,返回 true,否则启动失败,返回false。
// TryGo calls the given function in a new goroutine only if the number of
// active goroutines in the group is currently below the configured limit.
//
// The return value reports whether the goroutine was started.
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.se