Curator框架应用实现Zookeeper分布式锁 | Word count: 2.3k | Reading time: 11min | Post View:
Curator框架应用实现Zookeeper分布式锁 CuratorFramework
Curator框架提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。
Curator主要解决了三类问题:
封装ZooKeeper client与ZooKeeper server之间的连接处理
提供了一套Fluent风格的操作API
提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装
组成部分
Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法
Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
Recipes: 实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
Utilities:各种ZooKeeper的工具类
Errors: 异常处理, 连接, 恢复等
Extensions: recipe扩展
Curator分布式锁 curator提供了四种分布式锁,分别是:
InterProcessMutex:分布式可重入排它锁
InterProcessSemaphoreMutex:分布式排它锁
InterProcessReadWriteLock:分布式读写锁
InterProcessMultiLock:将多个锁作为单个实体管理的容器
Zookeeper分布式锁实现
InterProcessMutex:分布式可重入排它锁,通过zookeeper节点不能重复的原理来让分布式环境的服务排队处理同一个数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;public class ZooKeeperLock { private static final String connetString = "116.196.113.135:2181" ; private static int sessionTimeOut = 2000 ; private static int count = 100 ; private static void process () { try { System.out.println(Thread.currentThread().getName() + ": acquire resource: " + count--); } catch (Exception e) { e.printStackTrace(); } } public static void main (String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000 , 3 ); final CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(connetString) .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build(); curatorFramework.start(); final InterProcessMutex mutex = new InterProcessMutex(curatorFramework, "/lock" ); final CountDownLatch countDownLatch = new CountDownLatch(1 ); for (int i = 0 ; i < 100 ; i++) { new Thread(new Runnable() { @Override public void run () { try { countDownLatch.await(); mutex.acquire(); process(); } catch (Exception e) { e.printStackTrace(); }finally { try { mutex.release(); System.out.println(Thread.currentThread().getName() + ": release lock" ); } catch (Exception e) { e.printStackTrace(); } } } },"Thread" + i).start(); } Thread.sleep(100 ); countDownLatch.countDown(); } }
输出如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 "C:\Program Files\Java\jdk1.8.0_151\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 182.3684.2\lib\idea_rt.jar=10283:C:\Program Files\JetBrains\IntelliJ IDEA 182.3684.2\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_151\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_151\jre\lib\rt.jar;D:\zookeeper\target\classes;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-framework\2.12.0\curator-framework-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-client\2.12.0\curator-client-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\zookeeper\zookeeper\3.4.8\zookeeper-3.4.8.jar;H:\maven\maven_xian\apache-maven-repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;H:\maven\maven_xian\apache-maven-repository\jline\jline\0.9.94\jline-0.9.94.jar;H:\maven\maven_xian\apache-maven-repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;H:\maven\maven_xian\apache-maven-repository\com\google\guava\guava\16.0.1\guava-16.0.1.jar;H:\maven\maven_xian\apache-maven-repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-recipes\2.12.0\curator-recipes-2.12.0.jar" com.usher.curator.ZooKeeperLockSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Thread22: acquire resource: 100 Thread22: release lock Thread55: acquire resource: 99 Thread55: release lock Thread91: acquire resource: 98 Thread91: release lock Thread1: acquire resource: 97 Thread1: release lock Thread88: acquire resource: 96 Thread88: release lock Thread35: acquire resource: 95 Thread35: release lock Thread86: acquire resource: 94 Thread86: release lock Thread37: acquire resource: 93 Thread37: release lock Thread58: acquire resource: 92 Thread58: release lock Thread7: acquire resource: 91 Thread7: release lock Thread94: acquire resource: 90 Thread94: release lock Thread80: acquire resource: 89
查看Zookeeper节点
Curator相关源码分析 InterProcessMutex源码解读
InterProcessMutex内部有个ConcurrentMap类型的threadData属性,该属性会以线程对象为键,线程对应的LockData对象为值,记录每个锁的相关信息。在new一个InterProcessMutex实例时,其构造器主要是为threadData进行Map初始化,校验锁的根节点的合法性并使用basePath属性记录,此外还会实例化一个LockInternals对象由属性internals引用,LockInternals是InterProcessMutex加锁的核心。
加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public void acquire () throws Exception { if (!this .internalLock(-1L , (TimeUnit)null )) { throw new IOException("Lost connection while trying to acquire lock: " + this .basePath); } } public boolean acquire (long time, TimeUnit unit) throws Exception { return this .internalLock(time, unit); } private boolean internalLock (long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this .threadData.get(currentThread); if (lockData != null ) { lockData.lockCount.incrementAndGet(); return true ; } else { String lockPath = this .internals.attemptLock(time, unit, this .getLockNodeBytes()); if (lockPath != null ) { InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath); this .threadData.put(currentThread, newLockData); return true ; } else { return false ; } } }
竞争锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 String attemptLock (long time, TimeUnit unit, byte [] lockNodeBytes) throws Exception { long startMillis = System.currentTimeMillis(); Long millisToWait = unit != null ? unit.toMillis(time) : null ; byte [] localLockNodeBytes = this .revocable.get() != null ? new byte [0 ] : lockNodeBytes; int retryCount = 0 ; String ourPath = null ; boolean hasTheLock = false ; boolean isDone = false ; while (!isDone) { isDone = true ; try { ourPath = this .driver.createsTheLock(this .client, this .path, localLockNodeBytes); hasTheLock = this .internalLockLoop(startMillis, millisToWait, ourPath); } catch (NoNodeException var14) { if (!this .client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { throw var14; } isDone = false ; } } return hasTheLock ? ourPath : null ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public String createsTheLock (CuratorFramework client, String path, byte [] lockNodeBytes) throws Exception { String ourPath; if ( lockNodeBytes != null ) { ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); } else { ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } return ourPath; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 private boolean internalLockLoop (long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false ; boolean doDelete = false ; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1 ); PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true ; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized (this ) { try { client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true ; break ; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true ; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; }
解锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public void release () throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return ; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } }