From e06dcecd75f94613b43a14d2920959b0bd927ad3 Mon Sep 17 00:00:00 2001 From: ehlxr Date: Tue, 19 Jan 2021 18:39:42 +0800 Subject: [PATCH] Optimized code --- .../cn/ceres/did/client/AbstractClient.java | 14 ++++++++++++++ .../main/java/cn/ceres/did/client/Client.java | 4 ++++ .../java/cn/ceres/did/client/SdkClient.java | 8 +++++--- .../java/cn/ceres/did/DidSdkPressTest.java | 11 ++--------- .../test/java/cn/ceres/did/DidSdkTest.java | 19 ++++++++++++++++--- .../cn/ceres/did/server/http/HttpServer.java | 19 +++++++++---------- .../cn/ceres/did/server/sdk/SdkServer.java | 18 +++++++++--------- .../did/server/sdk/SdkServerHandler.java | 4 +--- pom.xml | 2 +- 9 files changed, 61 insertions(+), 38 deletions(-) diff --git a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java index bbee449..26dd13c 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java @@ -27,6 +27,16 @@ public abstract class AbstractClient implements Client { ChannelFuture channelFuture; Bootstrap bootstrap; + int timeoutMillis = 2000; + + public int getTimeoutMillis() { + return timeoutMillis; + } + + public void setTimeoutMillis(int timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + public void init() { asyncResponse = new ConcurrentHashMap<>(16); workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { @@ -168,4 +178,8 @@ public abstract class AbstractClient implements Client { throw new Exception(NettyUtil.parseRemoteAddr(channel)); } } + + public long invoke() throws Exception { + return invoke(timeoutMillis); + } } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/Client.java b/did-sdk/src/main/java/cn/ceres/did/client/Client.java index e791cc5..3f851c0 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/Client.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/Client.java @@ -15,4 +15,8 @@ public interface Client { void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception; void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception; + + default long invoke(long timeoutMillis) throws Exception { + return invokeSync(new SdkProto(), timeoutMillis).getDid(); + } } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java index 8d3d155..088f6a7 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java @@ -33,8 +33,8 @@ public class SdkClient extends AbstractClient { public void start() { bootstrap.group(workGroup) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.SO_KEEPALIVE, true) + // .option(ChannelOption.TCP_NODELAY, true) + // .option(ChannelOption.SO_KEEPALIVE, true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override @@ -46,7 +46,9 @@ public class SdkClient extends AbstractClient { }); try { - channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync(); + channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, + port == 0 ? Constants.SDKS_PORT : port).sync(); + channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> { logger.warn("client channel close.", channelFuture.cause()); shutdown(); diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java index 8331ca4..af55c2c 100644 --- a/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java @@ -1,7 +1,5 @@ package cn.ceres.did; -import cn.ceres.did.client.InvokeCallback; -import cn.ceres.did.client.ResponseFuture; import cn.ceres.did.client.SdkClient; import cn.ceres.did.sdk.SdkProto; import org.junit.After; @@ -23,7 +21,7 @@ public class DidSdkPressTest { @Before public void init() { - client = new SdkClient("127.0.0.1",16831); + client = new SdkClient("127.0.0.1", 16831); // client = new SdkClient(); client.init(); client.start(); @@ -50,12 +48,7 @@ public class DidSdkPressTest { start = System.currentTimeMillis(); for (int i = 0; i < NUM; i++) { final SdkProto sdkProto = new SdkProto(); - client.invokeAsync(sdkProto, 5000, new InvokeCallback() { - @Override - public void operationComplete(ResponseFuture responseFuture) { - countDownLatch.countDown(); - } - }); + client.invokeAsync(sdkProto, 5000, responseFuture -> countDownLatch.countDown()); } // countDownLatch.await(10, TimeUnit.SECONDS); diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java index ebddf64..cd533bc 100644 --- a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java @@ -2,6 +2,7 @@ package cn.ceres.did; import cn.ceres.did.client.SdkClient; import cn.ceres.did.sdk.SdkProto; +import org.junit.Before; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -12,14 +13,18 @@ import java.util.concurrent.TimeUnit; */ public class DidSdkTest { private static final int NUM = 10; + SdkClient client; - @Test - public void didSdkTest() throws Exception { - SdkClient client = new SdkClient("127.0.0.1", 16831); + @Before + public void init() { + client = new SdkClient("127.0.0.1", 16831); // SdkClient client = new SdkClient(); client.init(); client.start(); + } + @Test + public void didSdkTest() throws Exception { // 测试同步请求,关注rqid是否对应 for (int i = 0; i < NUM; i++) { SdkProto sdkProto = new SdkProto(); @@ -44,4 +49,12 @@ public class DidSdkTest { System.out.println("invokeAsync test finish"); } + + @Test + public void testInvoke() throws Exception { + System.out.println(client.invoke()); + + client.setTimeoutMillis(3000); + System.out.println(client.invoke()); + } } diff --git a/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java b/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java index 5964ac0..c5a7df2 100644 --- a/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java +++ b/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java @@ -11,8 +11,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; -import java.net.InetSocketAddress; - /** * Http服务器,使用Netty中的Http协议栈, * 实现中支持多条请求路径,对于不存在的请求路径返回404状态码 @@ -33,12 +31,11 @@ public class HttpServer extends BaseServer { super.init(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, false) - .option(ChannelOption.TCP_NODELAY, true) + // .option(ChannelOption.SO_KEEPALIVE, false) + // .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_BACKLOG, 1024) - .localAddress(new InetSocketAddress(port)) + // .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer() { - @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(defLoopGroup, @@ -49,15 +46,17 @@ public class HttpServer extends BaseServer { ); } }); - } @Override public void start() { try { - channelFuture = serverBootstrap.bind().sync(); - InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); - logger.info("HttpServer start success, port is:{}", addr.getPort()); + channelFuture = serverBootstrap.bind(port).sync(); + logger.info("HttpServer start success, port is:{}", port); + + // channelFuture = serverBootstrap.bind().sync(); + // InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); + // logger.info("HttpServer start success, port is:{}", addr.getPort()); } catch (InterruptedException e) { logger.error("HttpServer start fail,", e); } diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java index cb677b9..f0170b7 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java @@ -8,8 +8,6 @@ import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; -import java.net.InetSocketAddress; - /** * @author ehlxr */ @@ -26,12 +24,11 @@ public class SdkServer extends BaseServer { super.init(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.TCP_NODELAY, true) + // .option(ChannelOption.SO_KEEPALIVE, true) + // .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_BACKLOG, 1024) - .localAddress(new InetSocketAddress(port)) + // .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer() { - @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(defLoopGroup, @@ -46,9 +43,12 @@ public class SdkServer extends BaseServer { @Override public void start() { try { - channelFuture = serverBootstrap.bind().sync(); - InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); - logger.info("SdkServer start success, port is:{}", addr.getPort()); + channelFuture = serverBootstrap.bind(port).sync(); + logger.info("SdkServer start success, port is:{}", port); + + // channelFuture = serverBootstrap.bind().sync(); + // InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); + // logger.info("SdkServer start success, port is:{}", addr.getPort()); } catch (InterruptedException e) { logger.error("SdkServer start fail,", e); } diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java index 4083543..1f55fc0 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java @@ -32,11 +32,10 @@ public class SdkServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception { - // if (msg instanceof SdkProto) { - // SdkProto sdkProto = (SdkProto) msg; if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { try { sdkProto.setDid(snowFlake.nextId()); + ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release()); } catch (Exception e) { semaphore.release(); @@ -50,7 +49,6 @@ public class SdkServerHandler extends SimpleChannelInboundHandler { logger.warn(info); throw new Exception(info); } - // } } @Override diff --git a/pom.xml b/pom.xml index ebeecbd..84ff83c 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 1.8 1.8 - 4.1.6.Final + 4.1.58.Final 1.1.7 4.11