设为首页 加入收藏

TOP

记一次etcd全局锁使用不当导致的事故(二)
2023-07-23 13:25:45 】 浏览:61
Tags:etcd 全局锁
{ if err = watchLock(client, gMutex, ctx); err != nil { fmt.Println("get etcd global key failed") return } } // 启动成功,做具体的业务逻辑处理 fmt.Println("todo ..............") select {} } func watchLock(client *clientv3.Client, gMutex *concurrency.Mutex, ctx context.Context) (err error) { watchCh := client.Watch(ctx, lockName, clientv3.WithPrefix()) for { select { case <-ctx.Done(): return ctx.Err() case <-watchCh: if err = gMutex.TryLock(ctx); err == nil { // 获取到锁 return nil } } } }

将上述代码编译成可执行文件 main.exe、main1.exe 后,先后执行上面两个可执行文件,然后通过下面的命令查看 etcd 中的 RAFT APPLYEND INDEX ,不会出现RAFT APPLYEND INDEX 持续增长的现象,也就是从源头解决了问题。

4、TryLock 源码分析

以下是自己的理解,如果有不对的地方,请不吝赐教,十分感谢

那下面一起看看 TryLock 方法里面做了什么操作,会导致 RAFT APPLYEND INDEX 持续增长呢。

TryLock 方法源码如下:

func (m *Mutex) TryLock(ctx context.Context) error {
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}
	client := m.s.Client()
	// Cannot lock, so delete the key
    // 这里的 client.Delete 会走到 raft 模块,从而使 etcd 的 raft applyed index 增加 1
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return ErrLocked
}

tryAcquire 方法源码如下:

// 下面主要是使用到了 etcd 中的事务,
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
	s := m.s
	client := m.s.Client()
	
    // m.myKey = /TEST/LOCKER/326989110b4e9304
	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    // 这里就是定义一个判断语句,创建 myKey 时的版本号是否 等于 0
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
    // 往 etcd 中写入 myKey
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
    // 查询 myKey
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    // 这里是重点,判断 cmp 中的条件是否成立,成立则执行 Then 中的语句,否则执行  Else 中的语句
    // 这里的语句肯定是成功的,因为我们测试的环境是执行两个不同的 session
    // 简单的可以理解为两个不同的程序,实际上是 两个不同的会话就会不同
    // 所以我们这里的场景是 会执行 v3.OpPut 操作。所以这里会增加一次 revision
    // 即 etcd 的 raft applyed index 会增加 1
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil
}

下面这张图是 debug 时,先启动一个可执行文件,然后使用 debug 方式启动的程序,程序执行完 tryAcquire 方法后,截取的一张图,这也作证了上面的分析。304 这个 key 是之前启动程序就存在的 key,下面 30f 的 key 是 debug 期间生成的 key。

大家如果有不清楚的地方,亲自去调试下,看看代码,就会明白上面说的内容了。

5、思考

其实,这并不是难以考虑到的问题,代码中出现这个问题,主要是自己对 etcd 的了解程度不够,不清楚 TryLock 的原理,以为像简单的查询Get那样,不会导致 revision 的增长,但实际上并不是这样。而是生产中出现了问题才去看为什么会这样,然后再去解决问题,这是一种不太好的方式,希望以后在编码的时候,尽量多考虑考虑,减少问题出现。

想起来前几天看到一篇问题,也是 for 循环中的出现的问题,原文链接,感谢可以去看看 Go坑:time.After可能导致的内存泄露问题分析

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇一文了解Go语言的I/O接口设计 下一篇for-range排坑指南

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目