From d34eddd49200575a883edee0ec4b4d67df82bb16 Mon Sep 17 00:00:00 2001 From: ehlxr Date: Tue, 19 Jan 2021 14:45:49 +0800 Subject: [PATCH] Optimized code --- did-common/pom.xml | 9 --- did-sdk/pom.xml | 4 - .../cn/ceres/did/client/AbstractClient.java | 80 +++++++++---------- .../java/cn/ceres/did/client/SdkClient.java | 15 ++-- .../main/java/cn/ceres/did/sdk/SdkProto.java | 4 +- .../test/java/cn/ceres/did/DidSdkTest.java | 13 +-- did-server/pom.xml | 4 - .../main/java/cn/ceres/did/ServerStarter.java | 15 ++-- .../cn/ceres/did/server/sdk/SdkServer.java | 4 +- .../did/server/sdk/SdkServerHandler.java | 12 ++- pom.xml | 27 ++++--- 11 files changed, 76 insertions(+), 111 deletions(-) diff --git a/did-common/pom.xml b/did-common/pom.xml index 7b0b63c..781cb46 100644 --- a/did-common/pom.xml +++ b/did-common/pom.xml @@ -23,13 +23,4 @@ logback-classic - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - diff --git a/did-sdk/pom.xml b/did-sdk/pom.xml index 968196d..1fb51e7 100644 --- a/did-sdk/pom.xml +++ b/did-sdk/pom.xml @@ -27,10 +27,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - org.apache.maven.plugins maven-shade-plugin diff --git a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java index 31b3bb4..bbee449 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java @@ -1,7 +1,7 @@ package cn.ceres.did.client; -import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.sdk.SdkProto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author ehlxr */ public abstract class AbstractClient implements Client { - private Logger logger = LoggerFactory.getLogger(AbstractClient.class); + private final Logger logger = LoggerFactory.getLogger(AbstractClient.class); private final Semaphore asyncSemaphore = new Semaphore(100000); private final Semaphore onewaySemaphore = new Semaphore(100000); @@ -30,7 +30,7 @@ public abstract class AbstractClient implements Client { public void init() { asyncResponse = new ConcurrentHashMap<>(16); workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { - private AtomicInteger index = new AtomicInteger(0); + private final AtomicInteger index = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { @@ -57,20 +57,17 @@ public abstract class AbstractClient implements Client { try { final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null); asyncResponse.put(rqid, responseFuture); - channel.writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) { - if (channelFuture.isSuccess()) { - //发送成功后立即跳出 - responseFuture.setIsSendStateOk(true); - return; - } - // 代码执行到此说明发送失败,需要释放资源 - asyncResponse.remove(rqid); - responseFuture.putResponse(null); - responseFuture.setCause(channelFuture.cause()); - logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); + channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + //发送成功后立即跳出 + responseFuture.setIsSendStateOk(true); + return; } + // 代码执行到此说明发送失败,需要释放资源 + asyncResponse.remove(rqid); + responseFuture.putResponse(null); + responseFuture.setCause(channelFuture.cause()); + logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); }); // 阻塞等待响应 SdkProto resultProto = responseFuture.waitResponse(timeoutMillis); @@ -104,27 +101,24 @@ public abstract class AbstractClient implements Client { final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore); asyncResponse.put(rqid, responseFuture); try { - channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) { - if (channelFuture.isSuccess()) { - responseFuture.setIsSendStateOk(true); - return; - } - // 代码执行到些说明发送失败,需要释放资源 - asyncResponse.remove(rqid); - responseFuture.setCause(channelFuture.cause()); - responseFuture.putResponse(null); - - try { - responseFuture.executeInvokeCallback(); - } catch (Exception e) { - logger.warn("excute callback in writeAndFlush addListener, and callback throw", e); - } finally { - responseFuture.release(); - } - logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause()); + channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + responseFuture.setIsSendStateOk(true); + return; } + // 代码执行到些说明发送失败,需要释放资源 + asyncResponse.remove(rqid); + responseFuture.setCause(channelFuture.cause()); + responseFuture.putResponse(null); + + try { + responseFuture.executeInvokeCallback(); + } catch (Exception e) { + logger.warn("excute callback in writeAndFlush addListener, and callback throw", e); + } finally { + responseFuture.release(); + } + logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause()); }); } catch (Exception e) { responseFuture.release(); @@ -151,13 +145,10 @@ public abstract class AbstractClient implements Client { boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { try { - channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) { - onewaySemaphore.release(); - if (!channelFuture.isSuccess()) { - logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); - } + channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { + onewaySemaphore.release(); + if (!channelFuture.isSuccess()) { + logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); } }); } catch (Exception e) { @@ -167,7 +158,8 @@ public abstract class AbstractClient implements Client { asyncResponse.remove(rqid); } } else { - String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); + String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", + timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); logger.warn(info); throw new Exception(info); } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java index e9ce020..8d3d155 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java @@ -1,10 +1,10 @@ package cn.ceres.did.client; +import cn.ceres.did.common.Constants; +import cn.ceres.did.common.NettyUtil; import cn.ceres.did.sdk.SdkClientDecoder; import cn.ceres.did.sdk.SdkClientEncoder; import cn.ceres.did.sdk.SdkProto; -import cn.ceres.did.common.Constants; -import cn.ceres.did.common.NettyUtil; import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; @@ -17,7 +17,7 @@ import java.net.InetSocketAddress; * @author ehlxr */ public class SdkClient extends AbstractClient { - private Logger logger = LoggerFactory.getLogger(SdkClient.class); + private final Logger logger = LoggerFactory.getLogger(SdkClient.class); private String host; private int port; @@ -47,12 +47,9 @@ public class SdkClient extends AbstractClient { try { channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync(); - channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) { - logger.warn("client channel close.", channelFuture.cause()); - shutdown(); - } + channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> { + logger.warn("client channel close.", channelFuture.cause()); + shutdown(); }); InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress(); diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java index 22303a4..cf35525 100644 --- a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java +++ b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java @@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @author ehlxr */ public class SdkProto { - private static final AtomicInteger requestId = new AtomicInteger(0); + private static final AtomicInteger REQUEST_ID = new AtomicInteger(0); /** * 请求的ID @@ -18,7 +18,7 @@ public class SdkProto { private long did; public SdkProto() { - rqid = requestId.incrementAndGet(); + rqid = REQUEST_ID.incrementAndGet(); did = 0; } diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java index ed444c5..ebddf64 100644 --- a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java @@ -1,7 +1,5 @@ package cn.ceres.did; -import cn.ceres.did.client.InvokeCallback; -import cn.ceres.did.client.ResponseFuture; import cn.ceres.did.client.SdkClient; import cn.ceres.did.sdk.SdkProto; import org.junit.Test; @@ -36,13 +34,10 @@ public class DidSdkTest { for (int i = 0; i < NUM; i++) { final SdkProto sdkProto = new SdkProto(); final int finalI = i; - client.invokeAsync(sdkProto, 2000, new InvokeCallback() { - @Override - public void operationComplete(ResponseFuture responseFuture) { - System.out.println(finalI + " sendProto: " + sdkProto.toString()); - countDownLatch.countDown(); - System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString()); - } + client.invokeAsync(sdkProto, 2000, responseFuture -> { + System.out.println(finalI + " sendProto: " + sdkProto.toString()); + countDownLatch.countDown(); + System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString()); }); } countDownLatch.await(10, TimeUnit.SECONDS); diff --git a/did-server/pom.xml b/did-server/pom.xml index f55a0ce..0b27395 100644 --- a/did-server/pom.xml +++ b/did-server/pom.xml @@ -23,10 +23,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - org.apache.maven.plugins maven-shade-plugin diff --git a/did-server/src/main/java/cn/ceres/did/ServerStarter.java b/did-server/src/main/java/cn/ceres/did/ServerStarter.java index 71a9ca4..7f0f7f6 100644 --- a/did-server/src/main/java/cn/ceres/did/ServerStarter.java +++ b/did-server/src/main/java/cn/ceres/did/ServerStarter.java @@ -1,9 +1,9 @@ package cn.ceres.did; +import cn.ceres.did.common.Constants; import cn.ceres.did.core.SnowFlake; import cn.ceres.did.server.http.HttpServer; import cn.ceres.did.server.sdk.SdkServer; -import cn.ceres.did.common.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +40,10 @@ public class ServerStarter { sdkServer.init(); sdkServer.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - httpServer.shutdown(); - sdkServer.shutdown(); - System.exit(0); - } - }); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + httpServer.shutdown(); + sdkServer.shutdown(); + System.exit(0); + })); } } diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java index f61e5c9..cb677b9 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java @@ -1,8 +1,8 @@ package cn.ceres.did.server.sdk; import cn.ceres.did.common.Constants; -import cn.ceres.did.server.BaseServer; import cn.ceres.did.core.SnowFlake; +import cn.ceres.did.server.BaseServer; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; @@ -18,7 +18,7 @@ public class SdkServer extends BaseServer { public SdkServer(SnowFlake snowFlake) { this.snowFlake = snowFlake; - this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.valueOf(Constants.getEnv("SDKS_PORT")); + this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.parseInt(Constants.getEnv("SDKS_PORT")); } @Override diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java index 8c6ca60..4083543 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java @@ -3,7 +3,10 @@ package cn.ceres.did.server.sdk; import cn.ceres.did.common.Constants; import cn.ceres.did.common.NettyUtil; import cn.ceres.did.core.SnowFlake; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,12 +37,7 @@ public class SdkServerHandler extends SimpleChannelInboundHandler { if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { try { sdkProto.setDid(snowFlake.nextId()); - ctx.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) { - semaphore.release(); - } - }); + ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release()); } catch (Exception e) { semaphore.release(); logger.error("SdkServerhandler error", e); diff --git a/pom.xml b/pom.xml index e5438ce..ebeecbd 100644 --- a/pom.xml +++ b/pom.xml @@ -16,22 +16,34 @@ did + + UTF-8 + UTF-8 + + 1.8 + 1.8 + + 4.1.6.Final + 1.1.7 + 4.11 + + io.netty netty-all - 4.1.6.Final + ${netty.version} ch.qos.logback logback-classic - 1.1.7 + ${logback.version} junit junit - 4.11 + ${junit.version} test @@ -40,15 +52,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - 3.5.1 - - 1.7 - 1.7 - - org.apache.maven.plugins maven-shade-plugin