g.elem != nil {
// 调用 runtime.sendDirect 将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
// 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
// 需要注意的是,发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine
goready(gp, skip+1)
}
当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 如果当前元素数小于循环队列的长度
if c.qcount < c.dataqsiz {
// 使用 runtime.chanbuf 计算出下一个可以存储数据的位置
qp := chanbuf(c, c.sendx)
// 将发送的数据拷贝到缓冲区中
typedmemmove(c.elemtype, qp, ep)
// 发送的位置索引+1
c.sendx++
// 如果循环队列满了就从0开始
// 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 增加当前元素数
c.qcount++
unlock(&c.lock)
return true
}
...
}
当不存在缓冲区或者缓冲区已满时,等待其他 Goroutine 从 Channel 接收数据
当因为不存在缓冲区或者缓冲区已满无法写入时,会构造sudog等待执行的gorutine结构,放到hchan的等待队列中,直到被唤醒,把数据放到缓冲区或者直接拷贝给接收者
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 使用 select 关键字可以向 Channel 非阻塞地发送消息
if !block {
unlock(&c.lock)
return false
}
// 获取发送数据使用的 Goroutine
gp := getg()
// 获取 runtime.sudog 结构
mysg := acquireSudog()
// 设置待发送数据的内存地址
mysg.elem = ep
// 设置发送数据的goroutine
mysg.g = gp
mysg.isSelect = false
// 设置发送的channel
mysg.c = c
// 设置到goroutine的waiting上
gp.waiting = mysg
// 加入到发送等待队列
c.sendq.enqueue(mysg)
// 阻塞等待唤醒
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
接收数据(<- ch)
-
从一个空 Channel 接收数据
goroutine会让出使用权,并阻塞等待
if c == nil {
if !block {
return
}
// 让出使用权
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 不获取锁的情况下,检查失败的非阻塞操作
if !block && empty(c) {
// 显示未关闭,继续返回false,因为channel不会重新打开
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
// Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
// Channel 已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
-
当存在等待的发送者时,通过 runtime.recv
从阻塞的发送者或者缓冲区中获取数据
如果是无缓冲的channel,当有接收者进来时,会直接从阻塞的发送者拷贝数据
如果是有缓冲的channel,当有接收者进来时,会先从缓冲区拿数据,接着等待的发送者会把数据拷贝到缓冲区
注意这个时候并没有直接去唤醒发送者,而是放到下次p的执行队列中中,下次调度时会唤醒发送者,发送者