chan 按照如下的流程执行:
- 加锁,然后判断如果通道早已关闭了,就 panic。(你不能对一个被关闭的通道再执行关闭操作)
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
- 将关闭标志置为 1.
c.closed = 1
- 唤醒所有的接收者,并且将接收数据置为 0 值。唤醒所有发送者,令其 panic。 gList 就是一个 g 对象的列表。
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
unlock(&c.lock)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
8. select
select 函数是本文的最后一部分,也是最复杂的一部分。它的实现函数是 selectgo
8.1 selectgo 的声明
runtime 通过遍历+等待的方式实现 select 语义,遍历时判断如果 有可执行的 case 或者 select 中带有 default,那么就执行之。如果没有,就通过 gopark 将调用者转换为等待状态,使用 sudog 链表表示它在多个通道上等待。其中任意一个通道对应的 sudog 都可以唤醒调用者。
函数 selectgo 实现了 select 语义。它的第一个返回值表示需要执行哪个 case, 第 2 个返回值表示如果要执行的 case 是 caseRecv,那么接收数据是否成功(对于已经关闭的通道来说,这个返回值会是 false,这个我们在 chanrecv 函数中已经看到了)。
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool)
- 参数 cas0 指向 scase 数组的第一个元素, 每个 scase 表示一个 case 分支, scase 的定义如下:
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
kind uint16
// ...
}
const (
caseNil = iota
caseRecv
caseSend
caseDefault
)
c 表示这个 case 对应的通道 ,elem 表示接收数据的地址或者要发送的数据的地址。kind 取值为 caseNil 表示一个 0 值,在真实的 select 中没有任何东西和它对应,它用于表示无效的的意思。caseRecv 和 caseSend 分别表示接收和发送的 case。caseDefault 对应 default 分支。
- order0 参数指向的是一个 2 倍 case 数量大小的数组,它用来为 selectgo 提供额外的空间用来使用堆排序和随机顺序执行。你可能在想,这个空间它自己也能分配,为什么要让外部提供?其实这样做是有它的目的的,首先在 selectgo 中,它不知道调用者的 case 究竟有多少个,那么它无法分配栈内存,它只能分配堆内存,而我们的代码中 for + select 的用法是很常见的,这样小而且频繁的堆内存分配势必给 gc 带来非常大的压力。其次,在 select 的调用处,编译器能够知道你有多少个 case,所以它可以给你分配固定大小的栈内存。(对于这一段,如果你觉得难以理解,可以先跳过,不影响你理解后文)。
- ncases 表示的是 case 的数量,包括 default。
8.2 避免死锁
在继续探索这个函数之前,可能还需要了解一个东西。那就是对多个锁的占有和释放。
在 selectgo 中,毫无疑问要同时访问多个通道,每个通道都应该加锁才能访问。那么要获得多个锁的所有权,为了不造成死锁,需要按照固定的顺序加锁和解锁(我想你应该知道死锁是什么,而且这种按顺序的加锁和解锁方式可以避免死锁)。
runtime 中的 sellock 和 selunlock 用于对 scase 数组加锁和解锁。注意解锁的时候顺序和加锁的顺序是相反的。
另外由于一个 select 语句中可能存在多个 case 对同一个通道的操作,而对于同一个通道来说,只能加锁一次,也只能解锁一次。所以加锁迭代中需要判断是否和上次加锁的通道一样,解锁迭代中需要判断下个要解锁的通道是否和当前通道一样。 lockorder 是要保证同一个通道存在多次,那么它们需要是相邻的。
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}
func selunlock(scases []scase, lockorder []uint16) {
for i := len(scases) - 1; i >= 0; i-- {
c := scases[lockorder[i]].c
if c == nil {
break
}
if i > 0 && c == scases[lockorder[i-1]].c {
continue // will unlock it on the next iteration
}
unlock(&c.lock)
}
}
接下来我们深入探索 selectgo 这个函数的实现,根据代码结构,本节将按照分段的方式对这个函数进行讲解。
8.3 pollorder 和 lockorder
pollorder 表示轮询顺序,为了实现 select 中的随机语义,轮询应该是随机的。 pollorder 对应参数 order0 指针的前半部分。pollorder 包含 0~ncases-1 中的所有数字,下面是随机生成 pollorder 的代码
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
这个很有意思,它假设第一个元素初始为 0,而且没有对后面的元素做任何假设。每次迭代中,从前面的所有元素中随机挑选一个,然后将当前索引和它置换。从而生成 0~ncases-1 的值。
它只要求第一个元素初始值为 0 ,这样编译器可以为我们对 select 的调用生成更加高效的代码。
lockorder 表示加锁顺序,用以传给 sellock 和 selunlock 加锁和解锁。它最后存储的值为按照地址