源码解析
type hchan struct {
qcount uint // Channel 中的元素个数
dataqsiz uint // Channel 中的循环队列的长度
buf unsafe.Pointer // Channel 的缓冲区数据指针
elemsize uint16 // 当前 Channel 能够收发的元素大小
closed uint32
elemtype *_type // 当前 Channel 能够收发的元素类型
sendx uint // Channel 的发送操作处理到的位置
recvx uint // Channel 的接收操作处理到的位置
recvq waitq // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)
sendq waitq // 当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,双向链表(sugog)
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
创建channel
channel的初始化有2种,一种是没有缓冲区的channel,一种是有缓冲区的channel。对应的初始化之后hchan也是有区别的。
无缓冲区的channel,初始化的时候只为channel分配内存,缓冲区dataqsiz的长度为0
有缓冲的channel,初始化时会为channel和缓冲区分配内存,dataqsiz长度大于0
同时channel的元素大小和缓冲区的长度都是有大小限制的
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 如果内存超了,或者分配的内存大于channel最大分配内存,或者分配的size小于0,直接Panic
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 如果没有缓冲区,分配一段内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 有缓冲时,如果元素不包含指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 有缓冲区,且元素包含指针类型,channel和buf数组各自分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小,元素类型,循环数组长度,更新到channel
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
发送数据(ch <- i)
-
发送数据前会加锁,防止多个线程并发修改数据。如果channel已经关闭,直接Panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
-
当存在等待的接收者时,通过
runtime.send
直接将数据发送给阻塞的接收者当channel的recvq队列不为空,而且channel是没有数据数据写入的。这个时候如果有数据写入,会直接把数据拷贝到接收者变量所在的内存地址上。即使这是一个有缓冲的channel,当有等待的接收者时,也是直接给接收者,不会先保存到循环队列
// 如果目标 Channel 没有被关闭并且已经有处于读等待的 Goroutine,那么 runtime.chansend 会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向它发送数据 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if s