zk-rw-lock/src/main/java/io/github/ehlxr/zkrwlock/v2/ZkLock.java

188 lines
6.7 KiB
Java
Raw Normal View History

2021-02-04 02:56:12 +00:00
package io.github.ehlxr.zkrwlock.v2;
2021-02-03 10:21:41 +00:00
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryOneTime;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import java.util.List;
import java.util.stream.Collectors;
/**
* zk
2021-02-03 15:38:52 +00:00
* lock lock_01lock_02
* read_00001write_00001
2021-02-03 10:21:41 +00:00
*
* ,
*
* @author ehlxr
*/
public class ZkLock {
private static final String ROOTLOCK = "lock";
protected static final String SERVER = "localhost:2181";
protected static final Integer TIMEOUT = 2000000;
protected static final CuratorFramework ZK_CLIENT;
private final String name;
private final ReadWriteType readWriteType;
public String path;
/**
*
*
* .
*
*/
private Boolean shouldWrite = false;
static {
ZK_CLIENT = CuratorFrameworkFactory.builder()
.connectString(SERVER)
.sessionTimeoutMs(TIMEOUT)
.retryPolicy(new RetryOneTime(10000))
// 命名空间,用该客户端操作的东西都在该节点之下
.namespace(ROOTLOCK)
.build();
ZK_CLIENT.start();
if (ZK_CLIENT.getState() == CuratorFrameworkState.STARTED) {
System.out.println("启动成功");
}
}
public ZkLock(String name, ReadWriteType readWriteType) {
this.name = name;
this.readWriteType = readWriteType;
try {
if (ZK_CLIENT.checkExists().forPath("/" + name) == null) {
ZK_CLIENT.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/" + name);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void lock() throws Exception {
path = ZK_CLIENT.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/" + name + "/" + readWriteType.type);
attemptLock(path);
}
public void unLock() {
try {
ZK_CLIENT.delete()
.deletingChildrenIfNeeded()
.forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
}
protected void attemptLock(String path) throws Exception {
GetChildrenBuilder children = ZK_CLIENT.getChildren();
List<String> list = children.forPath(getPath());
List<String> writeList = list.stream()
.filter(data -> data.contains(ReadWriteType.WRITE.type))
.sorted(String::compareTo)
.collect(Collectors.toList());
List<String> readList = list.stream()
.filter(data -> data.contains(ReadWriteType.READ.type))
.sorted(String::compareTo)
.collect(Collectors.toList());
if (readWriteType == ReadWriteType.READ) {
// 读锁判断最后一个写锁没有了就可以获得锁了
if (writeList.size() == 0) {
// 我是读锁,并且没有写锁,直接获得
2021-02-03 15:38:52 +00:00
// return;
2021-02-03 10:21:41 +00:00
} else {
// 读锁但是有写锁,监听最后一个写锁
String lastPath = writeList.get(writeList.size() - 1);
cirLock(lastPath);
}
} else {
// 写锁,判断自己是不是第一个,如果不是则必须得等到没有
if (writeList.size() == 1) {
// 获取到锁,已经没人获取到读锁了
if (readList.size() == 0 || shouldWrite) {
2021-02-03 15:38:52 +00:00
// return;
2021-02-03 10:21:41 +00:00
} else {
String first = readList.get(0);
cirLock(first);
}
} else {
String name = path.substring(getPath().length() + 1);
if (writeList.lastIndexOf(name) == 0) {
// 获取到锁
if (readList.size() == 0) {
2021-02-03 15:38:52 +00:00
// return;
2021-02-03 10:21:41 +00:00
} else {
String first = readList.get(0);
cirLock(first);
}
} else {
// 只需要监听前一个写锁的释放即可
String lastPath = writeList.get(writeList.lastIndexOf(name) - 1);
cirLock(lastPath);
}
}
}
// 没有写锁,全部都不阻塞
}
protected void cirLock(String lastPath) throws Exception {
// 获得上一个锁对象
NodeCache nodeCache = new NodeCache(ZK_CLIENT, getPath() + "/" + lastPath);
nodeCache.start();
nodeCache.getListenable().addListener(() -> {
ChildData currentData = nodeCache.getCurrentData();
if (currentData == null) {
synchronized (this) {
shouldWrite = true;
notifyAll();
}
}
});
synchronized (this) {
wait(1000);
}
attemptLock(path);
}
public enum ReadWriteType {
/**
*
*/
READ("read_"),
WRITE("write_");
private final String type;
ReadWriteType(String type) {
this.type = type;
}
}
private String getPath() {
return "/" + name;
}
}