diff --git a/did-common/pom.xml b/did-common/pom.xml
index 7b0b63c..781cb46 100644
--- a/did-common/pom.xml
+++ b/did-common/pom.xml
@@ -23,13 +23,4 @@
logback-classic
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
-
diff --git a/did-sdk/pom.xml b/did-sdk/pom.xml
index 968196d..1fb51e7 100644
--- a/did-sdk/pom.xml
+++ b/did-sdk/pom.xml
@@ -27,10 +27,6 @@
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
org.apache.maven.plugins
maven-shade-plugin
diff --git a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java
index 31b3bb4..bbee449 100644
--- a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java
+++ b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java
@@ -1,7 +1,7 @@
package cn.ceres.did.client;
-import cn.ceres.did.sdk.SdkProto;
import cn.ceres.did.common.NettyUtil;
+import cn.ceres.did.sdk.SdkProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author ehlxr
*/
public abstract class AbstractClient implements Client {
- private Logger logger = LoggerFactory.getLogger(AbstractClient.class);
+ private final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Semaphore asyncSemaphore = new Semaphore(100000);
private final Semaphore onewaySemaphore = new Semaphore(100000);
@@ -30,7 +30,7 @@ public abstract class AbstractClient implements Client {
public void init() {
asyncResponse = new ConcurrentHashMap<>(16);
workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() {
- private AtomicInteger index = new AtomicInteger(0);
+ private final AtomicInteger index = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
@@ -57,20 +57,17 @@ public abstract class AbstractClient implements Client {
try {
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null);
asyncResponse.put(rqid, responseFuture);
- channel.writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) {
- if (channelFuture.isSuccess()) {
- //发送成功后立即跳出
- responseFuture.setIsSendStateOk(true);
- return;
- }
- // 代码执行到此说明发送失败,需要释放资源
- asyncResponse.remove(rqid);
- responseFuture.putResponse(null);
- responseFuture.setCause(channelFuture.cause());
- logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
+ channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
+ if (channelFuture.isSuccess()) {
+ //发送成功后立即跳出
+ responseFuture.setIsSendStateOk(true);
+ return;
}
+ // 代码执行到此说明发送失败,需要释放资源
+ asyncResponse.remove(rqid);
+ responseFuture.putResponse(null);
+ responseFuture.setCause(channelFuture.cause());
+ logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
});
// 阻塞等待响应
SdkProto resultProto = responseFuture.waitResponse(timeoutMillis);
@@ -104,27 +101,24 @@ public abstract class AbstractClient implements Client {
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore);
asyncResponse.put(rqid, responseFuture);
try {
- channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) {
- if (channelFuture.isSuccess()) {
- responseFuture.setIsSendStateOk(true);
- return;
- }
- // 代码执行到些说明发送失败,需要释放资源
- asyncResponse.remove(rqid);
- responseFuture.setCause(channelFuture.cause());
- responseFuture.putResponse(null);
-
- try {
- responseFuture.executeInvokeCallback();
- } catch (Exception e) {
- logger.warn("excute callback in writeAndFlush addListener, and callback throw", e);
- } finally {
- responseFuture.release();
- }
- logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause());
+ channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
+ if (channelFuture.isSuccess()) {
+ responseFuture.setIsSendStateOk(true);
+ return;
}
+ // 代码执行到些说明发送失败,需要释放资源
+ asyncResponse.remove(rqid);
+ responseFuture.setCause(channelFuture.cause());
+ responseFuture.putResponse(null);
+
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Exception e) {
+ logger.warn("excute callback in writeAndFlush addListener, and callback throw", e);
+ } finally {
+ responseFuture.release();
+ }
+ logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause());
});
} catch (Exception e) {
responseFuture.release();
@@ -151,13 +145,10 @@ public abstract class AbstractClient implements Client {
boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
try {
- channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) {
- onewaySemaphore.release();
- if (!channelFuture.isSuccess()) {
- logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
- }
+ channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
+ onewaySemaphore.release();
+ if (!channelFuture.isSuccess()) {
+ logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
}
});
} catch (Exception e) {
@@ -167,7 +158,8 @@ public abstract class AbstractClient implements Client {
asyncResponse.remove(rqid);
}
} else {
- String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
+ String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d",
+ timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
logger.warn(info);
throw new Exception(info);
}
diff --git a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java
index e9ce020..8d3d155 100644
--- a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java
+++ b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java
@@ -1,10 +1,10 @@
package cn.ceres.did.client;
+import cn.ceres.did.common.Constants;
+import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.sdk.SdkClientDecoder;
import cn.ceres.did.sdk.SdkClientEncoder;
import cn.ceres.did.sdk.SdkProto;
-import cn.ceres.did.common.Constants;
-import cn.ceres.did.common.NettyUtil;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -17,7 +17,7 @@ import java.net.InetSocketAddress;
* @author ehlxr
*/
public class SdkClient extends AbstractClient {
- private Logger logger = LoggerFactory.getLogger(SdkClient.class);
+ private final Logger logger = LoggerFactory.getLogger(SdkClient.class);
private String host;
private int port;
@@ -47,12 +47,9 @@ public class SdkClient extends AbstractClient {
try {
channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync();
- channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) {
- logger.warn("client channel close.", channelFuture.cause());
- shutdown();
- }
+ channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> {
+ logger.warn("client channel close.", channelFuture.cause());
+ shutdown();
});
InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress();
diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java
index 22303a4..cf35525 100644
--- a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java
+++ b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java
@@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author ehlxr
*/
public class SdkProto {
- private static final AtomicInteger requestId = new AtomicInteger(0);
+ private static final AtomicInteger REQUEST_ID = new AtomicInteger(0);
/**
* 请求的ID
@@ -18,7 +18,7 @@ public class SdkProto {
private long did;
public SdkProto() {
- rqid = requestId.incrementAndGet();
+ rqid = REQUEST_ID.incrementAndGet();
did = 0;
}
diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java
index ed444c5..ebddf64 100644
--- a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java
+++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java
@@ -1,7 +1,5 @@
package cn.ceres.did;
-import cn.ceres.did.client.InvokeCallback;
-import cn.ceres.did.client.ResponseFuture;
import cn.ceres.did.client.SdkClient;
import cn.ceres.did.sdk.SdkProto;
import org.junit.Test;
@@ -36,13 +34,10 @@ public class DidSdkTest {
for (int i = 0; i < NUM; i++) {
final SdkProto sdkProto = new SdkProto();
final int finalI = i;
- client.invokeAsync(sdkProto, 2000, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- System.out.println(finalI + " sendProto: " + sdkProto.toString());
- countDownLatch.countDown();
- System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString());
- }
+ client.invokeAsync(sdkProto, 2000, responseFuture -> {
+ System.out.println(finalI + " sendProto: " + sdkProto.toString());
+ countDownLatch.countDown();
+ System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString());
});
}
countDownLatch.await(10, TimeUnit.SECONDS);
diff --git a/did-server/pom.xml b/did-server/pom.xml
index f55a0ce..0b27395 100644
--- a/did-server/pom.xml
+++ b/did-server/pom.xml
@@ -23,10 +23,6 @@
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
org.apache.maven.plugins
maven-shade-plugin
diff --git a/did-server/src/main/java/cn/ceres/did/ServerStarter.java b/did-server/src/main/java/cn/ceres/did/ServerStarter.java
index 71a9ca4..7f0f7f6 100644
--- a/did-server/src/main/java/cn/ceres/did/ServerStarter.java
+++ b/did-server/src/main/java/cn/ceres/did/ServerStarter.java
@@ -1,9 +1,9 @@
package cn.ceres.did;
+import cn.ceres.did.common.Constants;
import cn.ceres.did.core.SnowFlake;
import cn.ceres.did.server.http.HttpServer;
import cn.ceres.did.server.sdk.SdkServer;
-import cn.ceres.did.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,13 +40,10 @@ public class ServerStarter {
sdkServer.init();
sdkServer.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- httpServer.shutdown();
- sdkServer.shutdown();
- System.exit(0);
- }
- });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ httpServer.shutdown();
+ sdkServer.shutdown();
+ System.exit(0);
+ }));
}
}
diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java
index f61e5c9..cb677b9 100644
--- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java
+++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java
@@ -1,8 +1,8 @@
package cn.ceres.did.server.sdk;
import cn.ceres.did.common.Constants;
-import cn.ceres.did.server.BaseServer;
import cn.ceres.did.core.SnowFlake;
+import cn.ceres.did.server.BaseServer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
@@ -18,7 +18,7 @@ public class SdkServer extends BaseServer {
public SdkServer(SnowFlake snowFlake) {
this.snowFlake = snowFlake;
- this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.valueOf(Constants.getEnv("SDKS_PORT"));
+ this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.parseInt(Constants.getEnv("SDKS_PORT"));
}
@Override
diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java
index 8c6ca60..4083543 100644
--- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java
+++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java
@@ -3,7 +3,10 @@ package cn.ceres.did.server.sdk;
import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.core.SnowFlake;
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,12 +37,7 @@ public class SdkServerHandler extends SimpleChannelInboundHandler {
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
try {
sdkProto.setDid(snowFlake.nextId());
- ctx.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) {
- semaphore.release();
- }
- });
+ ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release());
} catch (Exception e) {
semaphore.release();
logger.error("SdkServerhandler error", e);
diff --git a/pom.xml b/pom.xml
index e5438ce..ebeecbd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,22 +16,34 @@
did
+
+ UTF-8
+ UTF-8
+
+ 1.8
+ 1.8
+
+ 4.1.6.Final
+ 1.1.7
+ 4.11
+
+
io.netty
netty-all
- 4.1.6.Final
+ ${netty.version}
ch.qos.logback
logback-classic
- 1.1.7
+ ${logback.version}
junit
junit
- 4.11
+ ${junit.version}
test
@@ -40,15 +52,6 @@
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.5.1
-
-
- 1.7
-
-
org.apache.maven.plugins
maven-shade-plugin