Zookeeper实现分布式锁

Zookeeper节点

节点是zookeeper中数据存储的基础结构,zk的数据模型就是基于好多个节点的树结构,但zk规定每个节点的引用规则是路径引用。每个节点中包含子节点引用、存储数据、访问权限以及节点元数据等四部分。 zookeeper对于自身节点的监听者提供事件通知功能 。

节点类型

Zookeeper中提供了四种类型的节点,各种类型节点及其区别如下:

  • 持久节点(PERSISTENT):节点创建后,就一直存在,直到有删除操作来主动清除这个节点
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):保留持久节点的特性,额外的特性是,每个节点会为其第一层子节点维护一个顺序,记录每个子节点创建的先后顺序,ZK会自动为给定节点名加上一个数字后缀(自增的),作为新的节点名。
  • 临时节点(EPHEMERAL):和持久节点不同的是,临时节点的生命周期和客户端会话绑定,当然也可以主动删除。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):保留临时节点的特性,额外的特性如持久顺序节点的额外特性。

zookeeper具备了实现分布式锁的基础条件:多进程共享、可以存储锁信息、有主动通知的机制。

实现机制

1.在某父节点下创建临时有序节点

2.判断创建的节点是否是当前父节点下所有子节点中序号最小的

3.是序号最小的成功获取锁,否则监听比自己小的那个节点,进行watch,当该节点被删除的时候通知当前节点,重新获取锁

4.解锁的时候删除当前节点

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
* @Author: Usher
* @Description:
*/
public class ZkClientLock {

private ZkClient zkClient;
private String name;
private String curLockPath;// 当前锁
private CountDownLatch countDownLatch;
private static final String PARENT_LOCK_PATH = "/ZkClientLock"; // 锁的根节点

private ZkClientLock(ZkClient zkClient, String name) {
this.zkClient = zkClient;
this.name = name;
}

/**
* 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
*/
private void lock() {
//判断父节点是否存在,不存在就创建
if (!zkClient.exists(PARENT_LOCK_PATH)) {
try {
zkClient.createPersistent(PARENT_LOCK_PATH);
} catch (Exception e) {
e.printStackTrace();

}
}
//创建当前目录下的临时有序节点
curLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
//校验是否最小节点,如果是,成功获取锁,否则的话拿比自己小的节点,并做监听
checkMinNode(curLockPath);
}

private void unLock() {
System.out.println("Delete: " + curLockPath + ": unLock");
zkClient.delete(curLockPath);
}


/**
*
* @param curLockPath
* @return
*/
private void checkMinNode(String curLockPath) {
//获取当前目录下所有子节点
List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
Collections.sort(children);

int index = children.indexOf(curLockPath.substring(PARENT_LOCK_PATH.length() + 1));

if (index == 0) {
System.out.println(name + ":lock success");
if (countDownLatch != null) {
countDownLatch.countDown();
}
} else {
// 加锁失败,设置前一节点为等待锁节点
String prevNodePath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
//等待前一个节点释放的监听
waitForLock(prevNodePath);
}
}
//waitForLock等待比自己小的节点,subscribeDataChanges监听一个节点的变化,handleDataDeleted里面再次做checkMinNode的判断
private void waitForLock(String prevNodePath) {
System.out.println(name + " current path : " + curLockPath + ":fail add listener" + " wait path :" + prevNodePath);
// 设置计数器,使用计数器阻塞线程
countDownLatch = new CountDownLatch(1);
// 监听等待锁节点
zkClient.subscribeDataChanges(prevNodePath, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) {

}

@Override
public void handleDataDeleted(String s) {
System.out.println("prevNodePath node is done");
checkMinNode(curLockPath);
}
});

//监听完毕后,再判断一次此节点是否存在,因为在监听的过程中有可能之前小的那个节点重新释放了锁,如果之前节点不存在的话,无需在这里等待,
if (!zkClient.exists(prevNodePath)) {
return;
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch = null;
}

public static void main(String[] args) {
// 超时时间
int sessionTimeout = 30000;
String ip = "116.196.113.135:2181";
final ZkClient zk = new ZkClient(ip, sessionTimeout);

for (int i = 0; i < 10; i++) {
final String s = "Thread" + i;

new Thread(new Runnable() {
@Override
public void run() {
ZkClientLock zkClientLock = new ZkClientLock(zk, s);
zkClientLock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkClientLock.unLock();
}
},s).start();
}
}

}

从输出结果中可以看出,每个线程只有在上一个节点被删除后才能执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"C:\Program Files\Java\jdk1.8.0_151\bin\java.exe" "-javaagent:J:\IntelliJ IDEA 2018.2.4\lib\idea_rt.jar=11542:J:\IntelliJ IDEA 2018.2.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_151\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_151\jre\lib\rt.jar;D:\zookeeper\target\classes;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-framework\2.12.0\curator-framework-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-client\2.12.0\curator-client-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\com\google\guava\guava\16.0.1\guava-16.0.1.jar;H:\maven\maven_xian\apache-maven-repository\org\slf4j\slf4j-api\1.7.6\slf4j-api-1.7.6.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\curator\curator-recipes\2.12.0\curator-recipes-2.12.0.jar;H:\maven\maven_xian\apache-maven-repository\com\101tec\zkclient\0.2\zkclient-0.2.jar;H:\maven\maven_xian\apache-maven-repository\org\apache\zookeeper\zookeeper\3.3.1\zookeeper-3.3.1.jar;H:\maven\maven_xian\apache-maven-repository\jline\jline\0.9.94\jline-0.9.94.jar;H:\maven\maven_xian\apache-maven-repository\log4j\log4j\1.2.14\log4j-1.2.14.jar" com.usher.ZkClient.ZkClientLock
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkEventThread).
log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN Please initialize the log4j system properly.
Thread1:lock success
Thread4 current path : /ZkClientLock/0000000039:fail add listener wait path :/ZkClientLock/0000000038
Thread5 current path : /ZkClientLock/0000000031:fail add listener wait path :/ZkClientLock/0000000030
Thread7 current path : /ZkClientLock/0000000038:fail add listener wait path :/ZkClientLock/0000000037
Thread6 current path : /ZkClientLock/0000000037:fail add listener wait path :/ZkClientLock/0000000036
Thread2 current path : /ZkClientLock/0000000036:fail add listener wait path :/ZkClientLock/0000000035
Thread0 current path : /ZkClientLock/0000000035:fail add listener wait path :/ZkClientLock/0000000034
Thread3 current path : /ZkClientLock/0000000034:fail add listener wait path :/ZkClientLock/0000000033
Thread9 current path : /ZkClientLock/0000000033:fail add listener wait path :/ZkClientLock/0000000032
Thread8 current path : /ZkClientLock/0000000032:fail add listener wait path :/ZkClientLock/0000000031
Delete: /ZkClientLock/0000000030: unLock
prevNodePath:/ZkClientLock/0000000030 node is done
Thread5:lock success
Delete: /ZkClientLock/0000000031: unLock
prevNodePath:/ZkClientLock/0000000031 node is done
Thread8:lock success
Delete: /ZkClientLock/0000000032: unLock
prevNodePath:/ZkClientLock/0000000032 node is done
Thread9:lock success
Delete: /ZkClientLock/0000000033: unLock
prevNodePath:/ZkClientLock/0000000033 node is done
Thread3:lock success
Delete: /ZkClientLock/0000000034: unLock
prevNodePath:/ZkClientLock/0000000034 node is done
Thread0:lock success
Delete: /ZkClientLock/0000000035: unLock
prevNodePath:/ZkClientLock/0000000035 node is done
Thread2:lock success
Delete: /ZkClientLock/0000000036: unLock
prevNodePath:/ZkClientLock/0000000036 node is done
Thread6:lock success
Delete: /ZkClientLock/0000000037: unLock
prevNodePath:/ZkClientLock/0000000037 node is done
Thread7:lock success
Delete: /ZkClientLock/0000000038: unLock
prevNodePath:/ZkClientLock/0000000038 node is done
Thread4:lock success
Delete: /ZkClientLock/0000000039: unLock

Process finished with exit code 0