设为首页 加入收藏

TOP

golang channel 源码剖析(四)
2019-05-23 14:35:57 】 浏览:343
Tags:golang channel 源码 剖析
0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false }

注意:前两步中都没有加锁。第 1 步中,没有访问 hchan 的任何成员,所以无需加锁。第 2 步中,可能被写的变量只有 closed 、qcount 和 c.recvq.first ,这些变量都是单字长的,所以对它们的单个值的读操作是原子性的。

然而,我们应该要更仔细的分析,对单个值的读操作是原子性的,但是对多个值的读操作就不一定是原子性的了。因为在判断完 closed 之后,通道可能在这一瞬间从未关闭状态转变成关闭状态(closed 不会从非 0 变成 0,但有可能从 0 变成非 0,所以在判断 closed==0 之后,通道可能还会转变成关闭状态),也就是说这里的 if 测试通过的那一瞬间,可能有两种情况:

  • 通道没有关闭,而且已经满了。那么这段逻辑运行 ok,应该返回 false。
  • 通道已经关闭,而且已经满了。按照发送数据的语义来说,此时应该 panic。但实际上这段逻辑的实现,它会返回 false。

但我们还要注意到的是,第 2 种情况的发生,肯定意味着第 1 种情况发生过。而且它取决与通道的 close 是何时被调用的,至少在 if 之前 close 还没有完成调用。所以我们认为第 2 种情况的逻辑也是正确的。

(嗯,确实有点难理解,也很难描述)

  1. 调用 lock 对通道加锁

  2. 如果此时通道被关闭,那么发生 panic

// 第 3 步,加锁
lock(&c.lock)  

// 第 4 步,如果通道已经被关闭了,那么 panic
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}
  1. 从 recvq 中取出一个接收者,如果接收者存在,直接向该接收者发送数据。
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

send 函数将 ep 作为参数传送给接收方的 sg 对象,然后使用 goready 将其唤醒。sg.elem 如果非空,则将 ep 的内容直接 copy 到 elem 指向的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    memmove(dst, src, t.size)
}

注意:如果有接收者在队列中等待,则说明此时的缓冲区是空的。

  1. 如果缓冲区还有多余的空间,那么将数据写入缓冲区。写入缓冲区后,将发送位置往后移动一个单位,然后将 qcount 加 1
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

其中 chanbuf 函数从 buf 中取出第 i 个元素的存放地址:

func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

typedmemmove 函数将类型为 c.elemtype 的 ep 的内容 拷贝到 qp 中。

  1. 如果执行前面的所有步骤还没有成功发送,那么就表示缓冲区没有空间了,而且也没有任何接收者在等待。所以后面必须要将 goroutine 挂起然后等待新的接收者了。但对于非阻塞的调用,不能等待,返回 false 表示数据发送不成功。
if !block {
        unlock(&c.lock)
        return false
    }
  1. 创建 sudog 对象,然后入队并且让 goroutine 进入等待状态。直到被唤醒时 goparkunlock 才会返回。
gp := getg()

mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c

gp.waiting = mysg
gp.param = nil

c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  1. goparkunlock 返回后,代表已经发送完数据了,此时做一些清理工作,如将 sudog 对象释放,将 g 的 waiting 置空等。

6. 接收数据

接收数据的操作和发送数据的操作大同小异,它的实现函数为 chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) 
  • chanrecv 从 c 中接收数据,并且将接收到的数据存到 ep 中,block 表示是否需要阻塞。
  • 如果没有数据可以接收,而且是非阻塞的情况,则返回 (false,flase)。如果 c 已经关闭了,将 ep 指向的值置为 0值,并且返回 (true, false)。其它情况返回值为 (true,true),表示成功从 c 中获取到了数据。

同样地,block 是为了实现以下语义:

select {
case v = <-c:
    ... foo
default:
    ... bar
}

它被编译成:

if selectnbrecv(&v, c) {
    ... foo
} else {
    ... bar
}

其中 selectnbrecv 的实现为:

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    selected, _ = chanrecv(c, elem, false)  // 非阻塞接收
    return
}

接下来,我们分析以下 recv 的逻辑:

  1. 如果 c 为空且是非阻塞模式,那么直接返回 (false,false)。否则永远等待
if c == nil {
    if !block {
        return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
}
  1. 对于非阻塞的情况,如果当前没有数据可以接收了,那么返回 (false,false)。
if !block && (c.dataq
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 4/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Golang实现requests库 下一篇Golang websocket推送

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目