设为首页 加入收藏

TOP

golang channel 源码剖析(五)
2019-05-23 14:35:57 】 浏览:341
Tags:golang channel 源码 剖析
siz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return }

和非阻塞发送有两个不同的地方:

  • 对 closed 的判断放到了后面。
  • 使用了 atomic。

我们先来看一下下面这段代码:

c := make(chan int, 1)
c <- 1

go func() {
    select {
    case <-c:
        println("recv from c")
    default:
        println("c is not ready - BUG!")
    }
}()

close(c)
<-c

从 go 的语义上来说,不论何时,default 都不应该被执行:如果 select 发生在 close 之前,那么从 c 中取出来的数据应该是 1。 如果 select 发生在 close 之后但是在 <-c 之前,那么也应该从 c 中取出 1。如果 select 发生在 <-c 之后,从 c 中取出的数据是 0 ,而且接收数据是失败的,但是不会执行 default。

那么,如果把对 closed 的判断放到通道是否有数据可接收的判断之前,像这样:

if !block && atomic.Load(&c.closed) == 0 && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0)  {
    return
}

这意味着 if 测试通过后的一瞬间存在两种情况:

  • 通道未关闭,但是不存在数据可接收,也没有发送者在等待。对于这种情况,应该返回 (false,false)。执行 default 段的代码。
  • 通道已关闭,且不存在数据可接收,也没有发送者在等待。对于这种情况,根据 go 语义,应该返回 (true, false),并且执行 case 段的代码。但是我们的这个实现显然是错误的,它返回了 (false,false)。就上面的接收例子而言, close(c)<-c 正好发生在 atomic.Load(&c.closed) == 0 执行完成之后,但还没有执行后面的判断,那 if 再执行后面的判断,显然也是通过的。所以问题就出来了。

再来看一下正确的实现,它也会在 if 测试通过后的一瞬间存在两种情况:

  • 不存在数据可接收,而且通道没有关闭。此时返回 (false,false)
  • 存在数据可接收,而且通道没有关闭。此时应该返回 (true,true)。但是,这种情况意味着上一种情况曾今存在过, 而且至少在 if 执行前的那一瞬间还存在。所以我们认为它返回 (false,false) 是合理的。

另外 atomic 在这里是为了保证内存顺序的正确性。

  1. 加锁,然后判断如果通道已经关闭而且没有剩余的数据可以读取了,那么就返回 (true,false)。
lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

typedmemclr 的作用是将 ep 指向的类型为 elemtype 的内存块置为 0 值。

  1. 如果有发送者在队列等待,那么直接从发送者那里提取数据,并且唤醒这个发送者。当然对于带缓冲区的 chan,它会先将缓冲区的数据提取出来,然后将等待中的发送者的数据拷贝到缓冲区中。
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx 
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

recv 函数判断 chan 是否带有缓冲区,如果不带缓冲区,直接从发送者那里复制数据到 ep。如果带缓冲区,那么你应该能够理解,由于有发送者在等待,所以缓冲区一定是满的。它将缓冲区的第一个数据复制到 ep,然后将发送者的数据复制到缓冲区。这是为了尽量满足先来后到的需求(当然,由于并发的存在,这样做实际上不能完全确定)。

接下来,通过 goready 将发送者唤醒。

  1. 如果缓冲区中有数据,那么从缓冲区复制数据到 ep,并且修改下次接收位置和 qcount
if c.qcount > 0 {
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}
  1. 在执行完成上面的流程后,仍然没有返回,说明缓冲区内已经没有数据了,而且也没有发送者在等待中。所以如果是非阻塞接收,那么直接返回 (false,false)。
if !block {
    unlock(&c.lock)
    return false, false
}
  1. 对于阻塞接收的情况,将调用者 goroutine 挂起,并且等待被唤醒。
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
  1. goparkunlock 返回后,说明已经接收到数据了,或者是通道已经被关闭了。此时和发送一样,做一些清理工作。然后根据是否为关闭导致的返回对应的 bool 值。

7. 关闭通道

closechan 函数实现了通道的关闭,它的声明如下:

func closechan(c *hchan)

close

首页 上一页 2 3 4 5 6 7 下一页 尾页 5/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Golang实现requests库 下一篇Golang websocket推送

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目