排序的通道的。利用 pollorder 构建一个最大堆:
for i := 0; i < ncases; i++ {
j := i
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
注意和常规的最小堆构建稍有不同,因为它将其它内存的内容构建成最小堆放到了当前内存中,并且使用插入法建堆。这种方式的时间复杂度是 O(nlogn)。相比常规的建堆时间复杂度是 O(n)。看似慢了,但实际上在数据量比较小的时候,插入法建堆更快,而且如果在这里使用的是常规建堆方法,需要先执行一次内存拷贝操作。
接下来就是使用大根堆的排序了:
for i := ncases - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
每次外层迭代,都将最大的元素移到后面,然后重新调整位置满足堆的属性。
8.3 loop 段
在 loop 段开始之前,selectgo 先使用了 sellock 对所有的通道加锁,注意 lockorder 在这里的作用。
sellock(scases, lockorder)
loop 段是 selectgo 函数的核心部分,它的目的是先遍历一次所有的 case 和 default 语句,看一下是否有可执行的分支,如果有,那么就转移到对应的段去处理。否则就阻塞并且等待被唤醒。
我们先看循环部分:
loop:
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
for i := 0; i < ncases; i++ {
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
case caseNil:
continue
case caseRecv:
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
case caseSend:
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault:
dfli = casi
dfl = cas
}
}
它遍历了所有的 case+default,然后按照 case 的类别做如下处理:
- 无效的 case,不处理
- 接收 case,根据不同的情况分别跳转到 recv, bufrecv, rclose 段。注意这里的顺序,rclose 是放在最后面的。
- 发送 case,根据不同的情况分别跳转到 sclose,send, bufsend 段。这里是要把 sclose 放在最前面的,因为向一个已经关闭的通道发送数据,就应该 panic
- 对于 default,selectgo 简单的将这个 case 信息保存下来,留给后面处理。
当循环结束后,如果有 default 语句存在,那么执行 default 的内容。
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
selectgo 用 casi 表示要执行哪个 case 的内容, cas 表示要执行的分支的 scase 对象。这里它简单的对这两个变量赋值,然后转移到 retc 段。
8.4 loop 之后
当上面的流程都执行完了,还没有 goto 出去,说明没有任何 case 当前可以执行。那么就挂起并等待被唤醒。
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.c = c
*nextp = sg
nextp = &sg.waitlink
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
case caseSend:
c.sendq.enqueue(sg)
}
}
gp.param = nil
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
它按照锁顺序一次遍历每个 case,然后将其放到 g.waitlink 这个 sudog 链表中,表明是在等待多个 case 。并且对于每个 case,都往 recvq 或者 sendq 里面插入这个 sudog,用以表示这个等待者。
然后使用 gopark 将当前 goroutine 切换到等待状态。
当 gopark 返回时,说明已经被某个 channel 唤醒了,后面主要是一些清理工作。
8.5 bufrecv 段
bufrecv 段从带 buf 的通道中接收数据。执行到 bufrecv 段了,说明对应的通道缓冲区有数据可以接收了
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
这一段的实现和之前讨论的 recv 函数类似,但是最后它把所有权交给 retc
8.6 bufsend 段
bufsend 段向缓冲区写入数据,与 send 函数类似,但是最后把所有权让给了 retc
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++