Curator框架应用实现Zookeeper分布式锁

CuratorFramework

Curator框架提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。

Curator主要解决了三类问题:

  • 封装ZooKeeper client与ZooKeeper server之间的连接处理

  • 提供了一套Fluent风格的操作API

  • 提供ZooKeeper各种应用场景(recipe, 比如共享锁服务, 集群领导选举机制)的抽象封装

组成部分

  • Client: 是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法
  • Framework: 用来简化ZooKeeper高级功能的使用, 并增加了一些新的功能, 比如管理到ZooKeeper集群的连接, 重试处理
  • Recipes: 实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上
  • Utilities:各种ZooKeeper的工具类
  • Errors: 异常处理, 连接, 恢复等
  • Extensions: recipe扩展

Curator分布式锁

curator提供了四种分布式锁,分别是:

curator的四种锁方案

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

Zookeeper分布式锁实现

InterProcessMutex:分布式可重入排它锁,通过zookeeper节点不能重复的原理来让分布式环境的服务排队处理同一个数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* @Author: Usher
* @Description:
*/
/**
 * Demo of a ZooKeeper-backed distributed lock using Curator's {@link InterProcessMutex}.
 *
 * <p>{@code main} starts {@value #THREAD_COUNT} threads that all contend for the same
 * lock node ({@value #LOCK_PATH}); each thread that obtains the lock prints and
 * decrements a shared counter, then releases the lock.
 */
public class ZooKeeperLock {
    /** Address ({@code host:port}) of the ZooKeeper server used by the demo. */
    private static final String CONNECT_STRING = "116.196.113.135:2181";
    /** ZooKeeper session timeout, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 2000;
    /** ZooKeeper path under which the lock nodes are created. */
    private static final String LOCK_PATH = "/lock";
    /** Number of competing worker threads. */
    private static final int THREAD_COUNT = 100;

    /** Shared "resource"; only touched by the thread that currently holds the lock. */
    private static int count = 100;

    /** Consumes one unit of the shared resource. Must be called while holding the lock. */
    private static void process() {
        System.out.println(Thread.currentThread().getName() + ": acquire resource: " + count--);
    }

    /**
     * Connects to ZooKeeper, runs the contending workers, waits for them and closes the client.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping or joining
     */
    public static void main(String[] args) throws Exception {
        // Retry policy: ExponentialBackoffRetry(1000, 3), i.e. base sleep 1000 ms, max 3 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // Build and start the client.
        final CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .retryPolicy(retryPolicy)
                .build();
        curatorFramework.start();

        try {
            // Distributed re-entrant exclusive lock bound to this client, rooted at LOCK_PATH.
            final InterProcessMutex mutex = new InterProcessMutex(curatorFramework, LOCK_PATH);
            // Start gate so that all workers begin competing at the same moment.
            final CountDownLatch startSignal = new CountDownLatch(1);

            Thread[] workers = new Thread[THREAD_COUNT];
            for (int i = 0; i < THREAD_COUNT; i++) {
                workers[i] = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            startSignal.await();
                            mutex.acquire();
                        } catch (Exception e) {
                            if (e instanceof InterruptedException) {
                                // Preserve the interrupt status for whoever owns this thread.
                                Thread.currentThread().interrupt();
                            }
                            e.printStackTrace();
                            // The lock was never obtained, so there is nothing to release.
                            return;
                        }

                        // From here on this thread owns the lock and must release it.
                        try {
                            process();
                        } finally {
                            try {
                                mutex.release();
                                System.out.println(Thread.currentThread().getName() + ": release lock");
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, "Thread" + i);
                workers[i].start();
            }

            // Give the workers a moment to reach the gate, then open it.
            Thread.sleep(100);
            startSignal.countDown();

            // Wait for every worker before tearing the client down.
            for (Thread worker : workers) {
                worker.join();
            }
        } finally {
            // Always close the client so its connection and background threads are released.
            curatorFramework.close();
        }
    }
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"C:\Program Files\Java\jdk1.8.0_151\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 182.3684.2\lib\idea_rt.jar=10283:C:\Program Files\JetBrains\IntelliJ IDEA 182.3684.2\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_151\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_151\jre\lib\rt.jar;D:\zookeeper\target\classes;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-framework\2.12.0\curator-framework-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-client\2.12.0\curator-client-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\zookeeper\zookeeper\3.4.8\zookeeper-3.4.8.jar;H:\maven\maven_xian\apache-maven-repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;H:\maven\maven_xian\apache-maven-repository\jline\jline\0.9.94\jline-0.9.94.jar;H:\maven\maven_xian\apache-maven-repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;H:\maven\maven_xian\apache-maven-repository\com\google\guava\guava\16.0.1\guava-16.0.1.jar;H:\maven\maven_xian\apache-maven-repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-recipes\2.12.0\curator-recipes-2.12.0.jar" com.usher.curator.ZooKeeperLock
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Thread22: acquire resource: 100
Thread22: release lock
Thread55: acquire resource: 99
Thread55: release lock
Thread91: acquire resource: 98
Thread91: release lock
Thread1: acquire resource: 97
Thread1: release lock
Thread88: acquire resource: 96
Thread88: release lock
Thread35: acquire resource: 95
Thread35: release lock
Thread86: acquire resource: 94
Thread86: release lock
Thread37: acquire resource: 93
Thread37: release lock
Thread58: acquire resource: 92
Thread58: release lock
Thread7: acquire resource: 91
Thread7: release lock
Thread94: acquire resource: 90
Thread94: release lock
Thread80: acquire resource: 89

查看Zookeeper节点

Curator相关源码分析

InterProcessMutex源码解读

InterProcessMutex内部有个ConcurrentMap类型的threadData属性,该属性会以线程对象为键,线程对应的LockData对象为值,记录每个锁的相关信息。在new一个InterProcessMutex实例时,其构造器主要是为threadData进行Map初始化,校验锁的根节点的合法性并使用basePath属性记录,此外还会实例化一个LockInternals对象由属性internals引用,LockInternals是InterProcessMutex加锁的核心。

加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// InterProcessMutex.class
/**
 * Acquires the lock without a timeout.
 *
 * <p>Delegates to {@code internalLock(-1, null)}; a {@code null} unit means no wait
 * limit is applied further down the call chain.
 *
 * @throws IOException if {@code internalLock} reports that the lock was not obtained
 * @throws Exception   any error propagated from {@code internalLock}
 */
public void acquire() throws Exception {
// acquire() simply calls internalLock, passing -1 as the wait time and a null unit
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}

/**
 * Tries to acquire the lock, waiting at most the given time.
 *
 * @param time maximum time to wait
 * @param unit unit of {@code time}
 * @return the result of {@code internalLock}: {@code true} if the lock was obtained,
 *         {@code false} otherwise
 * @throws Exception any error propagated from {@code internalLock}
 */
public boolean acquire(long time, TimeUnit unit) throws Exception {
return this.internalLock(time, unit);
}
/**
 * Core lock routine shared by both {@code acquire} variants.
 *
 * <p>First checks re-entrancy: {@code threadData} maps each thread to its
 * {@code LockData}, which carries a lock counter. If the current thread already has an
 * entry, the counter is incremented and the call succeeds immediately. Otherwise
 * {@code internals.attemptLock} is called; it returns the path of the lock node, or
 * {@code null} if the lock was not obtained. A non-null path means the current thread
 * now holds the lock, so a new {@code LockData} is created and stored in
 * {@code threadData}.
 *
 * @param time maximum time to wait
 * @param unit unit of {@code time}; passed through unchanged to {@code attemptLock}
 * @return {@code true} if the current thread holds the lock on return
 * @throws Exception any error propagated from {@code attemptLock}
 */
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData != null) {
// Re-entrant case: this thread already holds the lock, just bump its counter
lockData.lockCount.incrementAndGet();
return true;
} else {
// Try to take the lock; the returned value is the path of our lock node
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}

竞争锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// LockInternals.class
/**
 * Creates this client's lock node and competes for the lock.
 *
 * <p>The body runs in a loop: createsTheLock creates the node and internalLockLoop
 * decides whether that node currently holds the lock (waiting if it does not). If a
 * NoNodeException is thrown, the client's retry policy is consulted; when a retry is
 * allowed the loop runs again, otherwise the exception is rethrown.
 *
 * @param time          maximum time to wait
 * @param unit          unit of {@code time}; when null, no wait limit is computed
 * @param lockNodeBytes data to store in the lock node
 * @return the path of our lock node if the lock was obtained, otherwise null
 * @throws Exception on ZooKeeper errors, or NoNodeException once retries are exhausted
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = unit != null ? unit.toMillis(time) : null;
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;

while(!isDone) {
isDone = true;
try {
// Create the lock node
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
// Compete for the lock.
// attemptLock() returns the lock node path (i.e. locking succeeds) only when this call returns true;
// the actual competition is implemented by internalLockLoop
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,
System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}

isDone = false;
}
}

return hasTheLock ? ourPath : null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Creates the lock node through Curator's fluent create API as an
 * EPHEMERAL_SEQUENTIAL node (with protection enabled and parent containers created
 * if needed).
 *
 * @param client        Curator client used to create the node
 * @param path          path passed to {@code forPath} for the new node
 * @param lockNodeBytes data to store in the node; when null the node is created
 *                      without an explicit payload
 * @return the path of the created node, as returned by {@code forPath}
 * @throws Exception on ZooKeeper errors
 */
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * Decides whether our node holds the lock, waiting if it does not.
 *
 * <p>While the client is STARTED and the lock is not yet held, the sorted children are
 * fetched and driver.getsTheLock is asked whether our node qualifies (per the
 * accompanying article: whether it is the smallest node under the lock root). If it
 * does, the loop ends with success. Otherwise a watcher is set on the node named by
 * getPathToWatch() and the thread waits on this object's monitor; with a timeout the
 * remaining wait time is recomputed on every pass, and when it runs out our node is
 * marked for deletion.
 *
 * @param startMillis  time at which the attempt started, used to compute elapsed time
 * @param millisToWait remaining wait budget in milliseconds, or null to wait indefinitely
 * @param ourPath      full path of our lock node
 * @return true if the lock was obtained, false otherwise (e.g. on timeout)
 * @throws Exception any error raised while competing; our node is deleted before rethrowing
 */
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// Fetch all child nodes, sorted
List<String> children = getSortedChildren();
// Extract the name of our own lock node
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// Ask the lock driver whether our node gets the lock
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
// Block until the node we are queued behind is released
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
// Already timed out: flag our node for deletion
doDelete = true; // timed out - delete our node
break;
}
// Wait for at most the remaining time
wait(millisToWait);
}
else
{// No timeout configured: wait until notified
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )// Delete our lock node after a timeout or an exception
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}

解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Releases one hold of the lock owned by the current thread.
 *
 * <p>Looks up the current thread's LockData in threadData and throws if there is none.
 * Otherwise the lock counter is atomically decremented: if it is still positive the
 * lock remains held (re-entrant case) and the method returns; if it has gone negative
 * an exception is thrown; if it reached exactly zero, internals.releaseLock is called
 * for the lock node (per the accompanying article this deletes the node) and the
 * thread's entry is removed from threadData, even when releaseLock throws.
 *
 * @throws IllegalMonitorStateException if the current thread does not own the lock,
 *                                      or if the lock count has gone negative
 * @throws Exception                    any error propagated from releaseLock
 */
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/

Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}

int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}