siz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
和非阻塞发送有两个不同的地方:
- 对 closed 的判断放到了后面。
- 使用了 atomic。
我们先来看一下下面这段代码:
c := make(chan int, 1)
c <- 1
go func() {
select {
case <-c:
println("recv from c")
default:
println("c is not ready - BUG!")
}
}()
close(c)
<-c
从 go 的语义上来说,不论何时,default 都不应该被执行:如果 select 发生在 close 之前,那么从 c 中取出来的数据应该是 1。 如果 select 发生在 close 之后但是在 <-c 之前,那么也应该从 c 中取出 1。如果 select 发生在 <-c 之后,从 c 中取出的数据是 0 ,而且接收数据是失败的,但是不会执行 default。
那么,如果把对 closed 的判断放到通道是否有数据可接收的判断之前,像这样:
if !block && atomic.Load(&c.closed) == 0 && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) {
return
}
这意味着 if 测试通过后的一瞬间存在两种情况:
- 通道未关闭,但是不存在数据可接收,也没有发送者在等待。对于这种情况,应该返回 (false,false)。执行 default 段的代码。
- 通道已关闭,且不存在数据可接收,也没有发送者在等待。对于这种情况,根据 go 语义,应该返回 (true, false),并且执行 case 段的代码。但是我们的这个实现显然是错误的,它返回了 (false,false)。就上面的接收例子而言,
close(c)
和 <-c
正好发生在 atomic.Load(&c.closed) == 0
执行完成之后,但还没有执行后面的判断,那 if 再执行后面的判断,显然也是通过的。所以问题就出来了。
再来看一下正确的实现,它也会在 if 测试通过后的一瞬间存在两种情况:
- 不存在数据可接收,而且通道没有关闭。此时返回 (false,false)
- 存在数据可接收,而且通道没有关闭。此时应该返回 (true,true)。但是,这种情况意味着上一种情况曾今存在过, 而且至少在 if 执行前的那一瞬间还存在。所以我们认为它返回 (false,false) 是合理的。
另外 atomic 在这里是为了保证内存顺序的正确性。
- 加锁,然后判断如果通道已经关闭而且没有剩余的数据可以读取了,那么就返回 (true,false)。
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
typedmemclr 的作用是将 ep 指向的类型为 elemtype 的内存块置为 0 值。
- 如果有发送者在队列等待,那么直接从发送者那里提取数据,并且唤醒这个发送者。当然对于带缓冲区的 chan,它会先将缓冲区的数据提取出来,然后将等待中的发送者的数据拷贝到缓冲区中。
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
recv 函数判断 chan 是否带有缓冲区,如果不带缓冲区,直接从发送者那里复制数据到 ep。如果带缓冲区,那么你应该能够理解,由于有发送者在等待,所以缓冲区一定是满的。它将缓冲区的第一个数据复制到 ep,然后将发送者的数据复制到缓冲区。这是为了尽量满足先来后到的需求(当然,由于并发的存在,这样做实际上不能完全确定)。
接下来,通过 goready 将发送者唤醒。
- 如果缓冲区中有数据,那么从缓冲区复制数据到 ep,并且修改下次接收位置和 qcount
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
- 在执行完成上面的流程后,仍然没有返回,说明缓冲区内已经没有数据了,而且也没有发送者在等待中。所以如果是非阻塞接收,那么直接返回 (false,false)。
if !block {
unlock(&c.lock)
return false, false
}
- 对于阻塞接收的情况,将调用者 goroutine 挂起,并且等待被唤醒。
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
- goparkunlock 返回后,说明已经接收到数据了,或者是通道已经被关闭了。此时和发送一样,做一些清理工作。然后根据是否为关闭导致的返回对应的 bool 值。
7. 关闭通道
closechan 函数实现了通道的关闭,它的声明如下:
func closechan(c *hchan)
close