设为首页 加入收藏

TOP

golang开发:go并发的建议(完)(一)
2023-07-23 13:28:58 】 浏览:55
Tags:golang 开发 :go

上次说了一下Go语言布道师 Dave Cheney对Go并发的建议,个人觉得最重要的一条,这次主要想说一下这个。
8.3. Never start a goroutine without knowning when it will stop(永远不要在不知道何时停止的情况下启动 goroutine)

我们的需求

我这边当时有个需求是这样的,我们有个考试系统的,每次学员答完试卷去检查一下这次交卷是否是这次考试的最后一份试卷,如果是最后一份试卷的话,需要计算这次考试的总成绩,生成考试的学习报告,当然了,如果不是最后一份试卷的话啥也不干。
生成试卷和报告是必须要生成的,不能出现考完试了没有总成绩和总报告。
接到这个需求的时候,我首先想到的是使用golang的goroutine去异步算出成绩生成报告。然后写代码就是这样的。

go createReport()

这不刚好是8.3 永远不要这样写的建议么?
然后觉得应该写一个管理goroutine异步执行任务的类库,创建执行销毁都由这个管理工具去执行。准备写的时候发现B站的代码里有一个这样的类库,异步执行的类库。

B站的类库

B站代码里面异步任务是这个文件
openbilibili-go-common-master/library/sync/pipeline/fanout/fanout.go

var (
	// ErrFull chan full.
	ErrFull   = errors.New("fanout: chan full")
	stats     = prom.BusinessInfoCount
	traceTags = []trace.Tag{
		trace.Tag{Key: trace.TagSpanKind, Value: "background"},
		trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
	}
)

type options struct {
	worker int
	buffer int
}

// Option fanout option
type Option func(*options)

// Worker specifies the worker of fanout
func Worker(n int) Option {
	if n <= 0 {
		panic("fanout: worker should > 0")
	}
	return func(o *options) {
		o.worker = n
	}
}

// Buffer specifies the buffer of fanout
func Buffer(n int) Option {
	if n <= 0 {
		panic("fanout: buffer should > 0")
	}
	return func(o *options) {
		o.buffer = n
	}
}

type item struct {
	f   func(c context.Context)
	ctx context.Context
}

// Fanout async consume data from chan.
type Fanout struct {
	name    string
	ch      chan item
	options *options
	waiter  sync.WaitGroup

	ctx    context.Context
	cancel func()
}

// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
	if name == "" {
		name = "fanout"
	}
	o := &options{
		worker: 1,
		buffer: 1024,
	}
	for _, op := range opts {
		op(o)
	}
	c := &Fanout{
		ch:      make(chan item, o.buffer),
		name:    name,
		options: o,
	}
	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.waiter.Add(o.worker)
	for i := 0; i < o.worker; i++ {
		go c.proc()
	}
	return c
}

func (c *Fanout) proc() {
	defer c.waiter.Done()
	for {
		select {
		case t := <-c.ch:
			wrapFunc(t.f)(t.ctx)
			stats.State(c.name+"_channel", int64(len(c.ch)))
		case <-c.ctx.Done():
			return
		}
	}
}

func wrapFunc(f func(c context.Context)) (res func(context.Context)) {
	res = func(ctx context.Context) {
		defer func() {
			if r := recover(); r != nil {
				buf := make([]byte, 64*1024)
				buf = buf[:runtime.Stack(buf, false)]
				log.Error("panic in fanout proc, err: %s, stack: %s", r, buf)
			}
		}()
		f(ctx)
		if tr, ok := trace.FromContext(ctx); ok {
			tr.Finish(nil)
		}
	}
	return
}

// Do save a callback func.
func (c *Fanout) Do(ctx context.Context, f func(ctx context.Context)) (err error) {
	if f == nil || c.ctx.Err() != nil {
		return c.ctx.Err()
	}
	nakeCtx := metadata.WithContext(ctx)
	if tr, ok := trace.FromContext(ctx); ok {
		tr = tr.Fork("", "Fanout:Do").SetTag(traceTags...)
		nakeCtx = trace.NewContext(nakeCtx, tr)
	}
	select {
	case c.ch <- item{f: f, ctx: nakeCtx}:
	default:
		err = ErrFull
	}
	stats.State(c.name+"_channel", int64(len(c.ch)))
	return
}

// Close close fanout
func (c *Fanout) Close() error {
	if err := c.ctx.Err(); er
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇go-zero docker-compose 搭建课件.. 下一篇实践GoF的设计模式:代理模式

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目