在分布式环境中,当需要控制对某一资源的不同进程并发访问时就需要使用分布式锁;可以使用 ZooKeeper + Curator 来实现分布式锁,本文主要介绍 Curator 中分布式锁的使用,文中所使用到的软件版本:Java 1.8.0_341、Zookeeper 3.7.1、curator 5.4.0。
1、引入依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.4.0</version> </dependency>
2、使用样例
2.1、可重入锁
@Test public void interProcessMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessMutex(cf, "/test/lock"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中可重复获取 lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(lock); release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.2、不可重入锁
@Test public void interProcessSemaphoreMutex() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); cf.start(); InterProcessLock lock = new InterProcessSemaphoreMutex(cf, "/test/lock2"); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { lock.acquire(); //同一线程中不可重复获取 //lock.acquire(); logger.info(Thread.currentThread().getName() + "获得了锁"); Thread.sleep(1000 * 3); } catch (Exception e) { e.printStackTrace(); } finally { release(lock); logger.info(Thread.currentThread().getName() + "释放了锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.3、读写锁(可重入)
@Test public void interProcessReadWriteLock() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessReadWriteLock lock = new InterProcessReadWriteLock(cf, "/test/lock3"); InterProcessReadWriteLock.ReadLock readLock = lock.readLock(); InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock(); cf.start(); int size = 5; CountDownLatch countDownLatch = new CountDownLatch(size); for (int i = 0; i < size; i++) { new Thread(() -> { try { readLock.acquire(); readLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了读锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { //获取了几次就要释放几次 release(readLock); release(readLock); logger.info(Thread.currentThread().getName() + "释放了读锁"); } try { writeLock.acquire(); logger.info(Thread.currentThread().getName() + "获得了写锁"); Thread.sleep(1000 * 2); } catch (Exception e) { e.printStackTrace(); } finally { release(writeLock); logger.info(Thread.currentThread().getName() + "释放了写锁"); } countDownLatch.countDown(); }).start(); } countDownLatch.await(); cf.close(); }
2.4、信号量
信号量用于控制对资源同时访问的进程或线程数。
@Test public void interProcessSemaphoreV2() throws InterruptedException { CuratorFramework cf = getCuratorFramework(); InterProcessSemaphoreV2 semaphore = new InterPr