commit 4e43ab53e994276f1d1744f09814261fb23c7098 Author: ehlxr Date: Wed Feb 3 18:21:41 2021 +0800 update at 2021-02-03 18:21:41 by ehlxr diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..8569dd1 --- /dev/null +++ b/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.4.2 + + + io.github.ehlxr + zk-rw-lock + 0.0.1-SNAPSHOT + zk-rw-lock + Demo project for Spring Boot + + 1.8 + 2020.0.0 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-starter-zookeeper-config + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + spring-milestones + Spring Milestones + https://repo.spring.io/milestone + + + + diff --git a/src/main/java/io/github/ehlxr/zkrwlock/ZkRwLockApplication.java b/src/main/java/io/github/ehlxr/zkrwlock/ZkRwLockApplication.java new file mode 100644 index 0000000..fb44848 --- /dev/null +++ b/src/main/java/io/github/ehlxr/zkrwlock/ZkRwLockApplication.java @@ -0,0 +1,13 @@ +package io.github.ehlxr.zkrwlock; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ZkRwLockApplication { + + public static void main(String[] args) { + SpringApplication.run(ZkRwLockApplication.class, args); + } + +} diff --git a/src/main/java/io/github/ehlxr/zkrwlock/controller/Controller.java b/src/main/java/io/github/ehlxr/zkrwlock/controller/Controller.java new file mode 100644 index 0000000..1cad2fd --- /dev/null +++ b/src/main/java/io/github/ehlxr/zkrwlock/controller/Controller.java @@ -0,0 +1,52 @@ +package io.github.ehlxr.zkrwlock.controller; + + +import io.github.ehlxr.zkrwlock.lockv2.ZkLock; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@SuppressWarnings("all") +public class Controller { + + String lockName = "test"; + /** + * ---------------非公平锁,重制命令每次执行都会优先于读取命令 + */ + @GetMapping("/test03") + public void test03() throws InterruptedException { + for (int i = 0; i < 100; i++) { + Thread.sleep(1000); + new Thread(()->{ + ZkLock lock = new ZkLock(lockName, ZkLock.ReadWriteType.READ); + + try { + lock.lock(); + Thread.sleep(1000); + System.out.println("读请求,耗时50毫秒"); + }catch (Exception e){ + + }finally { + lock.unLock(); + } + + }).start(); + } + } + @GetMapping("/test04") + public void test04() throws Exception { + ZkLock lock = new ZkLock(lockName, ZkLock.ReadWriteType.WRITE); + + lock.lock(); + System.out.println("写请求"); + Thread.sleep(2000); + lock.unLock(); + } + + + @GetMapping("/test05") + public void test05(){ + + + } +} diff --git a/src/main/java/io/github/ehlxr/zkrwlock/lock/ReadWriteLock.java b/src/main/java/io/github/ehlxr/zkrwlock/lock/ReadWriteLock.java new file mode 100644 index 0000000..3683afa --- /dev/null +++ b/src/main/java/io/github/ehlxr/zkrwlock/lock/ReadWriteLock.java @@ -0,0 +1,79 @@ +package io.github.ehlxr.zkrwlock.lock; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + + +/** + * 写锁优先 + */ +@SuppressWarnings("all") +public class ReadWriteLock extends ZkLock { + private static final String READ_WRITE_NODE = "lock_"; + + // 有写锁的时候读锁等待,写锁永远都可以插队 + public static String READ = "read_"; + public static String WRITE = "write_"; + + // 当前锁的状态 + private String state; + + public ReadWriteLock(String state) throws Exception { + this.state = state; + + + // 1. 创建临时有序节点 + path = zooKeeper.create(READ_WRITE_LOCK_PATH + "/" + READ_WRITE_NODE + state, "".getBytes() + , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + // 2. 获得路径下所有节点 + + attemptLock(path); + + } + + @Override + protected void attemptLock(String path) throws Exception { + List list = zooKeeper.getChildren(READ_WRITE_LOCK_PATH, false); + List readList = list.stream().filter(data -> data.contains(READ)).collect(Collectors.toList()); + List writeList = list.stream().filter(data -> data.contains(WRITE)).collect(Collectors.toList()); + Collections.sort(readList); + Collections.sort(writeList); + if (READ.equals(state)) { + // 读锁的获取方式必须是前面没有写锁,并且自己是第一个,否则阻塞 + if (writeList.size() == 0) { + + int index = readList.indexOf(path.substring(READ_WRITE_LOCK_PATH.length() + 1)); + if (index == 0) { +// System.out.println("获取到锁对象"); + return; + } else { + cirLock(path, readList, index); + + } + } else { + // 有写锁,需要监听最后一个写锁 + cirLock(path, writeList, writeList.size()); + + } + } else { + // 写锁的获取方式是前面没有写锁 + int index = writeList.indexOf(path.substring(READ_WRITE_LOCK_PATH.length() + 1)); + if (index == 0) { +// System.out.println("写锁获取到锁对象"); + return; + } else { + // 监听上一个读锁 + cirLock(path, writeList, index); + } + + } + + } + + +} diff --git a/src/main/java/io/github/ehlxr/zkrwlock/lock/ZkLock.java b/src/main/java/io/github/ehlxr/zkrwlock/lock/ZkLock.java new file mode 100644 index 0000000..475ab8b --- /dev/null +++ b/src/main/java/io/github/ehlxr/zkrwlock/lock/ZkLock.java @@ -0,0 +1,132 @@ +package io.github.ehlxr.zkrwlock.lock; + +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +public abstract class ZkLock implements Closeable { + + + + private static final String ROOTLOCK = "/lock"; + protected static final String READ_WRITE_LOCK_PATH = "/lock/readWriteLock"; + + protected static final String SERVER = "EUREKA01:2181"; + + protected static final Integer TIMEOUT= 20000; + + protected static ZooKeeper zooKeeper; + + // 如果断开链接了,就需要全部暂停等待zk锁从新链接成功 + private static final Object reconnectLock = new Object(); + + protected String path ; + + Watcher watcher = event -> { + if (event.getType() == Watcher.Event.EventType.None) { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + System.out.println("重新连接成功!"); + synchronized (reconnectLock){ + reconnectLock.notifyAll(); + } + } + } + + }; + static { + try { + zooKeeper = new ZooKeeper(SERVER, TIMEOUT,event -> { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + System.out.println("重新连接成功!"); + synchronized (reconnectLock){ + reconnectLock.notifyAll(); + } + } + }); + + }catch (Exception e){ + System.out.println("创建zk失败"); + } + } + public ZkLock() throws Exception { + + init(); + } + + public void init() throws Exception{ + try { + // 创建持久节点 /lock + if (zooKeeper.exists(ROOTLOCK, false) == null){ + zooKeeper.create(ROOTLOCK, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + // 创建持久节点 /lock/unfairLock + if (zooKeeper.exists(READ_WRITE_LOCK_PATH, false) == null){ + zooKeeper.create(READ_WRITE_LOCK_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + }catch (Exception e){ + System.out.println(e); + if (e instanceof KeeperException.ConnectionLossException){ + // zookeeper 服务端断开连接,等待重新链接 + synchronized (reconnectLock){ + reconnectLock.wait(); + } + + init(); + } + } + } + + @Override + public void close() throws IOException { + try { + zooKeeper.delete(path, -1); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } + } + + + protected void cirLock(String path, List list, int index) throws Exception { + // 监听上一个读锁 + String lastPath = list.get(index - 1); + Stat stat = zooKeeper.exists(READ_WRITE_LOCK_PATH + "/" + lastPath, event -> { + // KeeperState DisConnected Exipred 发生,临时节点可能也会被删除 + if (event.getType() == Watcher.Event.EventType.NodeDeleted) { + synchronized (this) { + this.notify(); + } + } + if (event.getState() == Watcher.Event.KeeperState.Disconnected || + event.getState() == Watcher.Event.KeeperState.Expired){ + System.out.println("掉线了,重新链接"); + try { + zooKeeper = new ZooKeeper(SERVER, TIMEOUT,watcher); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + if (stat == null) { + // 上一个节点消失了,再次重新获取锁 + attemptLock(path); + } else { + // 阻塞,等待锁释放 + synchronized (this) { + this.wait(); + } + attemptLock(path); + } + } + + + + protected abstract void attemptLock(String path) throws Exception; + + +} diff --git a/src/main/java/io/github/ehlxr/zkrwlock/lockv2/ZkLock.java b/src/main/java/io/github/ehlxr/zkrwlock/lockv2/ZkLock.java new file mode 100644 index 0000000..a8ebfa7 --- /dev/null +++ b/src/main/java/io/github/ehlxr/zkrwlock/lockv2/ZkLock.java @@ -0,0 +1,198 @@ +package io.github.ehlxr.zkrwlock.lockv2; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.retry.RetryOneTime; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * zk 实现读写锁 + * 实现效果为: 在lock 节点下创建 自定义锁资源, lock_01 , lock_02 等 + * 锁资源下为有序临时节点 分为读节点和写节点 read_00001 write_00001 + * 获取读锁的方式为,锁资源下没有写节点,如果有则监听最后一个,读锁之间不会相互竞争 + * 获取写锁的方式也是写锁下没有最后一个节点,并且当前有读锁的时候需要监听当前读锁的结束,写锁之间会相互竞争 + * + * @author ehlxr + */ +public class ZkLock { + private static final String ROOTLOCK = "lock"; + protected static final String SERVER = "localhost:2181"; + protected static final Integer TIMEOUT = 2000000; + protected static final CuratorFramework ZK_CLIENT; + private final String name; + private final ReadWriteType readWriteType; + public String path; + /** + * 是否可以获取写锁的标志位,获取写锁的条件是 + * 处于写锁的第一个,并且当前没有读锁正在读取写锁的第一个可以通过有序数组排序,没有读锁则得通过监听最老的读锁释放之后,修改这个值 + * 这个标志为同样可以监听第二个写锁监听结束后变成第一个写锁的情况. + * 判断是否可以获得写锁的标志就是要么 是 写锁的第一个要么就是上一个监听的回掉生效了 + */ + private Boolean shouldWrite = false; + + static { + ZK_CLIENT = CuratorFrameworkFactory.builder() + // IP 地址 + 端口号,多个用逗号隔开 + .connectString(SERVER) + // 会话超时时间 + .sessionTimeoutMs(TIMEOUT) + // 重连机制 + .retryPolicy(new RetryOneTime(10000)) + // 命名空间,用该客户端操作的东西都在该节点之下 + .namespace(ROOTLOCK) + .build(); + + ZK_CLIENT.start(); + + if (ZK_CLIENT.getState() == CuratorFrameworkState.STARTED) { + System.out.println("启动成功"); + } + } + + public ZkLock(String name, ReadWriteType readWriteType) { + this.name = name; + this.readWriteType = readWriteType; + + try { + if (ZK_CLIENT.checkExists().forPath("/" + name) == null) { + ZK_CLIENT.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath("/" + name); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void lock() throws Exception { + path = ZK_CLIENT.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) + .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) + .forPath("/" + name + "/" + readWriteType.type); + + + attemptLock(path); + } + + public void unLock() { + try { + ZK_CLIENT.delete() + .deletingChildrenIfNeeded() + .forPath(path); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + protected void attemptLock(String path) throws Exception { + GetChildrenBuilder children = ZK_CLIENT.getChildren(); + List list = children.forPath(getPath()); + + List writeList = list.stream() + .filter(data -> data.contains(ReadWriteType.WRITE.type)) + .sorted(String::compareTo) + .collect(Collectors.toList()); + + List readList = list.stream() + .filter(data -> data.contains(ReadWriteType.READ.type)) + .sorted(String::compareTo) + .collect(Collectors.toList()); + + + if (readWriteType == ReadWriteType.READ) { + // 读锁判断最后一个写锁没有了就可以获得锁了 + if (writeList.size() == 0) { + // 我是读锁,并且没有写锁,直接获得 + return; + } else { + // 读锁但是有写锁,监听最后一个写锁 + String lastPath = writeList.get(writeList.size() - 1); + cirLock(lastPath); + } + } else { + // 写锁,判断自己是不是第一个,如果不是则必须得等到没有 + if (writeList.size() == 1) { + // 获取到锁,已经没人获取到读锁了 + if (readList.size() == 0 || shouldWrite) { + + return; + } else { + String first = readList.get(0); + cirLock(first); + } + } else { + String name = path.substring(getPath().length() + 1); + if (writeList.lastIndexOf(name) == 0) { + // 获取到锁 + if (readList.size() == 0) { + return; + } else { + String first = readList.get(0); + cirLock(first); + } + } else { + // 只需要监听前一个写锁的释放即可 + String lastPath = writeList.get(writeList.lastIndexOf(name) - 1); + cirLock(lastPath); + } + } + } + + // 没有写锁,全部都不阻塞 + + } + + protected void cirLock(String lastPath) throws Exception { + // 获得上一个锁对象 + + NodeCache nodeCache = new NodeCache(ZK_CLIENT, getPath() + "/" + lastPath); + + nodeCache.start(); + + nodeCache.getListenable().addListener(() -> { + ChildData currentData = nodeCache.getCurrentData(); + if (currentData == null) { + synchronized (this) { + shouldWrite = true; + notifyAll(); + + } + } + }); + + synchronized (this) { + wait(1000); + } + + attemptLock(path); + } + + public enum ReadWriteType { + /** + * 锁类型 + */ + READ("read_"), + WRITE("write_"); + private final String type; + + ReadWriteType(String type) { + this.type = type; + } + } + + private String getPath() { + return "/" + name; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..d851de0 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,2 @@ +server: + port: 9080 \ No newline at end of file