1.8 应用到Observer 上
既然通过weak_ptr 能探查对象的生死,那么Observer 模式的竞态条件就很容易解决,只要让Observable 保存weak_ptr<Observer> 即可:
- 39 class Observable // not 100% thread safe!
- 40 {
- 41 public:
- 42 void register_(weak_ptr<Observer> x); // 参数类型可用const weak_ptr<Observer>&
- 43 // void unregister(weak_ptr<Observer> x); // 不需要它
- 44 void notifyObservers();
- 45
- 46 private:
- 47 mutable MutexLock mutex_;
- 48 std::vector<weak_ptr<Observer> > observers_;
- 49 typedef std::vector<weak_ptr<Observer> >::iterator Iterator;
- 50 };
- 51
- 52 void Observable::notifyObservers()
- 53 {
- 54 MutexLockGuard lock(mutex_);
- 55 Iterator it = observers_.begin(); // Iterator 的定义见第49 行
- 56 while (it != observers_.end())
- 57 {
- 58 shared_ptr<Observer> obj(it->lock()); // 尝试提升,这一步是线程安全的
- 59 if (obj)
- 60 {
- 61 // 提升成功,现在引用计数值至少为2 (想想为什么?)
- 62 obj->update(); // 没有竞态条件,因为obj 在栈上,对象不可能在本作用域内销毁
- 63 ++it;
- 64 }
- 65 else
- 66 {
- 67 // 对象已经销毁,从容器中拿掉weak_ptr
- 68 it = observers_.erase(it);
- 69 }
- 70 }
- 71 }
- recipes/thread/test/Observer_safe.cc
就这么简单。前文代码(3) 处(p. 10 的L17)的竞态条件已经弥补了。思考:如果把L48 改为vector<shared_ptr<Observer> > observers_;,会有什么后果?
解决了吗
把Observer* 替换为weak_ptr<Observer> 部分解决了Observer 模式的线程安全,但还有以下几个疑点。这些问题留到本章§1.14 中去探讨,每个都是能解决的。
侵入性强制要求Observer 必须以shared_ptr 来管理。
不是完全线程安全Observer 的析构函数会调用subject_->unregister(this),万一subject_ 已经不复存在了呢? 为了解决它, 又要求Observable 本身是用shared_ptr 管理的,并且subject_ 多半是个weak_ptr<Observable>。
锁争用(lock contention) 即Observable 的三个成员函数都用了互斥器来同步,这会造成register_() 和unregister() 等待notifyObservers(),而后者的执行时间是无上限的,因为它同步回调了用户提供的update() 函数。我们希望register_() 和unregister() 的执行时间不会超过某个固定的上限,以免殃及无辜群众。
死锁万一L62 的update() 虚函数中调用了(un)register 呢?如果mutex_ 是不可重入的,那么会死锁;如果mutex_ 是可重入的,程序会面临迭代器失效(coredump 是最好的结果),因为vector observers_ 在遍历期间被意外地修改了。这个问题乍看起来似乎没有解决办法,除非在文档里做要求。(一种办法是:用可重入的mutex_,把容器换为std::list,并把++it 往前挪一行。)
我个人倾向于使用不可重入的mutex,例如Pthreads 默认提供的那个,因为“要求mutex 可重入”本身往往意味着设计上出了问题(§2.1.1)。Java 的intrinsiclock 是可重入的,因为要允许synchronized 方法相互调用(派生类调用基类的同名synchronized 方法),我觉得这也是无奈之举。