commit 22789bb1314727f3506aa5b9c6920a4b06be835a Author: ehlxr Date: Tue Aug 14 15:21:56 2018 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f082134 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea +did.iml +**/target/ +**/dependency-reduced-pom.xml \ No newline at end of file diff --git a/.idea/compiler.xml b/.idea/compiler.xml new file mode 100644 index 0000000..722be16 --- /dev/null +++ b/.idea/compiler.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml new file mode 100644 index 0000000..a749363 --- /dev/null +++ b/.idea/encodings.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..99b1a92 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,11 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..911d8a5 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/thriftCompiler.xml b/.idea/thriftCompiler.xml new file mode 100644 index 0000000..7bc123c --- /dev/null +++ b/.idea/thriftCompiler.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/did-common/pom.xml b/did-common/pom.xml new file mode 100644 index 0000000..7b0b63c --- /dev/null +++ b/did-common/pom.xml @@ -0,0 +1,35 @@ + + + + + did + cn.ceres.did + 1.0-SNAPSHOT + + 4.0.0 + + did-common + + did-common + + + + io.netty + netty-all + + + ch.qos.logback + logback-classic + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + diff --git a/did-common/src/main/java/cn/ceres/did/common/Constants.java b/did-common/src/main/java/cn/ceres/did/common/Constants.java new file mode 100644 index 0000000..824e289 --- /dev/null +++ b/did-common/src/main/java/cn/ceres/did/common/Constants.java @@ -0,0 +1,45 @@ +package cn.ceres.did.common; + +import java.util.Map; + +/** + * @author ehlxr + */ +public class Constants { + private static Map SYS_ENV = System.getenv(); + + public static String getEnv(String key) { + return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); + } + + public static String DEFAULT_HOST = "localhost"; + /** + * HTTP协议和SDK协议服务器的端口 + */ + public static int HTTP_PORT = 16830; + public static int SDKS_PORT = 16831; + + /** + * HTTP协议和SDK协议的请求路径 + */ + public static String HTTP_REQUEST = "did"; + public static String SDKS_REQUEST = "did"; + + /** + * 数据中心的标识ID,取值范围:0~31 + * 机器或进程的标识ID,取值范围:0~31 + * 两个标识ID组合在分布式环境中必须唯一 + */ + public static long DATACENTER_ID = 1; + public static long MACHINES_SIGN = 1; + + /** + * 流量控制,表示每秒处理的并发数 + */ + public static int HANDLE_HTTP_TPS = 10000; + public static int HANDLE_SDKS_TPS = 50000; + public static int ACQUIRE_TIMEOUTMILLIS = 5000; + + private Constants() { + } +} diff --git a/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java b/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java new file mode 100644 index 0000000..e9c1e6e --- /dev/null +++ b/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java @@ -0,0 +1,46 @@ +package cn.ceres.did.common; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.SocketAddress; + +/** + * @author ehlxr + */ +public class NettyUtil { + private static final Logger logger = LoggerFactory.getLogger(NettyUtil.class); + + /** + * 获取 Channel 的远程 IP 地址 + */ + public static String parseRemoteAddr(final Channel channel) { + if (null == channel) { + return ""; + } + SocketAddress remote = channel.remoteAddress(); + final String addr = remote != null ? remote.toString() : ""; + + if (addr.length() > 0) { + int index = addr.lastIndexOf("/"); + if (index >= 0) { + return addr.substring(index + 1); + } + return addr; + } + return ""; + } + + public static void closeChannel(Channel channel) { + final String addrRemote = parseRemoteAddr(channel); + channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + logger.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, future.isSuccess()); + } + }); + } +} diff --git a/did-common/target/maven-archiver/pom.properties b/did-common/target/maven-archiver/pom.properties new file mode 100644 index 0000000..ca70ec0 --- /dev/null +++ b/did-common/target/maven-archiver/pom.properties @@ -0,0 +1,5 @@ +#Generated by Maven +#Tue Aug 14 15:16:52 CST 2018 +version=1.0-SNAPSHOT +groupId=cn.ceres.did +artifactId=did-common diff --git a/did-common/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/did-common/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 0000000..dc38126 --- /dev/null +++ b/did-common/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,3 @@ +cn/ceres/did/common/NettyUtil.class +cn/ceres/did/common/Constants.class +cn/ceres/did/common/NettyUtil$1.class diff --git a/did-common/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/did-common/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 0000000..1fc940a --- /dev/null +++ b/did-common/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1,2 @@ +/Users/ehlxr/WorkSpaces/did/did-common/src/main/java/cn/ceres/did/common/Constants.java +/Users/ehlxr/WorkSpaces/did/did-common/src/main/java/cn/ceres/did/common/NettyUtil.java diff --git a/did-sdk/dependency-reduced-pom.xml b/did-sdk/dependency-reduced-pom.xml new file mode 100644 index 0000000..5db6d5f --- /dev/null +++ b/did-sdk/dependency-reduced-pom.xml @@ -0,0 +1,59 @@ + + + + did + cn.ceres.did + 1.0-SNAPSHOT + + 4.0.0 + did-sdk + did-sdk + + + + maven-compiler-plugin + + + maven-shade-plugin + + true + + + cn.ceres.did:did-common + + + + + + maven-source-plugin + + + + + + io.netty + netty-all + 4.1.6.Final + compile + + + ch.qos.logback + logback-classic + 1.1.7 + compile + + + ch.qos.logback + logback-core + 1.1.7 + compile + + + org.slf4j + slf4j-api + 1.7.20 + compile + + + + diff --git a/did-sdk/pom.xml b/did-sdk/pom.xml new file mode 100644 index 0000000..968196d --- /dev/null +++ b/did-sdk/pom.xml @@ -0,0 +1,52 @@ + + + + + did + cn.ceres.did + 1.0-SNAPSHOT + + 4.0.0 + + did-sdk + + did-sdk + + + cn.ceres.did + did-common + 1.0-SNAPSHOT + + + junit + junit + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + true + + + cn.ceres.did:did-common + + + + + + org.apache.maven.plugins + maven-source-plugin + + + + \ No newline at end of file diff --git a/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java new file mode 100644 index 0000000..31b3bb4 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java @@ -0,0 +1,179 @@ +package cn.ceres.did.client; + +import cn.ceres.did.sdk.SdkProto; +import cn.ceres.did.common.NettyUtil; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author ehlxr + */ +public abstract class AbstractClient implements Client { + private Logger logger = LoggerFactory.getLogger(AbstractClient.class); + + private final Semaphore asyncSemaphore = new Semaphore(100000); + private final Semaphore onewaySemaphore = new Semaphore(100000); + + ConcurrentMap asyncResponse; + NioEventLoopGroup workGroup; + ChannelFuture channelFuture; + Bootstrap bootstrap; + + public void init() { + asyncResponse = new ConcurrentHashMap<>(16); + workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { + private AtomicInteger index = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "WORK_" + index.incrementAndGet()); + } + }); + + bootstrap = new Bootstrap(); + } + + @Override + public void shutdown() { + if (workGroup != null) { + workGroup.shutdownGracefully(); + } + } + + + @Override + public SdkProto invokeSync(SdkProto sdkProto, long timeoutMillis) throws Exception { + final Channel channel = channelFuture.channel(); + if (channel.isActive()) { + final int rqid = sdkProto.getRqid(); + try { + final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null); + asyncResponse.put(rqid, responseFuture); + channel.writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) { + if (channelFuture.isSuccess()) { + //发送成功后立即跳出 + responseFuture.setIsSendStateOk(true); + return; + } + // 代码执行到此说明发送失败,需要释放资源 + asyncResponse.remove(rqid); + responseFuture.putResponse(null); + responseFuture.setCause(channelFuture.cause()); + logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); + } + }); + // 阻塞等待响应 + SdkProto resultProto = responseFuture.waitResponse(timeoutMillis); + if (null == resultProto) { + if (responseFuture.isSendStateOk()) { + throw new Exception(NettyUtil.parseRemoteAddr(channel) + timeoutMillis + responseFuture.getCause()); + } else { + throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause()); + } + } + return resultProto; + } catch (Exception e) { + logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e); + throw new Exception(NettyUtil.parseRemoteAddr(channel), e); + } finally { + asyncResponse.remove(rqid); + } + } else { + NettyUtil.closeChannel(channel); + throw new Exception(NettyUtil.parseRemoteAddr(channel)); + } + } + + @Override + public void invokeAsync(SdkProto sdkProto, long timeoutMillis, final InvokeCallback invokeCallback) throws Exception { + final Channel channel = channelFuture.channel(); + if (channel.isOpen() && channel.isActive()) { + final int rqid = sdkProto.getRqid(); + boolean acquired = asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + if (acquired) { + final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore); + asyncResponse.put(rqid, responseFuture); + try { + channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) { + if (channelFuture.isSuccess()) { + responseFuture.setIsSendStateOk(true); + return; + } + // 代码执行到些说明发送失败,需要释放资源 + asyncResponse.remove(rqid); + responseFuture.setCause(channelFuture.cause()); + responseFuture.putResponse(null); + + try { + responseFuture.executeInvokeCallback(); + } catch (Exception e) { + logger.warn("excute callback in writeAndFlush addListener, and callback throw", e); + } finally { + responseFuture.release(); + } + logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause()); + } + }); + } catch (Exception e) { + responseFuture.release(); + logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception", e); + throw new Exception(NettyUtil.parseRemoteAddr(channel), e); + } + } else { + String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", + timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); + logger.warn(info); + throw new Exception(info); + } + } else { + NettyUtil.closeChannel(channel); + throw new Exception(NettyUtil.parseRemoteAddr(channel)); + } + } + + @Override + public void invokeOneWay(SdkProto sdkProto, long timeoutMillis) throws Exception { + final Channel channel = channelFuture.channel(); + if (channel.isActive()) { + final int rqid = sdkProto.getRqid(); + boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + if (acquired) { + try { + channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) { + onewaySemaphore.release(); + if (!channelFuture.isSuccess()) { + logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); + } + } + }); + } catch (Exception e) { + logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception"); + throw new Exception(NettyUtil.parseRemoteAddr(channel), e); + } finally { + asyncResponse.remove(rqid); + } + } else { + String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); + logger.warn(info); + throw new Exception(info); + } + } else { + NettyUtil.closeChannel(channel); + throw new Exception(NettyUtil.parseRemoteAddr(channel)); + } + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/client/Client.java b/did-sdk/src/main/java/cn/ceres/did/client/Client.java new file mode 100644 index 0000000..e791cc5 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/Client.java @@ -0,0 +1,18 @@ +package cn.ceres.did.client; + +import cn.ceres.did.sdk.SdkProto; + +/** + * @author ehlxr + */ +public interface Client { + void start(); + + void shutdown(); + + SdkProto invokeSync(SdkProto proto, long timeoutMillis) throws Exception; + + void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception; + + void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception; +} diff --git a/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java b/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java new file mode 100644 index 0000000..616da69 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java @@ -0,0 +1,9 @@ +package cn.ceres.did.client; + +/** + * @author ehlxr + */ +public interface InvokeCallback { + + void operationComplete(ResponseFuture responseFuture); +} diff --git a/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java b/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java new file mode 100644 index 0000000..4babb2f --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java @@ -0,0 +1,109 @@ +package cn.ceres.did.client; + +import cn.ceres.did.sdk.SdkProto; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author ehlxr + */ +public class ResponseFuture { + private final int rqid; + private final long timeoutMillis; + private final Semaphore semaphore; + private final InvokeCallback invokeCallback; + private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false); + private final AtomicBoolean semaphoreReleaseOnlyOnce = new AtomicBoolean(false); + private final CountDownLatch countDownLatch = new CountDownLatch(1); + + private volatile Throwable cause; + private volatile SdkProto sdkProto; + private volatile boolean isSendStateOk; + + public ResponseFuture(int rqid, long timeoutMillis, InvokeCallback invokeCallback, Semaphore semaphore) { + this.rqid = rqid; + this.timeoutMillis = timeoutMillis; + this.invokeCallback = invokeCallback; + this.semaphore = semaphore; + this.isSendStateOk = false; + } + + /** + * 流量控制时,释放获取的信号量 + */ + public void release() { + if (this.semaphore != null) { + if (semaphoreReleaseOnlyOnce.compareAndSet(false, true)) { + this.semaphore.release(); + } + } + } + + public void executeInvokeCallback() { + if (invokeCallback != null) { + if (executeCallbackOnlyOnce.compareAndSet(false, true)) { + invokeCallback.operationComplete(this); + } + } + } + + public SdkProto waitResponse(final long timeoutMillis) throws InterruptedException { + this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); + return this.sdkProto; + } + + /** + * 同步请求时,释放阻塞的CountDown + */ + public void putResponse(SdkProto sdkProto) { + this.sdkProto = sdkProto; + this.countDownLatch.countDown(); + } + + public InvokeCallback getInvokeCallback() { + return invokeCallback; + } + + public SdkProto getSdkProto() { + return sdkProto; + } + + public void setSdkProto(SdkProto sdkProto) { + this.sdkProto = sdkProto; + } + + public boolean isSendStateOk() { + return isSendStateOk; + } + + public void setIsSendStateOk(boolean isSendStateOk) { + this.isSendStateOk = isSendStateOk; + } + + public Throwable getCause() { + return cause; + } + + public void setCause(Throwable cause) { + this.cause = cause; + } + + @Override + public String toString() { + return "ResponseFuture{" + + "rqid=" + rqid + + ", timeoutMillis=" + timeoutMillis + + ", semaphore=" + semaphore + + ", invokeCallback=" + invokeCallback + + ", executeCallbackOnlyOnce=" + executeCallbackOnlyOnce + + ", semaphoreReleaseOnlyOnce=" + semaphoreReleaseOnlyOnce + + ", countDownLatch=" + countDownLatch + + ", cause=" + cause + + ", sdkProto=" + sdkProto + + ", isSendStateOk=" + isSendStateOk + + '}'; + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java new file mode 100644 index 0000000..e9ce020 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java @@ -0,0 +1,104 @@ +package cn.ceres.did.client; + +import cn.ceres.did.sdk.SdkClientDecoder; +import cn.ceres.did.sdk.SdkClientEncoder; +import cn.ceres.did.sdk.SdkProto; +import cn.ceres.did.common.Constants; +import cn.ceres.did.common.NettyUtil; +import io.netty.channel.*; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * @author ehlxr + */ +public class SdkClient extends AbstractClient { + private Logger logger = LoggerFactory.getLogger(SdkClient.class); + private String host; + private int port; + + public SdkClient(String host, int port) { + this.host = host; + this.port = port; + } + + public SdkClient() { + } + + @Override + public void start() { + bootstrap.group(workGroup) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) { + socketChannel.pipeline().addLast("SdkServerDecoder", new SdkClientDecoder(12)) + .addLast("SdkServerEncoder", new SdkClientEncoder()) + .addLast("SdkClientHandler", new SdkClientHandler()); + } + }); + + try { + channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync(); + channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) { + logger.warn("client channel close.", channelFuture.cause()); + shutdown(); + } + }); + + InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress(); + logger.info("SdkClient start success, host is {}, port is {}", address.getHostName(), address.getPort()); + } catch (InterruptedException e) { + logger.error("SdkClient start error", e); + shutdown(); + } + } + + class SdkClientHandler extends SimpleChannelInboundHandler { + @Override + protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) { + final int rqid = sdkProto.getRqid(); + final ResponseFuture responseFuture = asyncResponse.get(rqid); + if (responseFuture != null) { + responseFuture.setSdkProto(sdkProto); + responseFuture.release(); + asyncResponse.remove(rqid); + + // 异步请求,执行回调函数 + if (responseFuture.getInvokeCallback() != null) { + responseFuture.executeInvokeCallback(); + } else { + // 同步请求,返回数据并释放CountDown + responseFuture.putResponse(sdkProto); + } + } else { + logger.warn("receive response, but not matched any request, address is {}", NettyUtil.parseRemoteAddr(ctx.channel())); + logger.warn("response data is {}", sdkProto.toString()); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.error("SdkHandler error", cause); + NettyUtil.closeChannel(ctx.channel()); + } + + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java new file mode 100644 index 0000000..a7e0e18 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java @@ -0,0 +1,47 @@ +package cn.ceres.did.sdk; + +import cn.ceres.did.common.NettyUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.FixedLengthFrameDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ehlxr + */ +public class SdkClientDecoder extends FixedLengthFrameDecoder { + private static final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); + + public SdkClientDecoder(int frameLength) { + super(frameLength); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { + ByteBuf buf = null; + try { + buf = (ByteBuf) super.decode(ctx, in); + if (buf == null) { + return null; + } + return new SdkProto(buf.readInt(), buf.readLong()); + } catch (Exception e) { + logger.error("SdkClientDecoder decode exception, " + NettyUtil.parseRemoteAddr(ctx.channel()), e); + NettyUtil.closeChannel(ctx.channel()); + } finally { + if (buf != null) { + buf.release(); + } + } + return null; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); + NettyUtil.closeChannel(channel); + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java new file mode 100644 index 0000000..70fa069 --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java @@ -0,0 +1,29 @@ +package cn.ceres.did.sdk; + +import cn.ceres.did.common.NettyUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ehlxr + */ +public class SdkClientEncoder extends MessageToByteEncoder { + private static final Logger logger = LoggerFactory.getLogger(SdkClientEncoder.class); + + @Override + protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) { + out.writeInt(sdkProto.getRqid()); + out.writeLong(sdkProto.getDid()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed", NettyUtil.parseRemoteAddr(channel)), cause); + NettyUtil.closeChannel(channel); + } +} diff --git a/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java new file mode 100644 index 0000000..a1b5a9a --- /dev/null +++ b/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java @@ -0,0 +1,53 @@ +package cn.ceres.did.sdk; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author ehlxr + */ +public class SdkProto { + private static AtomicInteger requestId = new AtomicInteger(0); + + /** + * 请求的ID + */ + private int rqid; + /** + * 全局的 ID + */ + private long did; + + public SdkProto() { + rqid = requestId.incrementAndGet(); + did = 0; + } + + public SdkProto(int rqid, long did) { + this.rqid = rqid; + this.did = did; + } + + public int getRqid() { + return rqid; + } + + public void setRqid(int rqid) { + this.rqid = rqid; + } + + public long getDid() { + return did; + } + + public void setDid(long did) { + this.did = did; + } + + @Override + public String toString() { + return "SdkProto{" + + "rqid=" + rqid + + ", did=" + did + + '}'; + } +} diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java new file mode 100644 index 0000000..e3801c4 --- /dev/null +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java @@ -0,0 +1,62 @@ +package cn.ceres.did; + +import cn.ceres.did.client.InvokeCallback; +import cn.ceres.did.client.ResponseFuture; +import cn.ceres.did.client.SdkClient; +import cn.ceres.did.sdk.SdkProto; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * 异步请求压测 + * + * @author ehlxr + */ +public class DidSdkPressAsyncTest { + private static final Logger logger = LoggerFactory.getLogger(DidSdkPressAsyncTest.class); + // 初始发送总量 + private static int NUM = 80000; + + @Test + public void pressAsyncTest() throws Exception { + SdkClient client = new SdkClient(); + client.init(); + client.start(); + + long start; + long end; + long cast; + long amount = 0; + long allcast = 0; + + for (int k = 0; k < 10; k++) { + final CountDownLatch countDownLatch = new CountDownLatch(NUM); + start = System.currentTimeMillis(); + for (int i = 0; i < NUM; i++) { + final SdkProto sdkProto = new SdkProto(); + client.invokeAsync(sdkProto, 5000, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + countDownLatch.countDown(); + } + }); + } + + end = System.currentTimeMillis(); + cast = (end - start); + allcast += cast; + countDownLatch.await(10, TimeUnit.SECONDS); + + logger.info("invokeAsync test num is: {}, cast time: {} millsec, throughput: {} send/millsec", NUM, cast, (double) NUM / cast); + amount += NUM; + NUM = NUM + 5000; + TimeUnit.SECONDS.sleep(2); + } + + logger.info("invokeAsync test all num is: {}, all cast time: {} millsec, all throughput: {} send/millsec", amount, allcast, (double) amount / allcast); + } +} diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java new file mode 100644 index 0000000..3ea844b --- /dev/null +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java @@ -0,0 +1,52 @@ +package cn.ceres.did; + +import cn.ceres.did.client.SdkClient; +import cn.ceres.did.sdk.SdkProto; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * 同步请求压测 + * + * @author ehlxr + */ +public class DidSdkPressSyncTest { + private static final Logger logger = LoggerFactory.getLogger(DidSdkPressSyncTest.class); + private static int NUM = 60000; + + @Test + public void pressSyncTest() throws Exception { + SdkClient client = new SdkClient(); + client.init(); + client.start(); + + long start; + long end; + long cast; + long amount = 0; + long allcast = 0; + + for (int k = 0; k < 20; k++) { + start = System.currentTimeMillis(); + for (int i = 0; i < NUM; i++) { + final SdkProto sdkProto = new SdkProto(); + client.invokeSync(sdkProto, 5000); + } + + end = System.currentTimeMillis(); + cast = (end - start); + allcast += cast; + + logger.info("invokeSync test num is: {}, cast time: {} millsec, throughput: {} send/millsec", NUM, cast, (double) NUM / cast); + + amount += NUM; + NUM += 5000; + TimeUnit.SECONDS.sleep(2); + } + + logger.info("invokeSync test all num is: {}, all cast time: {} millsec, all throughput: {} send/millsec", amount, allcast, (double) amount / allcast); + } +} diff --git a/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java new file mode 100644 index 0000000..e7db853 --- /dev/null +++ b/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java @@ -0,0 +1,51 @@ +package cn.ceres.did; + +import cn.ceres.did.client.InvokeCallback; +import cn.ceres.did.client.ResponseFuture; +import cn.ceres.did.client.SdkClient; +import cn.ceres.did.sdk.SdkProto; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * @author ehlxr + */ +public class DidSdkTest { + private static final int NUM = 100; + + @Test + public void didSdkTest() throws Exception { + SdkClient client = new SdkClient(); + client.init(); + client.start(); + + // 测试同步请求,关注rqid是否对应 + for (int i = 0; i < NUM; i++) { + SdkProto sdkProto = new SdkProto(); + System.out.println(i + " sendProto: " + sdkProto.toString()); + SdkProto resultProto = client.invokeSync(sdkProto, 2000); + System.out.println(i + " resultProto: " + resultProto.toString()); + } + System.out.println("invokeync test finish"); + + // 测试异步请求,关注rqid是否对应 + final CountDownLatch countDownLatch = new CountDownLatch(NUM); + for (int i = 0; i < NUM; i++) { + final SdkProto sdkProto = new SdkProto(); + final int finalI = i; + client.invokeAsync(sdkProto, 2000, new InvokeCallback() { + @Override + public void operationComplete(ResponseFuture responseFuture) { + System.out.println(finalI + " sendProto: " + sdkProto.toString()); + countDownLatch.countDown(); + System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString()); + } + }); + } + countDownLatch.await(10, TimeUnit.SECONDS); + System.out.println("invokeAsync test finish"); + + } +} diff --git a/did-sdk/target/maven-archiver/pom.properties b/did-sdk/target/maven-archiver/pom.properties new file mode 100644 index 0000000..e16081b --- /dev/null +++ b/did-sdk/target/maven-archiver/pom.properties @@ -0,0 +1,5 @@ +#Generated by Maven +#Tue Aug 14 15:16:54 CST 2018 +version=1.0-SNAPSHOT +groupId=cn.ceres.did +artifactId=did-sdk diff --git a/did-sdk/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/did-sdk/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 0000000..e5cca4b --- /dev/null +++ b/did-sdk/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,15 @@ +cn/ceres/did/client/SdkClient$2.class +cn/ceres/did/client/InvokeCallback.class +cn/ceres/did/client/SdkClient.class +cn/ceres/did/client/SdkClient$1.class +cn/ceres/did/client/Client.class +cn/ceres/did/client/AbstractClient$4.class +cn/ceres/did/client/ResponseFuture.class +cn/ceres/did/client/AbstractClient.class +cn/ceres/did/client/AbstractClient$1.class +cn/ceres/did/client/AbstractClient$2.class +cn/ceres/did/sdk/SdkProto.class +cn/ceres/did/sdk/SdkClientEncoder.class +cn/ceres/did/client/SdkClient$SdkClientHandler.class +cn/ceres/did/sdk/SdkClientDecoder.class +cn/ceres/did/client/AbstractClient$3.class diff --git a/did-sdk/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/did-sdk/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 0000000..d3fa299 --- /dev/null +++ b/did-sdk/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1,8 @@ +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/Client.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java diff --git a/did-sdk/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst b/did-sdk/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst new file mode 100644 index 0000000..921ecc3 --- /dev/null +++ b/did-sdk/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/createdFiles.lst @@ -0,0 +1,5 @@ +cn/ceres/did/DidSdkPressAsyncTest$1.class +cn/ceres/did/DidSdkPressAsyncTest.class +cn/ceres/did/DidSdkTest.class +cn/ceres/did/DidSdkTest$1.class +cn/ceres/did/DidSdkPressSyncTest.class diff --git a/did-sdk/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst b/did-sdk/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst new file mode 100644 index 0000000..d676690 --- /dev/null +++ b/did-sdk/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst @@ -0,0 +1,3 @@ +/Users/ehlxr/WorkSpaces/did/did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java +/Users/ehlxr/WorkSpaces/did/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java diff --git a/did-server/dependency-reduced-pom.xml b/did-server/dependency-reduced-pom.xml new file mode 100644 index 0000000..c1b9fc4 --- /dev/null +++ b/did-server/dependency-reduced-pom.xml @@ -0,0 +1,36 @@ + + + + did + cn.ceres.did + 1.0-SNAPSHOT + + 4.0.0 + did-server + did-server + + + + maven-compiler-plugin + + + maven-shade-plugin + + + + + + cn.ceres.did.ServerStarter + + + + + + + + maven-source-plugin + + + + + diff --git a/did-server/pom.xml b/did-server/pom.xml new file mode 100644 index 0000000..38aee74 --- /dev/null +++ b/did-server/pom.xml @@ -0,0 +1,51 @@ + + + + + did + cn.ceres.did + 1.0-SNAPSHOT + + 4.0.0 + + did-server + + did-server + + + + cn.ceres.did + did-common + 1.0-SNAPSHOT + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + + + cn.ceres.did.ServerStarter + + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + + \ No newline at end of file diff --git a/did-server/src/main/java/cn/ceres/did/ServerStarter.java b/did-server/src/main/java/cn/ceres/did/ServerStarter.java new file mode 100644 index 0000000..7287a32 --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/ServerStarter.java @@ -0,0 +1,52 @@ +package cn.ceres.did; + +import cn.ceres.did.core.SnowFlake; +import cn.ceres.did.server.http.HttpServer; +import cn.ceres.did.server.sdk.SdkServer; +import cn.ceres.did.common.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 两个服务器进程最好用同一个 SnowFlake 实例,部署在分布式环境时,SnowFlake 的 datacenterId 和 machineId 作为联合键必须全局唯一,否则多个节点的服务可能产生相同的 ID + * + * @author ehlxr + */ +public class ServerStarter { + private static final Logger logger = LoggerFactory.getLogger(ServerStarter.class); + + public static void main(String[] args) { + long datacenterId = Constants.DATACENTER_ID; + long machineId = Constants.MACHINES_SIGN; + + if (args != null && args.length == 2) { + datacenterId = Long.valueOf(args[0]); + machineId = Long.valueOf(args[1]); + } + + datacenterId = "".equals(Constants.getEnv("DATACENTER_ID")) ? datacenterId : Long.valueOf(Constants.getEnv("DATACENTER_ID")); + machineId = "".equals(Constants.getEnv("MACHINES_SIGN")) ? machineId : Long.valueOf(Constants.getEnv("MACHINES_SIGN")); + logger.info("SnowFlake datacenterId is: {}, machineId is: {}", datacenterId, machineId); + + final SnowFlake snowFlake = new SnowFlake(datacenterId, machineId); + + // 启动 Http 服务器 + final HttpServer httpServer = new HttpServer(snowFlake); + httpServer.init(); + httpServer.start(); + + // 启动 Sdk 服务器 + final SdkServer sdkServer = new SdkServer(snowFlake); + sdkServer.init(); + sdkServer.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + httpServer.shutdown(); + sdkServer.shutdown(); + System.exit(0); + } + }); + } +} diff --git a/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java b/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java new file mode 100644 index 0000000..9efbf5c --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java @@ -0,0 +1,130 @@ +package cn.ceres.did.core; + +/** + * twitter 的 snowflake 算法 -- java 实现 + * 协议格式:0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 + * 协议解释:0 - 41位时间戳 - 5位数据中心标识 - 5位机器标识 - 12位序列号 + * + * @author ehlxr + */ +public class SnowFlake { + /** + * 起始的时间戳,可以修改为服务第一次启动的时间 + * 一旦服务已经开始使用,起始时间戳就不能改变 + * + * 2018/8/14 00:00:00 + */ + private final static long START_STMP = 1534176000000L; + + /** + * 序列号占用的位数 + */ + private final static long SEQUENCE_BIT = 12; + /** + * 机器标识占用的位数 + */ + private final static long MACHINE_BIT = 5; + /** + * 数据中心占用的位数 + */ + private final static long DATACENTER_BIT = 5; + + /** + * 每一部分的最大值 + */ + private final static long MAX_DATACENTER_NUM = ~(-1L << DATACENTER_BIT); + private final static long MAX_MACHINE_NUM = ~(-1L << MACHINE_BIT); + private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BIT); + + /** + * 每一部分向左的位移 + */ + private final static long MACHINE_LEFT = SEQUENCE_BIT; + private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT; + private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT; + + /** + * 数据中心 + */ + private long datacenterId; + /** + * 机器标识 + */ + private long machineId; + /** + * 序列号 + */ + private long sequence = 0L; + /** + * 上一次时间戳 + */ + private long lastStmp = -1L; + + + /** + * 通过单例模式来获取实例 + * 分布式部署服务时,数据节点标识和机器标识作为联合键必须唯一 + * + * @param datacenterId 数据节点标识ID + * @param machineId 机器标识ID + */ + public SnowFlake(long datacenterId, long machineId) { + if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) { + throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0"); + } + if (machineId > MAX_MACHINE_NUM || machineId < 0) { + throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0"); + } + this.datacenterId = datacenterId; + this.machineId = machineId; + } + + /** + * 产生下一个ID + */ + public synchronized long nextId() { + long currStmp = getNewstmp(); + if (currStmp < lastStmp) { + throw new RuntimeException("Clock moved backwards. Refusing to generate id"); + } + + if (currStmp == lastStmp) { + //相同毫秒内,序列号自增 + sequence = (sequence + 1) & MAX_SEQUENCE; + //同一毫秒的序列数已经达到最大 + if (sequence == 0L) { + currStmp = getNextMill(); + } + } else { + //不同毫秒内,序列号置为0 + sequence = 0L; + } + + lastStmp = currStmp; + + // 时间戳部分 | 数据中心部分 | 机器标识部分 | 序列号部分 + return (currStmp - START_STMP) << TIMESTMP_LEFT | datacenterId << DATACENTER_LEFT | machineId << MACHINE_LEFT | sequence; + } + + private long getNextMill() { + long mill = getNewstmp(); + while (mill <= lastStmp) { + mill = getNewstmp(); + } + return mill; + } + + private long getNewstmp() { + return System.currentTimeMillis(); + } + + public static void main(String[] args) { + SnowFlake snowFlake = new SnowFlake(2, 3); + long start = System.currentTimeMillis(); + for (int i = 0; i < (1 << 18); i++) { + System.out.println(i + ": " + snowFlake.nextId()); + } + long end = System.currentTimeMillis(); + System.out.println(end - start); + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/BaseServer.java b/did-server/src/main/java/cn/ceres/did/server/BaseServer.java new file mode 100644 index 0000000..b1128b9 --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/BaseServer.java @@ -0,0 +1,64 @@ +package cn.ceres.did.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author ehlxr + */ +public abstract class BaseServer implements Server { + protected Logger logger = LoggerFactory.getLogger(getClass()); + + protected DefaultEventLoopGroup defLoopGroup; + protected NioEventLoopGroup bossGroup; + protected NioEventLoopGroup workGroup; + protected ChannelFuture channelFuture; + protected ServerBootstrap serverBootstrap; + protected int port; + + public void init() { + defLoopGroup = new DefaultEventLoopGroup(8, new ThreadFactory() { + private AtomicInteger index = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "DEFAULTEVENTLOOPGROUP_" + index.incrementAndGet()); + } + }); + bossGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { + private AtomicInteger index = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "BOSS_" + index.incrementAndGet()); + } + }); + workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { + private AtomicInteger index = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "WORK_" + index.incrementAndGet()); + } + }); + + serverBootstrap = new ServerBootstrap(); + } + + @Override + public void shutdown() { + if (defLoopGroup != null) { + defLoopGroup.shutdownGracefully(); + } + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + logger.info("Server EventLoopGroup shutdown finish"); + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/Server.java b/did-server/src/main/java/cn/ceres/did/server/Server.java new file mode 100644 index 0000000..5c92289 --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/Server.java @@ -0,0 +1,16 @@ +package cn.ceres.did.server; + +/** + * @author ehlxr + */ +public interface Server { + /** + * 启动服务 + */ + void start(); + + /** + * 关闭服务 + */ + void shutdown(); +} diff --git a/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java b/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java new file mode 100644 index 0000000..707f4ba --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java @@ -0,0 +1,75 @@ +package cn.ceres.did.server.http; + +import cn.ceres.did.common.Constants; +import cn.ceres.did.core.SnowFlake; +import cn.ceres.did.server.BaseServer; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; + +import java.net.InetSocketAddress; + +/** + * Http服务器,使用Netty中的Http协议栈, + * 实现中支持多条请求路径,对于不存在的请求路径返回404状态码 + * 如:http://localhost:8099/getTime + * + * @author ehlxr + */ +public class HttpServer extends BaseServer { + private SnowFlake snowFlake; + + public HttpServer(SnowFlake snowFlake) { + this.snowFlake = snowFlake; + this.port = Constants.HTTP_PORT; + } + + @Override + public void init() { + super.init(); + serverBootstrap.group(bossGroup, workGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, false) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_BACKLOG, 1024) + .localAddress(new InetSocketAddress(port)) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(defLoopGroup, + new HttpRequestDecoder(), + new HttpObjectAggregator(65536), + new HttpResponseEncoder(), + new HttpServerHandler(snowFlake) + ); + } + }); + + } + + @Override + public void start() { + try { + channelFuture = serverBootstrap.bind().sync(); + InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); + logger.info("HttpServer start success, port is:{}", addr.getPort()); + } catch (InterruptedException e) { + logger.error("HttpServer start fail,", e); + } + } + + @Override + public void shutdown() { + if (defLoopGroup != null) { + defLoopGroup.shutdownGracefully(); + } + bossGroup.shutdownGracefully(); + workGroup.shutdownGracefully(); + } + +} diff --git a/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java new file mode 100644 index 0000000..80cce6f --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java @@ -0,0 +1,80 @@ +package cn.ceres.did.server.http; + +import cn.ceres.did.common.Constants; +import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.core.SnowFlake; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * 自定义的处理器,目前支持三种请求: + * getTime: 获取服务器当前时间; + * clientInfo: 获取请求客户端的User-Agent信息 + * 其它: 返回404状态,并且提示404信息 + * + * @author ehlxr + */ +public class HttpServerHandler extends SimpleChannelInboundHandler { + private final Logger logger = LoggerFactory.getLogger(getClass()); + /** + * 通过信号量来控制流量 + */ + private Semaphore semaphore = new Semaphore(Constants.HANDLE_HTTP_TPS); + private SnowFlake snowFlake; + + public HttpServerHandler(SnowFlake snowFlake) { + this.snowFlake = snowFlake; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { + String uri = getUriNoSprit(request); + logger.info("request uri is: {}", uri); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + if (Constants.HTTP_REQUEST.equals(uri)) { + if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { + try { + long id = snowFlake.nextId(); + logger.info("HttpServerHandler id is: {}", id); + response.content().writeBytes(("" + id).getBytes()); + } catch (Exception e) { + semaphore.release(); + logger.error("HttpServerHandler error", e); + } + } else { + String info = String.format("HttpServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d", + Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); + logger.warn(info); + throw new Exception(info); + } + } else { + response.content().writeBytes(("Unsupported uri: " + uri).getBytes()); + } + + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error("HttpServerHandler channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); + NettyUtil.closeChannel(channel); + } + + + private String getUriNoSprit(FullHttpRequest request) { + String uri = request.uri(); + if (uri.startsWith("/")) { + uri = uri.substring(1); + } + return uri; + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java new file mode 100644 index 0000000..120c7d5 --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java @@ -0,0 +1,44 @@ +package cn.ceres.did.server.sdk; + +/** + * @author ehlxr + */ +public class SdkProto { + /** + * 请求的ID + */ + private int rqid; + /** + * 全局的 ID + */ + private long did; + + SdkProto(int rqid, long did) { + this.rqid = rqid; + this.did = did; + } + + public int getRqid() { + return rqid; + } + + public void setRqid(int rqid) { + this.rqid = rqid; + } + + public long getDid() { + return did; + } + + public void setDid(long did) { + this.did = did; + } + + @Override + public String toString() { + return "SdkProto{" + + "rqid=" + rqid + + ", did=" + did + + '}'; + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java new file mode 100644 index 0000000..d98e846 --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java @@ -0,0 +1,56 @@ +package cn.ceres.did.server.sdk; + +import cn.ceres.did.common.Constants; +import cn.ceres.did.server.BaseServer; +import cn.ceres.did.core.SnowFlake; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +import java.net.InetSocketAddress; + +/** + * @author ehlxr + */ +public class SdkServer extends BaseServer { + private SnowFlake snowFlake; + + public SdkServer(SnowFlake snowFlake) { + this.snowFlake = snowFlake; + this.port = Constants.SDKS_PORT; + } + + @Override + public void init() { + super.init(); + serverBootstrap.group(bossGroup, workGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_BACKLOG, 1024) + .localAddress(new InetSocketAddress(port)) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline().addLast(defLoopGroup, + new SdkServerDecoder(12), + new SdkServerEncoder(), + new SdkServerHandler(snowFlake) + ); + } + }); + } + + @Override + public void start() { + try { + channelFuture = serverBootstrap.bind().sync(); + InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); + logger.info("SdkServer start success, port is:{}", addr.getPort()); + } catch (InterruptedException e) { + logger.error("SdkServer start fail,", e); + } + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java new file mode 100644 index 0000000..a1f7a6e --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java @@ -0,0 +1,47 @@ +package cn.ceres.did.server.sdk; + +import cn.ceres.did.common.NettyUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.FixedLengthFrameDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ehlxr + */ +public class SdkServerDecoder extends FixedLengthFrameDecoder { + private static final Logger logger = LoggerFactory.getLogger(SdkServerDecoder.class); + + SdkServerDecoder(int frameLength) { + super(frameLength); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { + ByteBuf buf = null; + try { + buf = (ByteBuf) super.decode(ctx, in); + if (buf == null) { + return null; + } + return new SdkProto(buf.readInt(), buf.readLong()); + } catch (Exception e) { + logger.error("decode exception, " + NettyUtil.parseRemoteAddr(ctx.channel()), e); + NettyUtil.closeChannel(ctx.channel()); + }finally { + if (buf != null) { + buf.release(); + } + } + return null; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel),cause); + NettyUtil.closeChannel(channel); + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java new file mode 100644 index 0000000..12758be --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java @@ -0,0 +1,30 @@ +package cn.ceres.did.server.sdk; + +import cn.ceres.did.common.NettyUtil; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author ehlxr + */ +public class SdkServerEncoder extends MessageToByteEncoder { + private static final Logger logger = LoggerFactory.getLogger(SdkServerEncoder.class); + + + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, SdkProto sdkProto, ByteBuf out) { + out.writeInt(sdkProto.getRqid()); + out.writeLong(sdkProto.getDid()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error("SdkServerEncoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); + NettyUtil.closeChannel(channel); + } +} diff --git a/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java new file mode 100644 index 0000000..a68d838 --- /dev/null +++ b/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java @@ -0,0 +1,64 @@ +package cn.ceres.did.server.sdk; + +import cn.ceres.did.common.Constants; +import cn.ceres.did.common.NettyUtil; +import cn.ceres.did.core.SnowFlake; +import io.netty.channel.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * 通过雪花算法生成唯一 ID,写入 Channel 返回 + * + * @author ehlxr + */ +public class SdkServerHandler extends SimpleChannelInboundHandler { + private static final Logger logger = LoggerFactory.getLogger(SdkServerHandler.class); + /** + * 通过信号量来控制流量 + */ + private Semaphore semaphore = new Semaphore(Constants.HANDLE_SDKS_TPS); + private SnowFlake snowFlake; + + SdkServerHandler(SnowFlake snowFlake) { + this.snowFlake = snowFlake; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof SdkProto) { + SdkProto sdkProto = (SdkProto) msg; + if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { + try { + sdkProto.setDid(snowFlake.nextId()); + ctx.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) { + semaphore.release(); + } + }); + } catch (Exception e) { + semaphore.release(); + logger.error("SdkServerhandler error", e); + } + } else { + sdkProto.setDid(-1); + ctx.channel().writeAndFlush(sdkProto); + String info = String.format("SdkServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d", + Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); + logger.warn(info); + throw new Exception(info); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error("SdkServerHandler channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); + NettyUtil.closeChannel(channel); + } +} diff --git a/did-server/src/main/resources/logback.xml b/did-server/src/main/resources/logback.xml new file mode 100644 index 0000000..7ae8319 --- /dev/null +++ b/did-server/src/main/resources/logback.xml @@ -0,0 +1,46 @@ + + + + + %date [%thread] %-5level %logger{35}:%line - %msg%n + UTF-8 + + + + + /data/logs/did/did-server-error.log + + /data/logs/did/did-server-error.log.%d{yyyy-MM-dd} + 30 + + + ERROR + ACCEPT + DENY + + + %date [%thread] %-5level %logger{35}:%line - %msg%n + UTF-8 + + + + + /data/logs/did/did-server.log + + /data/logs/did/did-server.log.%d{yyyy-MM-dd} + 30 + + + %date [%thread] %-5level %logger{35}:%line - %msg%n + UTF-8 + + + + + + + + + + + \ No newline at end of file diff --git a/did-server/target/classes/logback.xml b/did-server/target/classes/logback.xml new file mode 100644 index 0000000..7ae8319 --- /dev/null +++ b/did-server/target/classes/logback.xml @@ -0,0 +1,46 @@ + + + + + %date [%thread] %-5level %logger{35}:%line - %msg%n + UTF-8 + + + + + /data/logs/did/did-server-error.log + + /data/logs/did/did-server-error.log.%d{yyyy-MM-dd} + 30 + + + ERROR + ACCEPT + DENY + + + %date [%thread] %-5level %logger{35}:%line - %msg%n + UTF-8 + + + + + /data/logs/did/did-server.log + + /data/logs/did/did-server.log.%d{yyyy-MM-dd} + 30 + + + %date [%thread] %-5level %logger{35}:%line - %msg%n + UTF-8 + + + + + + + + + + + \ No newline at end of file diff --git a/did-server/target/maven-archiver/pom.properties b/did-server/target/maven-archiver/pom.properties new file mode 100644 index 0000000..c60b3d1 --- /dev/null +++ b/did-server/target/maven-archiver/pom.properties @@ -0,0 +1,5 @@ +#Generated by Maven +#Tue Aug 14 15:16:53 CST 2018 +version=1.0-SNAPSHOT +groupId=cn.ceres.did +artifactId=did-server diff --git a/did-server/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst b/did-server/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst new file mode 100644 index 0000000..3cf9319 --- /dev/null +++ b/did-server/target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst @@ -0,0 +1,18 @@ +cn/ceres/did/server/BaseServer$3.class +cn/ceres/did/server/http/HttpServer.class +cn/ceres/did/server/sdk/SdkProto.class +cn/ceres/did/server/Server.class +cn/ceres/did/server/sdk/SdkServerDecoder.class +cn/ceres/did/server/sdk/SdkServer$1.class +cn/ceres/did/core/SnowFlake.class +cn/ceres/did/server/sdk/SdkServerEncoder.class +cn/ceres/did/server/BaseServer.class +cn/ceres/did/server/sdk/SdkServerHandler.class +cn/ceres/did/ServerStarter.class +cn/ceres/did/ServerStarter$1.class +cn/ceres/did/server/http/HttpServerHandler.class +cn/ceres/did/server/http/HttpServer$1.class +cn/ceres/did/server/sdk/SdkServer.class +cn/ceres/did/server/BaseServer$1.class +cn/ceres/did/server/sdk/SdkServerHandler$1.class +cn/ceres/did/server/BaseServer$2.class diff --git a/did-server/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst b/did-server/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst new file mode 100644 index 0000000..8029867 --- /dev/null +++ b/did-server/target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst @@ -0,0 +1,11 @@ +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/http/HttpServerHandler.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/http/HttpServer.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/core/SnowFlake.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerDecoder.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerHandler.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/ServerStarter.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/BaseServer.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServer.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/sdk/SdkProto.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/Server.java +/Users/ehlxr/WorkSpaces/did/did-server/src/main/java/cn/ceres/did/server/sdk/SdkServerEncoder.java diff --git a/did-server/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst b/did-server/target/maven-status/maven-compiler-plugin/testCompile/default-testCompile/inputFiles.lst new file mode 100644 index 0000000..e69de29 diff --git a/did.iml b/did.iml new file mode 100644 index 0000000..78b2cc5 --- /dev/null +++ b/did.iml @@ -0,0 +1,2 @@ + + \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..e5438ce --- /dev/null +++ b/pom.xml @@ -0,0 +1,82 @@ + + + + 4.0.0 + + cn.ceres.did + did + pom + 1.0-SNAPSHOT + + did-server + did-sdk + did-common + + + did + + + + + io.netty + netty-all + 4.1.6.Final + + + ch.qos.logback + logback-classic + 1.1.7 + + + junit + junit + 4.11 + test + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.5.1 + + 1.7 + 1.7 + + + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + package + + shade + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + jar-no-fork + + + + + + + + +