Optimized code

This commit is contained in:
ehlxr 2021-01-20 15:25:58 +08:00
parent eaeeb56e93
commit 13e80884d7
23 changed files with 341 additions and 282 deletions

View File

@ -12,34 +12,42 @@ public class Constants {
return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key);
} }
public static String DEFAULT_HOST = "localhost"; public static String SERVER_HOST = "localhost";
/** /**
* HTTP协议和SDK协议服务器的端口 * HTTP 协议和 SDK 协议服务器默认端口
*/ */
public static int HTTP_PORT = 16830; public static int HTTP_PORT = 16830;
public static int SDKS_PORT = 16831; public static int SDK_PORT = 16831;
/** /**
* HTTP协议和SDK协议的请求路径 * 数据中心默认标识 ID取值范围0~31
*/ * 机器或进程默认标识 ID取值范围0~31
public static String HTTP_REQUEST = "did"; * <p>
public static String SDKS_REQUEST = "did"; * 两个标识 ID 组合在分布式环境中必须唯一
/**
* 数据中心的标识ID取值范围0~31
* 机器或进程的标识ID取值范围0~31
* 两个标识ID组合在分布式环境中必须唯一
*/ */
public static long DATACENTER_ID = 1; public static long DATACENTER_ID = 1;
public static long MACHINES_ID = 1; public static long MACHINES_ID = 1;
/** /**
* 流量控制表示每秒处理的并发数 * Server 流量控制表示每秒处理的并发数
*/ */
public static int HANDLE_HTTP_TPS = 10000; public static int HANDLE_HTTP_TPS = 10000;
public static int HANDLE_SDKS_TPS = 50000; public static int HANDLE_SDK_TPS = 50000;
/**
* sdk client 流量控制表示每秒处理的并发数
*/
public static int SDK_CLIENT_ASYNC_TPS = 100000;
public static int SDK_CLIENT_ONEWAY_TPS = 100000;
public static int ACQUIRE_TIMEOUTMILLIS = 5000; public static int ACQUIRE_TIMEOUTMILLIS = 5000;
/**
* sdk client 默认超时时间
*/
public static int SDK_CLIENT_TIMEOUTMILLIS = 2000;
private Constants() { private Constants() {
} }
} }

View File

@ -1,7 +1,6 @@
package cn.ceres.did.common; package cn.ceres.did.common;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -36,11 +35,8 @@ public class NettyUtil {
public static void closeChannel(Channel channel) { public static void closeChannel(Channel channel) {
final String addrRemote = parseRemoteAddr(channel); final String addrRemote = parseRemoteAddr(channel);
channel.close().addListener(new ChannelFutureListener() { channel.close().addListener((ChannelFutureListener) future ->
@Override logger.info("closeChannel: close the connection to remote address[{}] result: {}",
public void operationComplete(ChannelFuture future) { addrRemote, future.isSuccess()));
logger.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, future.isSuccess());
}
});
} }
} }

View File

@ -1,4 +1,4 @@
package cn.ceres.did.sdk; package cn.ceres.did.common;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;

View File

@ -1,7 +1,8 @@
package cn.ceres.did.client; package cn.ceres.did.client;
import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.common.SdkProto;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -10,36 +11,43 @@ import io.netty.channel.nio.NioEventLoopGroup;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.*; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
* @author ehlxr * @author ehlxr
*/ */
@SuppressWarnings({"unused", "UnusedReturnValue"})
public abstract class AbstractClient implements Client { public abstract class AbstractClient implements Client {
private final Logger logger = LoggerFactory.getLogger(AbstractClient.class); private final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Semaphore asyncSemaphore = new Semaphore(100000); private final Semaphore asyncSemaphore = new Semaphore(Constants.SDK_CLIENT_ASYNC_TPS);
private final Semaphore onewaySemaphore = new Semaphore(100000); private final Semaphore onewaySemaphore = new Semaphore(Constants.SDK_CLIENT_ONEWAY_TPS);
ConcurrentMap<Integer, ResponseFuture> asyncResponse;
NioEventLoopGroup workGroup; NioEventLoopGroup workGroup;
ChannelFuture channelFuture; ChannelFuture channelFuture;
Bootstrap bootstrap; Bootstrap bootstrap;
int timeoutMillis = 2000; int timeoutMillis;
String host;
public int getTimeoutMillis() { int port;
return timeoutMillis;
}
public void setTimeoutMillis(int timeoutMillis) { public void setTimeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis; this.timeoutMillis = timeoutMillis;
} }
public void init() { public void setHost(String host) {
asyncResponse = new ConcurrentHashMap<>(16); this.host = host;
workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { }
public void setPort(int port) {
this.port = port;
}
void init() {
workGroup = new NioEventLoopGroup(new ThreadFactory() {
private final AtomicInteger index = new AtomicInteger(0); private final AtomicInteger index = new AtomicInteger(0);
@Override @Override
@ -53,47 +61,49 @@ public abstract class AbstractClient implements Client {
@Override @Override
public void shutdown() { public void shutdown() {
if (workGroup != null) { logger.info("SDK Client shutdowning......");
workGroup.shutdownGracefully(); try {
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
} catch (Exception e) {
logger.error("Client EventLoopGroup shutdown error.", e);
} }
logger.info("SDK Client shutdown finish!");
} }
@Override @Override
public SdkProto invokeSync(SdkProto sdkProto, long timeoutMillis) throws Exception { public SdkProto invokeSync(long timeoutMillis) throws Exception {
final Channel channel = channelFuture.channel(); final Channel channel = channelFuture.channel();
if (channel.isActive()) { if (channel.isActive()) {
final SdkProto sdkProto = new SdkProto();
final int rqid = sdkProto.getRqid(); final int rqid = sdkProto.getRqid();
try { try {
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null); final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, null, null);
asyncResponse.put(rqid, responseFuture); REPONSE_MAP.put(rqid, responseFuture);
channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) { if (channelFuture.isSuccess()) {
//发送成功后立即跳出 //发送成功后立即跳出
responseFuture.setIsSendStateOk(true);
return; return;
} }
// 代码执行到此说明发送失败需要释放资源 // 代码执行到此说明发送失败需要释放资源
asyncResponse.remove(rqid); REPONSE_MAP.remove(rqid);
responseFuture.putResponse(null); responseFuture.putResponse(null);
responseFuture.setCause(channelFuture.cause()); responseFuture.setCause(channelFuture.cause());
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed."); logger.warn("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel));
}); });
// 阻塞等待响应 // 阻塞等待响应
SdkProto resultProto = responseFuture.waitResponse(timeoutMillis); SdkProto resultProto = responseFuture.waitResponse(timeoutMillis);
if (null == resultProto) { if (null == resultProto) {
if (responseFuture.isSendStateOk()) { throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause());
throw new Exception(NettyUtil.parseRemoteAddr(channel) + timeoutMillis + responseFuture.getCause());
} else {
throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause());
}
} }
return resultProto; return resultProto;
} catch (Exception e) { } catch (Exception e) {
logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e); logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e);
throw new Exception(NettyUtil.parseRemoteAddr(channel), e); throw new Exception(NettyUtil.parseRemoteAddr(channel), e);
} finally { } finally {
asyncResponse.remove(rqid); REPONSE_MAP.remove(rqid);
} }
} else { } else {
NettyUtil.closeChannel(channel); NettyUtil.closeChannel(channel);
@ -102,22 +112,21 @@ public abstract class AbstractClient implements Client {
} }
@Override @Override
public void invokeAsync(SdkProto sdkProto, long timeoutMillis, final InvokeCallback invokeCallback) throws Exception { public void invokeAsync(long timeoutMillis, InvokeCallback invokeCallback) throws Exception {
final Channel channel = channelFuture.channel(); final Channel channel = channelFuture.channel();
if (channel.isOpen() && channel.isActive()) { if (channel.isOpen() && channel.isActive()) {
final SdkProto sdkProto = new SdkProto();
final int rqid = sdkProto.getRqid(); final int rqid = sdkProto.getRqid();
boolean acquired = asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
if (acquired) { final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, invokeCallback, asyncSemaphore);
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore); REPONSE_MAP.put(rqid, responseFuture);
asyncResponse.put(rqid, responseFuture);
try { try {
channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { channelFuture.channel().writeAndFlush(sdkProto).addListener(channelFuture -> {
if (channelFuture.isSuccess()) { if (channelFuture.isSuccess()) {
responseFuture.setIsSendStateOk(true);
return; return;
} }
// 代码执行到些说明发送失败需要释放资源 // 代码执行到些说明发送失败需要释放资源
asyncResponse.remove(rqid); REPONSE_MAP.remove(rqid);
responseFuture.setCause(channelFuture.cause()); responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null); responseFuture.putResponse(null);
@ -128,47 +137,18 @@ public abstract class AbstractClient implements Client {
} finally { } finally {
responseFuture.release(); responseFuture.release();
} }
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause()); logger.warn("send a request command to channel <{}> failed.",
NettyUtil.parseRemoteAddr(channel), channelFuture.cause());
}); });
} catch (Exception e) { } catch (Exception e) {
responseFuture.release(); responseFuture.release();
logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception", e); logger.warn("send a request to channel <{}> Exception",
NettyUtil.parseRemoteAddr(channel), e);
throw new Exception(NettyUtil.parseRemoteAddr(channel), e); throw new Exception(NettyUtil.parseRemoteAddr(channel), e);
} }
} else { } else {
String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " +
timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); "nums: %d semaphoreAsyncValue: %d",
logger.warn(info);
throw new Exception(info);
}
} else {
NettyUtil.closeChannel(channel);
throw new Exception(NettyUtil.parseRemoteAddr(channel));
}
}
@Override
public void invokeOneWay(SdkProto sdkProto, long timeoutMillis) throws Exception {
final Channel channel = channelFuture.channel();
if (channel.isActive()) {
final int rqid = sdkProto.getRqid();
boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
try {
channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
onewaySemaphore.release();
if (!channelFuture.isSuccess()) {
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
}
});
} catch (Exception e) {
logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception");
throw new Exception(NettyUtil.parseRemoteAddr(channel), e);
} finally {
asyncResponse.remove(rqid);
}
} else {
String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d",
timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
logger.warn(info); logger.warn(info);
throw new Exception(info); throw new Exception(info);
@ -182,4 +162,12 @@ public abstract class AbstractClient implements Client {
public long invoke() throws Exception { public long invoke() throws Exception {
return invoke(timeoutMillis); return invoke(timeoutMillis);
} }
public SdkProto invokeSync() throws Exception {
return invokeSync(timeoutMillis);
}
public void invokeAsync(InvokeCallback invokeCallback) throws Exception {
invokeAsync(timeoutMillis, invokeCallback);
}
} }

View File

@ -1,22 +1,53 @@
package cn.ceres.did.client; package cn.ceres.did.client;
import cn.ceres.did.sdk.SdkProto;
import cn.ceres.did.common.SdkProto;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** /**
* @author ehlxr * @author ehlxr
*/ */
public interface Client { public interface Client {
ConcurrentMap<Integer, ResponseFuture> REPONSE_MAP = new ConcurrentHashMap<>();
/**
* 启动 sdk client
*/
void start(); void start();
/**
* 停止 sdk client
*/
void shutdown(); void shutdown();
SdkProto invokeSync(SdkProto proto, long timeoutMillis) throws Exception; /**
* 同步调用
*
* @param timeoutMillis 超时时间
* @return {@link SdkProto}
* @throws Exception 调用异常
*/
SdkProto invokeSync(long timeoutMillis) throws Exception;
void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception; /**
* 异步调用
void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception; *
* @param timeoutMillis 超时时间
* @param invokeCallback 回调接口
* @throws Exception 调用异常
*/
void invokeAsync(long timeoutMillis, InvokeCallback invokeCallback) throws Exception;
/**
* 获取 id
*
* @param timeoutMillis 超时时间
* @return id
* @throws Exception 调用异常
*/
default long invoke(long timeoutMillis) throws Exception { default long invoke(long timeoutMillis) throws Exception {
return invokeSync(new SdkProto(), timeoutMillis).getDid(); return invokeSync(timeoutMillis).getDid();
} }
} }

View File

@ -5,5 +5,10 @@ package cn.ceres.did.client;
*/ */
public interface InvokeCallback { public interface InvokeCallback {
/**
* 异步回调处理方法
*
* @param responseFuture {@link ResponseFuture}
*/
void operationComplete(ResponseFuture responseFuture); void operationComplete(ResponseFuture responseFuture);
} }

View File

@ -1,6 +1,6 @@
package cn.ceres.did.client; package cn.ceres.did.client;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.common.SdkProto;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
@ -11,7 +11,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @author ehlxr * @author ehlxr
*/ */
public class ResponseFuture { public class ResponseFuture {
private final int rqid;
private final long timeoutMillis; private final long timeoutMillis;
private final Semaphore semaphore; private final Semaphore semaphore;
private final InvokeCallback invokeCallback; private final InvokeCallback invokeCallback;
@ -21,14 +20,11 @@ public class ResponseFuture {
private volatile Throwable cause; private volatile Throwable cause;
private volatile SdkProto sdkProto; private volatile SdkProto sdkProto;
private volatile boolean isSendStateOk;
public ResponseFuture(int rqid, long timeoutMillis, InvokeCallback invokeCallback, Semaphore semaphore) { public ResponseFuture(long timeoutMillis, InvokeCallback invokeCallback, Semaphore semaphore) {
this.rqid = rqid;
this.timeoutMillis = timeoutMillis; this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback; this.invokeCallback = invokeCallback;
this.semaphore = semaphore; this.semaphore = semaphore;
this.isSendStateOk = false;
} }
/** /**
@ -75,14 +71,6 @@ public class ResponseFuture {
this.sdkProto = sdkProto; this.sdkProto = sdkProto;
} }
public boolean isSendStateOk() {
return isSendStateOk;
}
public void setIsSendStateOk(boolean isSendStateOk) {
this.isSendStateOk = isSendStateOk;
}
public Throwable getCause() { public Throwable getCause() {
return cause; return cause;
} }
@ -94,7 +82,6 @@ public class ResponseFuture {
@Override @Override
public String toString() { public String toString() {
return "ResponseFuture{" + return "ResponseFuture{" +
"rqid=" + rqid +
", timeoutMillis=" + timeoutMillis + ", timeoutMillis=" + timeoutMillis +
", semaphore=" + semaphore + ", semaphore=" + semaphore +
", invokeCallback=" + invokeCallback + ", invokeCallback=" + invokeCallback +
@ -103,7 +90,6 @@ public class ResponseFuture {
", countDownLatch=" + countDownLatch + ", countDownLatch=" + countDownLatch +
", cause=" + cause + ", cause=" + cause +
", sdkProto=" + sdkProto + ", sdkProto=" + sdkProto +
", isSendStateOk=" + isSendStateOk +
'}'; '}';
} }
} }

View File

@ -1,11 +1,12 @@
package cn.ceres.did.client; package cn.ceres.did.client;
import cn.ceres.did.client.handler.SdkClientDecoder;
import cn.ceres.did.client.handler.SdkClientEncoder;
import cn.ceres.did.client.handler.SdkClientHandler;
import cn.ceres.did.common.Constants; import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil; import io.netty.channel.ChannelFutureListener;
import cn.ceres.did.sdk.SdkClientDecoder; import io.netty.channel.ChannelInitializer;
import cn.ceres.did.sdk.SdkClientEncoder; import io.netty.channel.ChannelOption;
import cn.ceres.did.sdk.SdkProto;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -18,19 +19,26 @@ import java.net.InetSocketAddress;
*/ */
public class SdkClient extends AbstractClient { public class SdkClient extends AbstractClient {
private final Logger logger = LoggerFactory.getLogger(SdkClient.class); private final Logger logger = LoggerFactory.getLogger(SdkClient.class);
private String host;
private int port;
public SdkClient(String host, int port) { public SdkClient(String host, int port) {
this(host, port, Constants.SDK_CLIENT_TIMEOUTMILLIS);
}
public SdkClient(String host, int port, int timeoutMillis) {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.timeoutMillis = timeoutMillis;
} }
public SdkClient() { public SdkClient() {
this("".equals(Constants.getEnv("SDK_HOST")) ? Constants.SERVER_HOST : Constants.getEnv("HTTP_PORT"),
"".equals(Constants.getEnv("SDK_PORT")) ? Constants.SDK_PORT : Integer.parseInt(Constants.getEnv("SDK_PORT")));
} }
@Override @Override
public void start() { public void start() {
init();
bootstrap.group(workGroup) bootstrap.group(workGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// .option(ChannelOption.TCP_NODELAY, true) // .option(ChannelOption.TCP_NODELAY, true)
@ -46,8 +54,7 @@ public class SdkClient extends AbstractClient {
}); });
try { try {
channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, channelFuture = bootstrap.connect(host, port).sync();
port == 0 ? Constants.SDKS_PORT : port).sync();
channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> { channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> {
logger.warn("client channel close.", channelFuture.cause()); logger.warn("client channel close.", channelFuture.cause());
@ -62,42 +69,4 @@ public class SdkClient extends AbstractClient {
} }
} }
class SdkClientHandler extends SimpleChannelInboundHandler<SdkProto> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) {
final int rqid = sdkProto.getRqid();
final ResponseFuture responseFuture = asyncResponse.get(rqid);
if (responseFuture != null) {
responseFuture.setSdkProto(sdkProto);
responseFuture.release();
asyncResponse.remove(rqid);
// 异步请求执行回调函数
if (responseFuture.getInvokeCallback() != null) {
responseFuture.executeInvokeCallback();
} else {
// 同步请求返回数据并释放CountDown
responseFuture.putResponse(sdkProto);
}
} else {
logger.warn("receive response, but not matched any request, address is {}", NettyUtil.parseRemoteAddr(ctx.channel()));
logger.warn("response data is {}", sdkProto.toString());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("SdkHandler error", cause);
NettyUtil.closeChannel(ctx.channel());
}
}
public void setHost(String host) {
this.host = host;
}
public void setPort(int port) {
this.port = port;
}
} }

View File

@ -1,6 +1,31 @@
package cn.ceres.did.sdk; /*
* The MIT License (MIT)
*
* Copyright © 2020 xrv <xrg@live.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package cn.ceres.did.client.handler;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.common.SdkProto;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -10,9 +35,10 @@ import org.slf4j.LoggerFactory;
/** /**
* @author ehlxr * @author ehlxr
* @since 2021-01-20 14:42.
*/ */
public class SdkClientDecoder extends FixedLengthFrameDecoder { public class SdkClientDecoder extends FixedLengthFrameDecoder {
private static final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class);
public SdkClientDecoder(int frameLength) { public SdkClientDecoder(int frameLength) {
super(frameLength); super(frameLength);

View File

@ -0,0 +1,59 @@
/*
* The MIT License (MIT)
*
* Copyright © 2020 xrv <xrg@live.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package cn.ceres.did.client.handler;
import cn.ceres.did.client.SdkClient;
import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.common.SdkProto;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ehlxr
* @since 2021-01-20 14:43.
*/
public class SdkClientEncoder extends MessageToByteEncoder<SdkProto> {
private final Logger logger = LoggerFactory.getLogger(SdkClient.class);
@Override
protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) {
out.writeInt(sdkProto.getRqid());
out.writeLong(sdkProto.getDid());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel channel = ctx.channel();
logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed",
NettyUtil.parseRemoteAddr(channel)), cause);
NettyUtil.closeChannel(channel);
}
}

View File

@ -0,0 +1,70 @@
/*
* The MIT License (MIT)
*
* Copyright © 2020 xrv <xrg@live.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package cn.ceres.did.client.handler;
import cn.ceres.did.client.Client;
import cn.ceres.did.client.ResponseFuture;
import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.common.SdkProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ehlxr
* @since 2021-01-20 14:43.
*/
public class SdkClientHandler extends SimpleChannelInboundHandler<SdkProto> {
private final Logger logger = LoggerFactory.getLogger(SdkClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) {
final int rqid = sdkProto.getRqid();
final ResponseFuture responseFuture = Client.REPONSE_MAP.get(rqid);
if (responseFuture != null) {
responseFuture.setSdkProto(sdkProto);
responseFuture.release();
Client.REPONSE_MAP.remove(rqid);
// 异步请求执行回调函数
if (responseFuture.getInvokeCallback() != null) {
responseFuture.executeInvokeCallback();
} else {
// 同步请求返回数据并释放 CountDown
responseFuture.putResponse(sdkProto);
}
} else {
logger.warn("receive response {}, but not matched any request, address is {}",
sdkProto, NettyUtil.parseRemoteAddr(ctx.channel()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("SdkHandler error", cause);
NettyUtil.closeChannel(ctx.channel());
}
}

View File

@ -1,29 +0,0 @@
package cn.ceres.did.sdk;
import cn.ceres.did.common.NettyUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ehlxr
*/
public class SdkClientEncoder extends MessageToByteEncoder<SdkProto> {
private static final Logger logger = LoggerFactory.getLogger(SdkClientEncoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) {
out.writeInt(sdkProto.getRqid());
out.writeLong(sdkProto.getDid());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel channel = ctx.channel();
logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed", NettyUtil.parseRemoteAddr(channel)), cause);
NettyUtil.closeChannel(channel);
}
}

View File

@ -1,7 +1,6 @@
package cn.ceres.did; package cn.ceres.did;
import cn.ceres.did.client.SdkClient; import cn.ceres.did.client.SdkClient;
import cn.ceres.did.sdk.SdkProto;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -21,9 +20,7 @@ public class DidSdkPressTest {
@Before @Before
public void init() { public void init() {
client = new SdkClient("127.0.0.1", 16831); client = new SdkClient();
// client = new SdkClient();
client.init();
client.start(); client.start();
} }
@ -32,7 +29,6 @@ public class DidSdkPressTest {
client.shutdown(); client.shutdown();
} }
@Test @Test
public void asyncTest() throws Exception { public void asyncTest() throws Exception {
long start; long start;
@ -47,8 +43,7 @@ public class DidSdkPressTest {
final CountDownLatch countDownLatch = new CountDownLatch(NUM); final CountDownLatch countDownLatch = new CountDownLatch(NUM);
start = System.currentTimeMillis(); start = System.currentTimeMillis();
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
final SdkProto sdkProto = new SdkProto(); client.invokeAsync(responseFuture -> countDownLatch.countDown());
client.invokeAsync(sdkProto, 5000, responseFuture -> countDownLatch.countDown());
} }
// countDownLatch.await(10, TimeUnit.SECONDS); // countDownLatch.await(10, TimeUnit.SECONDS);
@ -74,12 +69,11 @@ public class DidSdkPressTest {
long amount = 0; long amount = 0;
long allcast = 0; long allcast = 0;
for (int k = 0; k < 20; k++) { for (int k = 0; k < 10; k++) {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
int NUM = 60000; int NUM = 60000;
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
final SdkProto sdkProto = new SdkProto(); client.invokeSync();
client.invokeSync(sdkProto, 5000);
} }
end = System.currentTimeMillis(); end = System.currentTimeMillis();

View File

@ -1,7 +1,8 @@
package cn.ceres.did; package cn.ceres.did;
import cn.ceres.did.client.SdkClient; import cn.ceres.did.client.SdkClient;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.common.SdkProto;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -17,37 +18,35 @@ public class DidSdkTest {
@Before @Before
public void init() { public void init() {
client = new SdkClient("127.0.0.1", 16831); client = new SdkClient("127.0.0.1", 16831, 5000);
// SdkClient client = new SdkClient();
client.init();
client.start(); client.start();
} }
@After
public void destroy() {
client.shutdown();
}
@Test @Test
public void didSdkTest() throws Exception { public void didSdkTest() throws Exception {
// 测试同步请求关注rqid是否对应 // 测试同步请求关注rqid是否对应
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
SdkProto sdkProto = new SdkProto(); SdkProto resultProto = client.invokeSync();
System.out.println(i + " sendProto: " + sdkProto.toString()); System.out.println(i + " resultProto: " + resultProto);
SdkProto resultProto = client.invokeSync(sdkProto, 2000);
System.out.println(i + " resultProto: " + resultProto.toString());
} }
System.out.println("invokeync test finish"); System.out.println("invokeync test finish");
// 测试异步请求关注rqid是否对应 // 测试异步请求关注rqid是否对应
final CountDownLatch countDownLatch = new CountDownLatch(NUM); final CountDownLatch countDownLatch = new CountDownLatch(NUM);
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
final SdkProto sdkProto = new SdkProto();
final int finalI = i; final int finalI = i;
client.invokeAsync(sdkProto, 2000, responseFuture -> { client.invokeAsync(responseFuture -> {
System.out.println(finalI + " sendProto: " + sdkProto.toString());
countDownLatch.countDown(); countDownLatch.countDown();
System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString()); System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto());
}); });
} }
countDownLatch.await(10, TimeUnit.SECONDS); countDownLatch.await(10, TimeUnit.SECONDS);
System.out.println("invokeAsync test finish"); System.out.println("invokeAsync test finish");
} }
@Test @Test

View File

@ -8,7 +8,7 @@ package cn.ceres.did.core;
* 协议格式0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 * 协议格式0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000
* 协议解释0 - 41 位时间戳 - 5 位数据中心标识 - 5 位机器标识 - 12 位序列号 * 协议解释0 - 41 位时间戳 - 5 位数据中心标识 - 5 位机器标识 - 12 位序列号
* <p> * <p>
* 1 位标识由于 long 基本类型在 Java 中是带符号的最高位是符号位正数是 0负数是 1所以 id 一般是正数最高位是 0 * 1 位标识由于 Long 基本类型在 Java 中是带符号的最高位是符号位正数是 0负数是 1所以 id 一般是正数最高位是 0
* <p> * <p>
* 41 位时间截毫秒级注意41 位时间截不是存储当前时间的时间截而是存储时间截的差值当前时间截 - 开始时间截)得到的值这里的的开始时间截 * 41 位时间截毫秒级注意41 位时间截不是存储当前时间的时间截而是存储时间截的差值当前时间截 - 开始时间截)得到的值这里的的开始时间截
* 一般是我们的 id 生成器开始使用的时间由我们程序来指定的如下下面程序 START_STMP 属性41 位的时间截可以使用 69 (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69 * 一般是我们的 id 生成器开始使用的时间由我们程序来指定的如下下面程序 START_STMP 属性41 位的时间截可以使用 69 (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69

View File

@ -1,5 +1,6 @@
package cn.ceres.did.server; package cn.ceres.did.server;
import cn.ceres.did.core.SnowFlake;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.DefaultEventLoopGroup; import io.netty.channel.DefaultEventLoopGroup;
@ -22,6 +23,7 @@ public abstract class BaseServer implements Server {
protected ChannelFuture channelFuture; protected ChannelFuture channelFuture;
protected ServerBootstrap serverBootstrap; protected ServerBootstrap serverBootstrap;
protected int port; protected int port;
protected SnowFlake snowFlake;
public void init() { public void init() {
defLoopGroup = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { defLoopGroup = new DefaultEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {

View File

@ -14,20 +14,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* Http服务器使用Netty中的Http协议栈 * Http 服务器使用 Netty 中的 Http 协议栈
* 实现中支持多条请求路径对于不存在的请求路径返回404状态码
* http://localhost:8099/getTime
* *
* @author ehlxr * @author ehlxr
*/ */
public class HttpServer extends BaseServer { public class HttpServer extends BaseServer {
protected Logger logger = LoggerFactory.getLogger(HttpServer.class); protected Logger logger = LoggerFactory.getLogger(HttpServer.class);
private final SnowFlake snowFlake; public HttpServer(SnowFlake snowFlake, int port) {
this.snowFlake = snowFlake;
this.port = port;
}
public HttpServer(SnowFlake snowFlake) { public HttpServer(SnowFlake snowFlake) {
this.snowFlake = snowFlake; this.snowFlake = snowFlake;
this.port = "".equals(Constants.getEnv("HTTP_PORT")) ? Constants.HTTP_PORT : Integer.parseInt(Constants.getEnv("HTTP_PORT")); this.port = "".equals(Constants.getEnv("HTTP_PORT")) ?
Constants.HTTP_PORT :
Integer.parseInt(Constants.getEnv("HTTP_PORT"));
} }
@Override @Override
@ -57,10 +60,6 @@ public class HttpServer extends BaseServer {
try { try {
channelFuture = serverBootstrap.bind(port).sync(); channelFuture = serverBootstrap.bind(port).sync();
logger.info("HttpServer start success, port is:{}", port); logger.info("HttpServer start success, port is:{}", port);
// channelFuture = serverBootstrap.bind().sync();
// InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress();
// logger.info("HttpServer start success, port is:{}", addr.getPort());
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("HttpServer start fail,", e); logger.error("HttpServer start fail,", e);
} }

View File

@ -15,11 +15,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 自定义的处理器目前支持三种请求
* getTime: 获取服务器当前时间
* clientInfo: 获取请求客户端的User-Agent信息
* 其它 返回404状态并且提示404信息
*
* @author ehlxr * @author ehlxr
*/ */
public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

View File

@ -1,44 +0,0 @@
package cn.ceres.did.server.sdk;
/**
* @author ehlxr
*/
public class SdkProto {
/**
* 请求的ID
*/
private int rqid;
/**
* 全局的 ID
*/
private long did;
SdkProto(int rqid, long did) {
this.rqid = rqid;
this.did = did;
}
public int getRqid() {
return rqid;
}
public void setRqid(int rqid) {
this.rqid = rqid;
}
public long getDid() {
return did;
}
public void setDid(long did) {
this.did = did;
}
@Override
public String toString() {
return "SdkProto{" +
"rqid=" + rqid +
", did=" + did +
'}';
}
}

View File

@ -15,11 +15,17 @@ import org.slf4j.LoggerFactory;
*/ */
public class SdkServer extends BaseServer { public class SdkServer extends BaseServer {
protected Logger logger = LoggerFactory.getLogger(SdkServer.class); protected Logger logger = LoggerFactory.getLogger(SdkServer.class);
private final SnowFlake snowFlake;
public SdkServer(SnowFlake snowFlake) { public SdkServer(SnowFlake snowFlake) {
this.snowFlake = snowFlake; this.snowFlake = snowFlake;
this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.parseInt(Constants.getEnv("SDKS_PORT")); this.port = "".equals(Constants.getEnv("SDK_PORT")) ?
Constants.SDK_PORT :
Integer.parseInt(Constants.getEnv("SDK_PORT"));
}
public SdkServer(SnowFlake snowFlake, int port) {
this.snowFlake = snowFlake;
this.port = port;
} }
@Override @Override
@ -48,10 +54,6 @@ public class SdkServer extends BaseServer {
try { try {
channelFuture = serverBootstrap.bind(port).sync(); channelFuture = serverBootstrap.bind(port).sync();
logger.info("SdkServer start success, port is:{}", port); logger.info("SdkServer start success, port is:{}", port);
// channelFuture = serverBootstrap.bind().sync();
// InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress();
// logger.info("SdkServer start success, port is:{}", addr.getPort());
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("SdkServer start fail,", e); logger.error("SdkServer start fail,", e);
} }

View File

@ -1,6 +1,7 @@
package cn.ceres.did.server.sdk; package cn.ceres.did.server.sdk;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.common.SdkProto;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

View File

@ -1,6 +1,7 @@
package cn.ceres.did.server.sdk; package cn.ceres.did.server.sdk;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.common.SdkProto;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

View File

@ -2,6 +2,7 @@ package cn.ceres.did.server.sdk;
import cn.ceres.did.common.Constants; import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.common.SdkProto;
import cn.ceres.did.core.SnowFlake; import cn.ceres.did.core.SnowFlake;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -23,7 +24,7 @@ public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
/** /**
* 通过信号量来控制流量 * 通过信号量来控制流量
*/ */
private final Semaphore semaphore = new Semaphore(Constants.HANDLE_SDKS_TPS); private final Semaphore semaphore = new Semaphore(Constants.HANDLE_SDK_TPS);
private final SnowFlake snowFlake; private final SnowFlake snowFlake;
SdkServerHandler(SnowFlake snowFlake) { SdkServerHandler(SnowFlake snowFlake) {