diff --git a/did-common/src/main/java/cn/ceres/did/common/Constants.java b/did-common/src/main/java/cn/ceres/did/common/Constants.java index 9eaf980..8ca0c14 100644 --- a/did-common/src/main/java/cn/ceres/did/common/Constants.java +++ b/did-common/src/main/java/cn/ceres/did/common/Constants.java @@ -12,34 +12,42 @@ public class Constants { return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); } - public static String DEFAULT_HOST = "localhost"; + public static String SERVER_HOST = "localhost"; /** - * HTTP协议和SDK协议服务器的端口 + * HTTP 协议和 SDK 协议服务器默认端口 */ public static int HTTP_PORT = 16830; - public static int SDKS_PORT = 16831; + public static int SDK_PORT = 16831; /** - * HTTP协议和SDK协议的请求路径 - */ - public static String HTTP_REQUEST = "did"; - public static String SDKS_REQUEST = "did"; - - /** - * 数据中心的标识ID,取值范围:0~31 - * 机器或进程的标识ID,取值范围:0~31 - * 两个标识ID组合在分布式环境中必须唯一 + * 数据中心默认标识 ID,取值范围:0~31 + * 机器或进程默认标识 ID,取值范围:0~31 + *

+ * 两个标识 ID 组合在分布式环境中必须唯一 */ public static long DATACENTER_ID = 1; public static long MACHINES_ID = 1; /** - * 流量控制,表示每秒处理的并发数 + * Server 流量控制,表示每秒处理的并发数 */ public static int HANDLE_HTTP_TPS = 10000; - public static int HANDLE_SDKS_TPS = 50000; + public static int HANDLE_SDK_TPS = 50000; + + /** + * sdk client 流量控制,表示每秒处理的并发数 + */ + public static int SDK_CLIENT_ASYNC_TPS = 100000; + public static int SDK_CLIENT_ONEWAY_TPS = 100000; + public static int ACQUIRE_TIMEOUTMILLIS = 5000; + /** + * sdk client 默认超时时间 + */ + public static int SDK_CLIENT_TIMEOUTMILLIS = 2000; + + private Constants() { } } diff --git a/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java b/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java index e9c1e6e..f716402 100644 --- a/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java +++ b/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java @@ -1,7 +1,6 @@ package cn.ceres.did.common; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +35,8 @@ public class NettyUtil { public static void closeChannel(Channel channel) { final String addrRemote = parseRemoteAddr(channel); - channel.close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - logger.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, future.isSuccess()); - } - }); + channel.close().addListener((ChannelFutureListener) future -> + logger.info("closeChannel: close the connection to remote address[{}] result: {}", + addrRemote, future.isSuccess())); } } diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java b/did-common/src/main/java/cn/ceres/did/common/SdkProto.java similarity index 96% rename from did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java rename to did-common/src/main/java/cn/ceres/did/common/SdkProto.java index cf35525..76f2a96 100644 --- a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java +++ b/did-common/src/main/java/cn/ceres/did/common/SdkProto.java @@ -1,4 +1,4 @@ -package cn.ceres.did.sdk; +package cn.ceres.did.common; import java.util.concurrent.atomic.AtomicInteger; diff --git a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java index 26dd13c..3329154 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java @@ -1,7 +1,8 @@ package cn.ceres.did.client; +import cn.ceres.did.common.Constants; import cn.ceres.did.common.NettyUtil; -import cn.ceres.did.sdk.SdkProto; +import cn.ceres.did.common.SdkProto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -10,36 +11,43 @@ import io.netty.channel.nio.NioEventLoopGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.*; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author ehlxr */ +@SuppressWarnings({"unused", "UnusedReturnValue"}) public abstract class AbstractClient implements Client { private final Logger logger = LoggerFactory.getLogger(AbstractClient.class); - private final Semaphore asyncSemaphore = new Semaphore(100000); - private final Semaphore onewaySemaphore = new Semaphore(100000); + private final Semaphore asyncSemaphore = new Semaphore(Constants.SDK_CLIENT_ASYNC_TPS); + private final Semaphore onewaySemaphore = new Semaphore(Constants.SDK_CLIENT_ONEWAY_TPS); - ConcurrentMap asyncResponse; NioEventLoopGroup workGroup; ChannelFuture channelFuture; Bootstrap bootstrap; - int timeoutMillis = 2000; - - public int getTimeoutMillis() { - return timeoutMillis; - } + int timeoutMillis; + String host; + int port; public void setTimeoutMillis(int timeoutMillis) { this.timeoutMillis = timeoutMillis; } - public void init() { - asyncResponse = new ConcurrentHashMap<>(16); - workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } + + void init() { + workGroup = new NioEventLoopGroup(new ThreadFactory() { private final AtomicInteger index = new AtomicInteger(0); @Override @@ -53,47 +61,49 @@ public abstract class AbstractClient implements Client { @Override public void shutdown() { - if (workGroup != null) { - workGroup.shutdownGracefully(); + logger.info("SDK Client shutdowning......"); + try { + if (workGroup != null) { + workGroup.shutdownGracefully().sync(); + } + } catch (Exception e) { + logger.error("Client EventLoopGroup shutdown error.", e); } + + logger.info("SDK Client shutdown finish!"); } - @Override - public SdkProto invokeSync(SdkProto sdkProto, long timeoutMillis) throws Exception { + public SdkProto invokeSync(long timeoutMillis) throws Exception { final Channel channel = channelFuture.channel(); if (channel.isActive()) { + final SdkProto sdkProto = new SdkProto(); final int rqid = sdkProto.getRqid(); try { - final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null); - asyncResponse.put(rqid, responseFuture); + final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, null, null); + REPONSE_MAP.put(rqid, responseFuture); channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { if (channelFuture.isSuccess()) { //发送成功后立即跳出 - responseFuture.setIsSendStateOk(true); return; } // 代码执行到此说明发送失败,需要释放资源 - asyncResponse.remove(rqid); + REPONSE_MAP.remove(rqid); responseFuture.putResponse(null); responseFuture.setCause(channelFuture.cause()); - logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); + logger.warn("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel)); }); // 阻塞等待响应 SdkProto resultProto = responseFuture.waitResponse(timeoutMillis); if (null == resultProto) { - if (responseFuture.isSendStateOk()) { - throw new Exception(NettyUtil.parseRemoteAddr(channel) + timeoutMillis + responseFuture.getCause()); - } else { - throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause()); - } + throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause()); } return resultProto; } catch (Exception e) { logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e); throw new Exception(NettyUtil.parseRemoteAddr(channel), e); } finally { - asyncResponse.remove(rqid); + REPONSE_MAP.remove(rqid); } } else { NettyUtil.closeChannel(channel); @@ -102,22 +112,21 @@ public abstract class AbstractClient implements Client { } @Override - public void invokeAsync(SdkProto sdkProto, long timeoutMillis, final InvokeCallback invokeCallback) throws Exception { + public void invokeAsync(long timeoutMillis, InvokeCallback invokeCallback) throws Exception { final Channel channel = channelFuture.channel(); if (channel.isOpen() && channel.isActive()) { + final SdkProto sdkProto = new SdkProto(); final int rqid = sdkProto.getRqid(); - boolean acquired = asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); - if (acquired) { - final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore); - asyncResponse.put(rqid, responseFuture); + if (asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) { + final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, invokeCallback, asyncSemaphore); + REPONSE_MAP.put(rqid, responseFuture); try { - channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { + channelFuture.channel().writeAndFlush(sdkProto).addListener(channelFuture -> { if (channelFuture.isSuccess()) { - responseFuture.setIsSendStateOk(true); return; } // 代码执行到些说明发送失败,需要释放资源 - asyncResponse.remove(rqid); + REPONSE_MAP.remove(rqid); responseFuture.setCause(channelFuture.cause()); responseFuture.putResponse(null); @@ -128,47 +137,18 @@ public abstract class AbstractClient implements Client { } finally { responseFuture.release(); } - logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause()); + logger.warn("send a request command to channel <{}> failed.", + NettyUtil.parseRemoteAddr(channel), channelFuture.cause()); }); } catch (Exception e) { responseFuture.release(); - logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception", e); + logger.warn("send a request to channel <{}> Exception", + NettyUtil.parseRemoteAddr(channel), e); throw new Exception(NettyUtil.parseRemoteAddr(channel), e); } } else { - String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", - timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); - logger.warn(info); - throw new Exception(info); - } - } else { - NettyUtil.closeChannel(channel); - throw new Exception(NettyUtil.parseRemoteAddr(channel)); - } - } - - @Override - public void invokeOneWay(SdkProto sdkProto, long timeoutMillis) throws Exception { - final Channel channel = channelFuture.channel(); - if (channel.isActive()) { - final int rqid = sdkProto.getRqid(); - boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); - if (acquired) { - try { - channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { - onewaySemaphore.release(); - if (!channelFuture.isSuccess()) { - logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); - } - }); - } catch (Exception e) { - logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception"); - throw new Exception(NettyUtil.parseRemoteAddr(channel), e); - } finally { - asyncResponse.remove(rqid); - } - } else { - String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", + String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); logger.warn(info); throw new Exception(info); @@ -182,4 +162,12 @@ public abstract class AbstractClient implements Client { public long invoke() throws Exception { return invoke(timeoutMillis); } + + public SdkProto invokeSync() throws Exception { + return invokeSync(timeoutMillis); + } + + public void invokeAsync(InvokeCallback invokeCallback) throws Exception { + invokeAsync(timeoutMillis, invokeCallback); + } } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/Client.java b/did-sdk/src/main/java/cn/ceres/did/client/Client.java index 3f851c0..be12061 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/Client.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/Client.java @@ -1,22 +1,53 @@ package cn.ceres.did.client; -import cn.ceres.did.sdk.SdkProto; + +import cn.ceres.did.common.SdkProto; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * @author ehlxr */ public interface Client { + ConcurrentMap REPONSE_MAP = new ConcurrentHashMap<>(); + + /** + * 启动 sdk client + */ void start(); + /** + * 停止 sdk client + */ void shutdown(); - SdkProto invokeSync(SdkProto proto, long timeoutMillis) throws Exception; + /** + * 同步调用 + * + * @param timeoutMillis 超时时间 + * @return {@link SdkProto} + * @throws Exception 调用异常 + */ + SdkProto invokeSync(long timeoutMillis) throws Exception; - void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception; - - void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception; + /** + * 异步调用 + * + * @param timeoutMillis 超时时间 + * @param invokeCallback 回调接口 + * @throws Exception 调用异常 + */ + void invokeAsync(long timeoutMillis, InvokeCallback invokeCallback) throws Exception; + /** + * 获取 id + * + * @param timeoutMillis 超时时间 + * @return id + * @throws Exception 调用异常 + */ default long invoke(long timeoutMillis) throws Exception { - return invokeSync(new SdkProto(), timeoutMillis).getDid(); + return invokeSync(timeoutMillis).getDid(); } } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java b/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java index 616da69..3ae0741 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java @@ -5,5 +5,10 @@ package cn.ceres.did.client; */ public interface InvokeCallback { + /** + * 异步回调处理方法 + * + * @param responseFuture {@link ResponseFuture} + */ void operationComplete(ResponseFuture responseFuture); } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java b/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java index 4babb2f..e2a419e 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java @@ -1,6 +1,6 @@ package cn.ceres.did.client; -import cn.ceres.did.sdk.SdkProto; +import cn.ceres.did.common.SdkProto; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; @@ -11,7 +11,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author ehlxr */ public class ResponseFuture { - private final int rqid; private final long timeoutMillis; private final Semaphore semaphore; private final InvokeCallback invokeCallback; @@ -21,14 +20,11 @@ public class ResponseFuture { private volatile Throwable cause; private volatile SdkProto sdkProto; - private volatile boolean isSendStateOk; - public ResponseFuture(int rqid, long timeoutMillis, InvokeCallback invokeCallback, Semaphore semaphore) { - this.rqid = rqid; + public ResponseFuture(long timeoutMillis, InvokeCallback invokeCallback, Semaphore semaphore) { this.timeoutMillis = timeoutMillis; this.invokeCallback = invokeCallback; this.semaphore = semaphore; - this.isSendStateOk = false; } /** @@ -75,14 +71,6 @@ public class ResponseFuture { this.sdkProto = sdkProto; } - public boolean isSendStateOk() { - return isSendStateOk; - } - - public void setIsSendStateOk(boolean isSendStateOk) { - this.isSendStateOk = isSendStateOk; - } - public Throwable getCause() { return cause; } @@ -94,7 +82,6 @@ public class ResponseFuture { @Override public String toString() { return "ResponseFuture{" + - "rqid=" + rqid + ", timeoutMillis=" + timeoutMillis + ", semaphore=" + semaphore + ", invokeCallback=" + invokeCallback + @@ -103,7 +90,6 @@ public class ResponseFuture { ", countDownLatch=" + countDownLatch + ", cause=" + cause + ", sdkProto=" + sdkProto + - ", isSendStateOk=" + isSendStateOk + '}'; } } diff --git a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java index 088f6a7..50c05b4 100644 --- a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java @@ -1,11 +1,12 @@ package cn.ceres.did.client; +import cn.ceres.did.client.handler.SdkClientDecoder; +import cn.ceres.did.client.handler.SdkClientEncoder; +import cn.ceres.did.client.handler.SdkClientHandler; import cn.ceres.did.common.Constants; -import cn.ceres.did.common.NettyUtil; -import cn.ceres.did.sdk.SdkClientDecoder; -import cn.ceres.did.sdk.SdkClientEncoder; -import cn.ceres.did.sdk.SdkProto; -import io.netty.channel.*; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; @@ -18,19 +19,26 @@ import java.net.InetSocketAddress; */ public class SdkClient extends AbstractClient { private final Logger logger = LoggerFactory.getLogger(SdkClient.class); - private String host; - private int port; public SdkClient(String host, int port) { + this(host, port, Constants.SDK_CLIENT_TIMEOUTMILLIS); + } + + public SdkClient(String host, int port, int timeoutMillis) { this.host = host; this.port = port; + this.timeoutMillis = timeoutMillis; } public SdkClient() { + this("".equals(Constants.getEnv("SDK_HOST")) ? Constants.SERVER_HOST : Constants.getEnv("HTTP_PORT"), + "".equals(Constants.getEnv("SDK_PORT")) ? Constants.SDK_PORT : Integer.parseInt(Constants.getEnv("SDK_PORT"))); } @Override public void start() { + init(); + bootstrap.group(workGroup) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // .option(ChannelOption.TCP_NODELAY, true) @@ -46,8 +54,7 @@ public class SdkClient extends AbstractClient { }); try { - channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, - port == 0 ? Constants.SDKS_PORT : port).sync(); + channelFuture = bootstrap.connect(host, port).sync(); channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> { logger.warn("client channel close.", channelFuture.cause()); @@ -62,42 +69,4 @@ public class SdkClient extends AbstractClient { } } - class SdkClientHandler extends SimpleChannelInboundHandler { - @Override - protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) { - final int rqid = sdkProto.getRqid(); - final ResponseFuture responseFuture = asyncResponse.get(rqid); - if (responseFuture != null) { - responseFuture.setSdkProto(sdkProto); - responseFuture.release(); - asyncResponse.remove(rqid); - - // 异步请求,执行回调函数 - if (responseFuture.getInvokeCallback() != null) { - responseFuture.executeInvokeCallback(); - } else { - // 同步请求,返回数据并释放CountDown - responseFuture.putResponse(sdkProto); - } - } else { - logger.warn("receive response, but not matched any request, address is {}", NettyUtil.parseRemoteAddr(ctx.channel())); - logger.warn("response data is {}", sdkProto.toString()); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.error("SdkHandler error", cause); - NettyUtil.closeChannel(ctx.channel()); - } - - } - - public void setHost(String host) { - this.host = host; - } - - public void setPort(int port) { - this.port = port; - } } diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java b/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientDecoder.java similarity index 50% rename from did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java rename to did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientDecoder.java index a7e0e18..d6be472 100644 --- a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java +++ b/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientDecoder.java @@ -1,6 +1,31 @@ -package cn.ceres.did.sdk; +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package cn.ceres.did.client.handler; import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.common.SdkProto; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -10,9 +35,10 @@ import org.slf4j.LoggerFactory; /** * @author ehlxr + * @since 2021-01-20 14:42. */ public class SdkClientDecoder extends FixedLengthFrameDecoder { - private static final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); + private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); public SdkClientDecoder(int frameLength) { super(frameLength); diff --git a/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientEncoder.java b/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientEncoder.java new file mode 100644 index 0000000..487a9e4 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientEncoder.java @@ -0,0 +1,59 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package cn.ceres.did.client.handler; + +import cn.ceres.did.client.SdkClient; +import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.common.SdkProto; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ehlxr + * @since 2021-01-20 14:43. + */ +public class SdkClientEncoder extends MessageToByteEncoder { + private final Logger logger = LoggerFactory.getLogger(SdkClient.class); + + + @Override + protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) { + out.writeInt(sdkProto.getRqid()); + out.writeLong(sdkProto.getDid()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed", + NettyUtil.parseRemoteAddr(channel)), cause); + + NettyUtil.closeChannel(channel); + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientHandler.java b/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientHandler.java new file mode 100644 index 0000000..9960b6a --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/handler/SdkClientHandler.java @@ -0,0 +1,70 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package cn.ceres.did.client.handler; + +import cn.ceres.did.client.Client; +import cn.ceres.did.client.ResponseFuture; +import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.common.SdkProto; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ehlxr + * @since 2021-01-20 14:43. + */ +public class SdkClientHandler extends SimpleChannelInboundHandler { + private final Logger logger = LoggerFactory.getLogger(SdkClientHandler.class); + + @Override + protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) { + final int rqid = sdkProto.getRqid(); + final ResponseFuture responseFuture = Client.REPONSE_MAP.get(rqid); + if (responseFuture != null) { + responseFuture.setSdkProto(sdkProto); + responseFuture.release(); + Client.REPONSE_MAP.remove(rqid); + + // 异步请求,执行回调函数 + if (responseFuture.getInvokeCallback() != null) { + responseFuture.executeInvokeCallback(); + } else { + // 同步请求,返回数据并释放 CountDown + responseFuture.putResponse(sdkProto); + } + } else { + logger.warn("receive response {}, but not matched any request, address is {}", + sdkProto, NettyUtil.parseRemoteAddr(ctx.channel())); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("SdkHandler error", cause); + NettyUtil.closeChannel(ctx.channel()); + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java deleted file mode 100644 index 70fa069..0000000 --- a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.ceres.did.sdk; - -import cn.ceres.did.common.NettyUtil; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ehlxr - */ -public class SdkClientEncoder extends MessageToByteEncoder { - private static final Logger logger = LoggerFactory.getLogger(SdkClientEncoder.class); - - @Override - protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) { - out.writeInt(sdkProto.getRqid()); - out.writeLong(sdkProto.getDid()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - Channel channel = ctx.channel(); - logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed", NettyUtil.parseRemoteAddr(channel)), cause); - NettyUtil.closeChannel(channel); - } -} diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java index af55c2c..7868446 100644 --- a/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressTest.java @@ -1,7 +1,6 @@ package cn.ceres.did; import cn.ceres.did.client.SdkClient; -import cn.ceres.did.sdk.SdkProto; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -21,9 +20,7 @@ public class DidSdkPressTest { @Before public void init() { - client = new SdkClient("127.0.0.1", 16831); - // client = new SdkClient(); - client.init(); + client = new SdkClient(); client.start(); } @@ -32,7 +29,6 @@ public class DidSdkPressTest { client.shutdown(); } - @Test public void asyncTest() throws Exception { long start; @@ -47,8 +43,7 @@ public class DidSdkPressTest { final CountDownLatch countDownLatch = new CountDownLatch(NUM); start = System.currentTimeMillis(); for (int i = 0; i < NUM; i++) { - final SdkProto sdkProto = new SdkProto(); - client.invokeAsync(sdkProto, 5000, responseFuture -> countDownLatch.countDown()); + client.invokeAsync(responseFuture -> countDownLatch.countDown()); } // countDownLatch.await(10, TimeUnit.SECONDS); @@ -74,12 +69,11 @@ public class DidSdkPressTest { long amount = 0; long allcast = 0; - for (int k = 0; k < 20; k++) { + for (int k = 0; k < 10; k++) { start = System.currentTimeMillis(); int NUM = 60000; for (int i = 0; i < NUM; i++) { - final SdkProto sdkProto = new SdkProto(); - client.invokeSync(sdkProto, 5000); + client.invokeSync(); } end = System.currentTimeMillis(); diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java index cd533bc..ef42d0a 100644 --- a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java @@ -1,7 +1,8 @@ package cn.ceres.did; import cn.ceres.did.client.SdkClient; -import cn.ceres.did.sdk.SdkProto; +import cn.ceres.did.common.SdkProto; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -17,37 +18,35 @@ public class DidSdkTest { @Before public void init() { - client = new SdkClient("127.0.0.1", 16831); - // SdkClient client = new SdkClient(); - client.init(); + client = new SdkClient("127.0.0.1", 16831, 5000); client.start(); } + @After + public void destroy() { + client.shutdown(); + } + @Test public void didSdkTest() throws Exception { // 测试同步请求,关注rqid是否对应 for (int i = 0; i < NUM; i++) { - SdkProto sdkProto = new SdkProto(); - System.out.println(i + " sendProto: " + sdkProto.toString()); - SdkProto resultProto = client.invokeSync(sdkProto, 2000); - System.out.println(i + " resultProto: " + resultProto.toString()); + SdkProto resultProto = client.invokeSync(); + System.out.println(i + " resultProto: " + resultProto); } System.out.println("invokeync test finish"); // 测试异步请求,关注rqid是否对应 final CountDownLatch countDownLatch = new CountDownLatch(NUM); for (int i = 0; i < NUM; i++) { - final SdkProto sdkProto = new SdkProto(); final int finalI = i; - client.invokeAsync(sdkProto, 2000, responseFuture -> { - System.out.println(finalI + " sendProto: " + sdkProto.toString()); + client.invokeAsync(responseFuture -> { countDownLatch.countDown(); - System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString()); + System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto()); }); } countDownLatch.await(10, TimeUnit.SECONDS); System.out.println("invokeAsync test finish"); - } @Test diff --git a/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java b/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java index e824d67..356979b 100644 --- a/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java +++ b/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java @@ -8,7 +8,7 @@ package cn.ceres.did.core; * 协议格式:0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 * 协议解释:0 - 41 位时间戳 - 5 位数据中心标识 - 5 位机器标识 - 12 位序列号 *

- * 1 位标识,由于 long 基本类型在 Java 中是带符号的,最高位是符号位,正数是 0,负数是 1,所以 id 一般是正数,最高位是 0 + * 1 位标识,由于 Long 基本类型在 Java 中是带符号的,最高位是符号位,正数是 0,负数是 1,所以 id 一般是正数,最高位是 0 *

* 41 位时间截(毫秒级),注意,41 位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)得到的值,这里的的开始时间截。 * 一般是我们的 id 生成器开始使用的时间,由我们程序来指定的(如下下面程序 START_STMP 属性)。41 位的时间截,可以使用 69 年,(1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69 diff --git a/did-server/src/main/java/cn/ceres/did/server/BaseServer.java b/did-server/src/main/java/cn/ceres/did/server/BaseServer.java index 1d56681..7711f89 100644 --- a/did-server/src/main/java/cn/ceres/did/server/BaseServer.java +++ b/did-server/src/main/java/cn/ceres/did/server/BaseServer.java @@ -1,5 +1,6 @@ package cn.ceres.did.server; +import cn.ceres.did.core.SnowFlake; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.DefaultEventLoopGroup; @@ -22,6 +23,7 @@ public abstract class BaseServer implements Server { protected ChannelFuture channelFuture; protected ServerBootstrap serverBootstrap; protected int port; + protected SnowFlake snowFlake; public void init() { defLoopGroup = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { diff --git a/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java b/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java index aefda65..ab0f466 100644 --- a/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java +++ b/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java @@ -14,20 +14,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Http服务器,使用Netty中的Http协议栈, - * 实现中支持多条请求路径,对于不存在的请求路径返回404状态码 - * 如:http://localhost:8099/getTime + * Http 服务器,使用 Netty 中的 Http 协议栈, * * @author ehlxr */ public class HttpServer extends BaseServer { protected Logger logger = LoggerFactory.getLogger(HttpServer.class); - private final SnowFlake snowFlake; + public HttpServer(SnowFlake snowFlake, int port) { + this.snowFlake = snowFlake; + this.port = port; + } public HttpServer(SnowFlake snowFlake) { this.snowFlake = snowFlake; - this.port = "".equals(Constants.getEnv("HTTP_PORT")) ? Constants.HTTP_PORT : Integer.parseInt(Constants.getEnv("HTTP_PORT")); + this.port = "".equals(Constants.getEnv("HTTP_PORT")) ? + Constants.HTTP_PORT : + Integer.parseInt(Constants.getEnv("HTTP_PORT")); } @Override @@ -57,10 +60,6 @@ public class HttpServer extends BaseServer { try { channelFuture = serverBootstrap.bind(port).sync(); logger.info("HttpServer start success, port is:{}", port); - - // channelFuture = serverBootstrap.bind().sync(); - // InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); - // logger.info("HttpServer start success, port is:{}", addr.getPort()); } catch (InterruptedException e) { logger.error("HttpServer start fail,", e); } diff --git a/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java index c01b65c..eb512d6 100644 --- a/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java +++ b/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java @@ -15,11 +15,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** - * 自定义的处理器,目前支持三种请求: - * getTime: 获取服务器当前时间; - * clientInfo: 获取请求客户端的User-Agent信息 - * 其它: 返回404状态,并且提示404信息 - * * @author ehlxr */ public class HttpServerHandler extends SimpleChannelInboundHandler { diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java deleted file mode 100644 index 120c7d5..0000000 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java +++ /dev/null @@ -1,44 +0,0 @@ -package cn.ceres.did.server.sdk; - -/** - * @author ehlxr - */ -public class SdkProto { - /** - * 请求的ID - */ - private int rqid; - /** - * 全局的 ID - */ - private long did; - - SdkProto(int rqid, long did) { - this.rqid = rqid; - this.did = did; - } - - public int getRqid() { - return rqid; - } - - public void setRqid(int rqid) { - this.rqid = rqid; - } - - public long getDid() { - return did; - } - - public void setDid(long did) { - this.did = did; - } - - @Override - public String toString() { - return "SdkProto{" + - "rqid=" + rqid + - ", did=" + did + - '}'; - } -} diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java index 774a68e..85b17bc 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java @@ -15,11 +15,17 @@ import org.slf4j.LoggerFactory; */ public class SdkServer extends BaseServer { protected Logger logger = LoggerFactory.getLogger(SdkServer.class); - private final SnowFlake snowFlake; public SdkServer(SnowFlake snowFlake) { this.snowFlake = snowFlake; - this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.parseInt(Constants.getEnv("SDKS_PORT")); + this.port = "".equals(Constants.getEnv("SDK_PORT")) ? + Constants.SDK_PORT : + Integer.parseInt(Constants.getEnv("SDK_PORT")); + } + + public SdkServer(SnowFlake snowFlake, int port) { + this.snowFlake = snowFlake; + this.port = port; } @Override @@ -48,10 +54,6 @@ public class SdkServer extends BaseServer { try { channelFuture = serverBootstrap.bind(port).sync(); logger.info("SdkServer start success, port is:{}", port); - - // channelFuture = serverBootstrap.bind().sync(); - // InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); - // logger.info("SdkServer start success, port is:{}", addr.getPort()); } catch (InterruptedException e) { logger.error("SdkServer start fail,", e); } diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java index a1f7a6e..0764144 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java @@ -1,6 +1,7 @@ package cn.ceres.did.server.sdk; import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.common.SdkProto; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java index 12758be..1c1c112 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java @@ -1,6 +1,7 @@ package cn.ceres.did.server.sdk; import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.common.SdkProto; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java index 1f55fc0..c29fa11 100644 --- a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java @@ -2,6 +2,7 @@ package cn.ceres.did.server.sdk; import cn.ceres.did.common.Constants; import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.common.SdkProto; import cn.ceres.did.core.SnowFlake; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; @@ -23,7 +24,7 @@ public class SdkServerHandler extends SimpleChannelInboundHandler { /** * 通过信号量来控制流量 */ - private final Semaphore semaphore = new Semaphore(Constants.HANDLE_SDKS_TPS); + private final Semaphore semaphore = new Semaphore(Constants.HANDLE_SDK_TPS); private final SnowFlake snowFlake; SdkServerHandler(SnowFlake snowFlake) {