多核处理器日益普及的现在很多代码都得和并发/并行打交道,对于内置了并发支持(goroutine)的golang来说并发编程是必不可少的一环。
链表是我们再熟悉不过的数据结构,在并发编程中我们也时长需要用到,今天我们就来看两种带锁的并发安全的单项链表。
方案一:粗粒度锁,完全锁住链表
方案一的做法是将所有操作用锁——Mutex串行化处理。串行化处理是指锁和链表相关联,当需要修改或读取链表时就获取锁,只要该goroutine持有锁,那么其他goroutine就无法修改或读取链表,直到锁的持有者将其释放。
这样可以保证任何时间都只有一个goroutine能处理链表,如此一来也就避免了数据竞争。下面是链表结构的定义:
type MutexList struct { locker *sync.Mutex head, tail *Node size int64 }
size表示当前list的长度,head是一个哨兵节点,不存储实际的数值。
下面是节点的定义:
type Node struct { Value interface{} Next *Node } func NewNode(v interface{}) *Node { n := &Node{Value: v} n.Next = nil return n }
节点和它的初始化不用多说,因为数据访问通过list来控制,所以节点里不需要再有mutex的存在。
好了我们进入正题,在粗粒度的解决方案里Enq方法负责将数据插入list的末尾,这是O(1)时间的操作,将list锁住然后更新tail即可,注意我们不允许插入nil:
func (l *MutexList) Enq(v interface{}) bool { if v == nil { return false } l.locker.Lock() node := NewNode(v) l.tail.Next = node l.tail = node l.size++ l.locker.Unlock() return true }
然后是insert,它将数据插入在给出的index处,index从0开始,同样我们不允许插入nil,同时会检查index,index不能超过size,当list只有size-1个节点时,新数据会插入在list的末尾:
func (l *MutexList) Insert(index int64, v interface{}) bool { l.locker.Lock() defer l.locker.Unlock() if index > l.size || index < 0 || v == nil { return false } current := l.head for i := int64(0); i <= index-1; i++ { // index - 1是最后一个节点时 if current.Next == nil { break } current = current.Next } node := NewNode(v) node.Next = current.Next current.Next = node l.size++ return true }
这里我们使用了defer,那是因为只有一个mutex,而且函数有多个出口,容易在编码过程中漏掉对锁的释放。
节点的删除和查找也是类似的步骤,先给列表上锁,然后修改/读取,最后解锁,这里就不多讲解了。
然后是获取size的函数,后面的测试中要用,虽然我们以原子操作来获取了长度,但是仍然可能存在获得size之后其他goroutine进行了remove导致size改变进而引发Insert返回false,所幸的是我们的测试里并不会让remove和Insert同时出现,因此不会出现insert返回失败的问题,在实际使用时需要注意Insert的返回值,这一点在第二种方案中也是一样的:
func (l *MutexList) Length() int64 { return atomic.LoadInt64(&l.size) }
因为方案二的该函数并无什么变化,因此就省略了。
如你所见,方案一的优点在于实现起来简单,确点在于一次只有一个goroutine能处理list,几乎所有对list的操作都被串行化了。
方案一无疑能很好地工作,但是它的性能十分有限,所以我们来看看方案二。
方案二:细粒度锁,锁住需要修改的节点
方案二的做法是将锁放到node里,每次需要修改的仅仅是部分节点,而不用把整个list锁住,这样能保证互不干扰的goroutine们可以同时处理list,而会互相干扰的goroutine则会被节点的mutex阻塞,以保证不存在竟态数据。
当然,为了保证不会有多个goroutine同时处理一个节点,我们需要在取得要修改节点的锁之前先取得前项节点的锁,然后才能取得修改节点的锁。这个步骤很像交叉手,它被称为锁耦合。
另外一个需要注意的地方是加锁的顺序,所有操作的加锁顺序/方向必须相同,比如从head开始锁定到tail,如果不按统一的顺序加锁将会出现死锁。考虑如下情况,goroutine A锁住了节点1,正准备锁定节点2,这时goroutine B沿反方向加锁,它要锁住节点2然后再锁住节点1,如果B运气很好先于A取得了节点2的锁,那么它将一直等待锁住节点1,而A则会始终等待锁住节点2,出现了A等B,B等A的死锁。但是只要统一了加锁的顺序/方向,那么这种问题就不复存在了。
这是list和node的定义,可以看见锁已经移动到node结构里了:
type List struct { head, tail *MutexNode size int64 } type MutexNode struct { Locker *sync.Mutex Value interface{} Next *MutexNode } func NewMutexNode(v interface{}) *MutexNode { n := &MutexNode{Value: v} n.Locker = &sync.Mutex{} n.Next = nil return n } func NewList() *List { l := &List{} l.head = NewMutexNode(nil) l.tail = l.head return l }
下面我们来看Enq,功能与方案一一致,只是在处理锁的地方有所不同,因为tail节点总是在list末尾的元素,符合我们从head开始的加锁顺序,又因为l.tail的位置始终是确定的,所以可以省略锁住前项节点的步骤;然而l.tail会在我们等待锁的时间段里被更新,所以我们需要处理l.tail被更新的情况:
func (l *List) Enq(v interface{}) bool { if v == nil { return false } tail := l.tail tail.Locker.Lock() for tail.Next != nil { next := tail.Next next.Locker.Lock() tail.Locker.Unlock() tail = next } node := NewMutexNode(v