init
This commit is contained in:
59
did-sdk/dependency-reduced-pom.xml
Normal file
59
did-sdk/dependency-reduced-pom.xml
Normal file
@@ -0,0 +1,59 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<parent>
|
||||
<artifactId>did</artifactId>
|
||||
<groupId>cn.ceres.did</groupId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>did-sdk</artifactId>
|
||||
<name>did-sdk</name>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<configuration>
|
||||
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>cn.ceres.did:did-common</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
<version>4.1.6.Final</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.1.7</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-core</artifactId>
|
||||
<version>1.1.7</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.20</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
52
did-sdk/pom.xml
Normal file
52
did-sdk/pom.xml
Normal file
@@ -0,0 +1,52 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<artifactId>did</artifactId>
|
||||
<groupId>cn.ceres.did</groupId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>did-sdk</artifactId>
|
||||
|
||||
<name>did-sdk</name>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>cn.ceres.did</groupId>
|
||||
<artifactId>did-common</artifactId>
|
||||
<version>1.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<configuration>
|
||||
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>cn.ceres.did:did-common</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
179
did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java
Normal file
179
did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java
Normal file
@@ -0,0 +1,179 @@
|
||||
package cn.ceres.did.client;
|
||||
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
import cn.ceres.did.common.NettyUtil;
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public abstract class AbstractClient implements Client {
|
||||
private Logger logger = LoggerFactory.getLogger(AbstractClient.class);
|
||||
|
||||
private final Semaphore asyncSemaphore = new Semaphore(100000);
|
||||
private final Semaphore onewaySemaphore = new Semaphore(100000);
|
||||
|
||||
ConcurrentMap<Integer, ResponseFuture> asyncResponse;
|
||||
NioEventLoopGroup workGroup;
|
||||
ChannelFuture channelFuture;
|
||||
Bootstrap bootstrap;
|
||||
|
||||
public void init() {
|
||||
asyncResponse = new ConcurrentHashMap<>(16);
|
||||
workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() {
|
||||
private AtomicInteger index = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
return new Thread(r, "WORK_" + index.incrementAndGet());
|
||||
}
|
||||
});
|
||||
|
||||
bootstrap = new Bootstrap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (workGroup != null) {
|
||||
workGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SdkProto invokeSync(SdkProto sdkProto, long timeoutMillis) throws Exception {
|
||||
final Channel channel = channelFuture.channel();
|
||||
if (channel.isActive()) {
|
||||
final int rqid = sdkProto.getRqid();
|
||||
try {
|
||||
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null);
|
||||
asyncResponse.put(rqid, responseFuture);
|
||||
channel.writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) {
|
||||
if (channelFuture.isSuccess()) {
|
||||
//发送成功后立即跳出
|
||||
responseFuture.setIsSendStateOk(true);
|
||||
return;
|
||||
}
|
||||
// 代码执行到此说明发送失败,需要释放资源
|
||||
asyncResponse.remove(rqid);
|
||||
responseFuture.putResponse(null);
|
||||
responseFuture.setCause(channelFuture.cause());
|
||||
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
|
||||
}
|
||||
});
|
||||
// 阻塞等待响应
|
||||
SdkProto resultProto = responseFuture.waitResponse(timeoutMillis);
|
||||
if (null == resultProto) {
|
||||
if (responseFuture.isSendStateOk()) {
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel) + timeoutMillis + responseFuture.getCause());
|
||||
} else {
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause());
|
||||
}
|
||||
}
|
||||
return resultProto;
|
||||
} catch (Exception e) {
|
||||
logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e);
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel), e);
|
||||
} finally {
|
||||
asyncResponse.remove(rqid);
|
||||
}
|
||||
} else {
|
||||
NettyUtil.closeChannel(channel);
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invokeAsync(SdkProto sdkProto, long timeoutMillis, final InvokeCallback invokeCallback) throws Exception {
|
||||
final Channel channel = channelFuture.channel();
|
||||
if (channel.isOpen() && channel.isActive()) {
|
||||
final int rqid = sdkProto.getRqid();
|
||||
boolean acquired = asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
if (acquired) {
|
||||
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore);
|
||||
asyncResponse.put(rqid, responseFuture);
|
||||
try {
|
||||
channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) {
|
||||
if (channelFuture.isSuccess()) {
|
||||
responseFuture.setIsSendStateOk(true);
|
||||
return;
|
||||
}
|
||||
// 代码执行到些说明发送失败,需要释放资源
|
||||
asyncResponse.remove(rqid);
|
||||
responseFuture.setCause(channelFuture.cause());
|
||||
responseFuture.putResponse(null);
|
||||
|
||||
try {
|
||||
responseFuture.executeInvokeCallback();
|
||||
} catch (Exception e) {
|
||||
logger.warn("excute callback in writeAndFlush addListener, and callback throw", e);
|
||||
} finally {
|
||||
responseFuture.release();
|
||||
}
|
||||
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause());
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
responseFuture.release();
|
||||
logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception", e);
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel), e);
|
||||
}
|
||||
} else {
|
||||
String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d",
|
||||
timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
|
||||
logger.warn(info);
|
||||
throw new Exception(info);
|
||||
}
|
||||
} else {
|
||||
NettyUtil.closeChannel(channel);
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void invokeOneWay(SdkProto sdkProto, long timeoutMillis) throws Exception {
|
||||
final Channel channel = channelFuture.channel();
|
||||
if (channel.isActive()) {
|
||||
final int rqid = sdkProto.getRqid();
|
||||
boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
if (acquired) {
|
||||
try {
|
||||
channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) {
|
||||
onewaySemaphore.release();
|
||||
if (!channelFuture.isSuccess()) {
|
||||
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.warn("send a request to channel <" + NettyUtil.parseRemoteAddr(channel) + "> Exception");
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel), e);
|
||||
} finally {
|
||||
asyncResponse.remove(rqid);
|
||||
}
|
||||
} else {
|
||||
String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
|
||||
logger.warn(info);
|
||||
throw new Exception(info);
|
||||
}
|
||||
} else {
|
||||
NettyUtil.closeChannel(channel);
|
||||
throw new Exception(NettyUtil.parseRemoteAddr(channel));
|
||||
}
|
||||
}
|
||||
}
|
18
did-sdk/src/main/java/cn/ceres/did/client/Client.java
Normal file
18
did-sdk/src/main/java/cn/ceres/did/client/Client.java
Normal file
@@ -0,0 +1,18 @@
|
||||
package cn.ceres.did.client;
|
||||
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public interface Client {
|
||||
void start();
|
||||
|
||||
void shutdown();
|
||||
|
||||
SdkProto invokeSync(SdkProto proto, long timeoutMillis) throws Exception;
|
||||
|
||||
void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception;
|
||||
|
||||
void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception;
|
||||
}
|
@@ -0,0 +1,9 @@
|
||||
package cn.ceres.did.client;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public interface InvokeCallback {
|
||||
|
||||
void operationComplete(ResponseFuture responseFuture);
|
||||
}
|
109
did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java
Normal file
109
did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java
Normal file
@@ -0,0 +1,109 @@
|
||||
package cn.ceres.did.client;
|
||||
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class ResponseFuture {
|
||||
private final int rqid;
|
||||
private final long timeoutMillis;
|
||||
private final Semaphore semaphore;
|
||||
private final InvokeCallback invokeCallback;
|
||||
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
|
||||
private final AtomicBoolean semaphoreReleaseOnlyOnce = new AtomicBoolean(false);
|
||||
private final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
|
||||
private volatile Throwable cause;
|
||||
private volatile SdkProto sdkProto;
|
||||
private volatile boolean isSendStateOk;
|
||||
|
||||
public ResponseFuture(int rqid, long timeoutMillis, InvokeCallback invokeCallback, Semaphore semaphore) {
|
||||
this.rqid = rqid;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.invokeCallback = invokeCallback;
|
||||
this.semaphore = semaphore;
|
||||
this.isSendStateOk = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 流量控制时,释放获取的信号量
|
||||
*/
|
||||
public void release() {
|
||||
if (this.semaphore != null) {
|
||||
if (semaphoreReleaseOnlyOnce.compareAndSet(false, true)) {
|
||||
this.semaphore.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void executeInvokeCallback() {
|
||||
if (invokeCallback != null) {
|
||||
if (executeCallbackOnlyOnce.compareAndSet(false, true)) {
|
||||
invokeCallback.operationComplete(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public SdkProto waitResponse(final long timeoutMillis) throws InterruptedException {
|
||||
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
return this.sdkProto;
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步请求时,释放阻塞的CountDown
|
||||
*/
|
||||
public void putResponse(SdkProto sdkProto) {
|
||||
this.sdkProto = sdkProto;
|
||||
this.countDownLatch.countDown();
|
||||
}
|
||||
|
||||
public InvokeCallback getInvokeCallback() {
|
||||
return invokeCallback;
|
||||
}
|
||||
|
||||
public SdkProto getSdkProto() {
|
||||
return sdkProto;
|
||||
}
|
||||
|
||||
public void setSdkProto(SdkProto sdkProto) {
|
||||
this.sdkProto = sdkProto;
|
||||
}
|
||||
|
||||
public boolean isSendStateOk() {
|
||||
return isSendStateOk;
|
||||
}
|
||||
|
||||
public void setIsSendStateOk(boolean isSendStateOk) {
|
||||
this.isSendStateOk = isSendStateOk;
|
||||
}
|
||||
|
||||
public Throwable getCause() {
|
||||
return cause;
|
||||
}
|
||||
|
||||
public void setCause(Throwable cause) {
|
||||
this.cause = cause;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ResponseFuture{" +
|
||||
"rqid=" + rqid +
|
||||
", timeoutMillis=" + timeoutMillis +
|
||||
", semaphore=" + semaphore +
|
||||
", invokeCallback=" + invokeCallback +
|
||||
", executeCallbackOnlyOnce=" + executeCallbackOnlyOnce +
|
||||
", semaphoreReleaseOnlyOnce=" + semaphoreReleaseOnlyOnce +
|
||||
", countDownLatch=" + countDownLatch +
|
||||
", cause=" + cause +
|
||||
", sdkProto=" + sdkProto +
|
||||
", isSendStateOk=" + isSendStateOk +
|
||||
'}';
|
||||
}
|
||||
}
|
104
did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java
Normal file
104
did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java
Normal file
@@ -0,0 +1,104 @@
|
||||
package cn.ceres.did.client;
|
||||
|
||||
import cn.ceres.did.sdk.SdkClientDecoder;
|
||||
import cn.ceres.did.sdk.SdkClientEncoder;
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
import cn.ceres.did.common.Constants;
|
||||
import cn.ceres.did.common.NettyUtil;
|
||||
import io.netty.channel.*;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkClient extends AbstractClient {
|
||||
private Logger logger = LoggerFactory.getLogger(SdkClient.class);
|
||||
private String host;
|
||||
private int port;
|
||||
|
||||
public SdkClient(String host, int port) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public SdkClient() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
bootstrap.group(workGroup)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
|
||||
.option(ChannelOption.TCP_NODELAY, true)
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
.channel(NioSocketChannel.class)
|
||||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) {
|
||||
socketChannel.pipeline().addLast("SdkServerDecoder", new SdkClientDecoder(12))
|
||||
.addLast("SdkServerEncoder", new SdkClientEncoder())
|
||||
.addLast("SdkClientHandler", new SdkClientHandler());
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync();
|
||||
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) {
|
||||
logger.warn("client channel close.", channelFuture.cause());
|
||||
shutdown();
|
||||
}
|
||||
});
|
||||
|
||||
InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress();
|
||||
logger.info("SdkClient start success, host is {}, port is {}", address.getHostName(), address.getPort());
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("SdkClient start error", e);
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
class SdkClientHandler extends SimpleChannelInboundHandler<SdkProto> {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) {
|
||||
final int rqid = sdkProto.getRqid();
|
||||
final ResponseFuture responseFuture = asyncResponse.get(rqid);
|
||||
if (responseFuture != null) {
|
||||
responseFuture.setSdkProto(sdkProto);
|
||||
responseFuture.release();
|
||||
asyncResponse.remove(rqid);
|
||||
|
||||
// 异步请求,执行回调函数
|
||||
if (responseFuture.getInvokeCallback() != null) {
|
||||
responseFuture.executeInvokeCallback();
|
||||
} else {
|
||||
// 同步请求,返回数据并释放CountDown
|
||||
responseFuture.putResponse(sdkProto);
|
||||
}
|
||||
} else {
|
||||
logger.warn("receive response, but not matched any request, address is {}", NettyUtil.parseRemoteAddr(ctx.channel()));
|
||||
logger.warn("response data is {}", sdkProto.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
logger.error("SdkHandler error", cause);
|
||||
NettyUtil.closeChannel(ctx.channel());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public void setPort(int port) {
|
||||
this.port = port;
|
||||
}
|
||||
}
|
47
did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java
Normal file
47
did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java
Normal file
@@ -0,0 +1,47 @@
|
||||
package cn.ceres.did.sdk;
|
||||
|
||||
import cn.ceres.did.common.NettyUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.FixedLengthFrameDecoder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkClientDecoder extends FixedLengthFrameDecoder {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class);
|
||||
|
||||
public SdkClientDecoder(int frameLength) {
|
||||
super(frameLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
|
||||
ByteBuf buf = null;
|
||||
try {
|
||||
buf = (ByteBuf) super.decode(ctx, in);
|
||||
if (buf == null) {
|
||||
return null;
|
||||
}
|
||||
return new SdkProto(buf.readInt(), buf.readLong());
|
||||
} catch (Exception e) {
|
||||
logger.error("SdkClientDecoder decode exception, " + NettyUtil.parseRemoteAddr(ctx.channel()), e);
|
||||
NettyUtil.closeChannel(ctx.channel());
|
||||
} finally {
|
||||
if (buf != null) {
|
||||
buf.release();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
Channel channel = ctx.channel();
|
||||
logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause);
|
||||
NettyUtil.closeChannel(channel);
|
||||
}
|
||||
}
|
29
did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java
Normal file
29
did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java
Normal file
@@ -0,0 +1,29 @@
|
||||
package cn.ceres.did.sdk;
|
||||
|
||||
import cn.ceres.did.common.NettyUtil;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToByteEncoder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkClientEncoder extends MessageToByteEncoder<SdkProto> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SdkClientEncoder.class);
|
||||
|
||||
@Override
|
||||
protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) {
|
||||
out.writeInt(sdkProto.getRqid());
|
||||
out.writeLong(sdkProto.getDid());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
Channel channel = ctx.channel();
|
||||
logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed", NettyUtil.parseRemoteAddr(channel)), cause);
|
||||
NettyUtil.closeChannel(channel);
|
||||
}
|
||||
}
|
53
did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java
Normal file
53
did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java
Normal file
@@ -0,0 +1,53 @@
|
||||
package cn.ceres.did.sdk;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkProto {
|
||||
private static AtomicInteger requestId = new AtomicInteger(0);
|
||||
|
||||
/**
|
||||
* 请求的ID
|
||||
*/
|
||||
private int rqid;
|
||||
/**
|
||||
* 全局的 ID
|
||||
*/
|
||||
private long did;
|
||||
|
||||
public SdkProto() {
|
||||
rqid = requestId.incrementAndGet();
|
||||
did = 0;
|
||||
}
|
||||
|
||||
public SdkProto(int rqid, long did) {
|
||||
this.rqid = rqid;
|
||||
this.did = did;
|
||||
}
|
||||
|
||||
public int getRqid() {
|
||||
return rqid;
|
||||
}
|
||||
|
||||
public void setRqid(int rqid) {
|
||||
this.rqid = rqid;
|
||||
}
|
||||
|
||||
public long getDid() {
|
||||
return did;
|
||||
}
|
||||
|
||||
public void setDid(long did) {
|
||||
this.did = did;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SdkProto{" +
|
||||
"rqid=" + rqid +
|
||||
", did=" + did +
|
||||
'}';
|
||||
}
|
||||
}
|
62
did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java
Normal file
62
did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java
Normal file
@@ -0,0 +1,62 @@
|
||||
package cn.ceres.did;
|
||||
|
||||
import cn.ceres.did.client.InvokeCallback;
|
||||
import cn.ceres.did.client.ResponseFuture;
|
||||
import cn.ceres.did.client.SdkClient;
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 异步请求压测
|
||||
*
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class DidSdkPressAsyncTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(DidSdkPressAsyncTest.class);
|
||||
// 初始发送总量
|
||||
private static int NUM = 80000;
|
||||
|
||||
@Test
|
||||
public void pressAsyncTest() throws Exception {
|
||||
SdkClient client = new SdkClient();
|
||||
client.init();
|
||||
client.start();
|
||||
|
||||
long start;
|
||||
long end;
|
||||
long cast;
|
||||
long amount = 0;
|
||||
long allcast = 0;
|
||||
|
||||
for (int k = 0; k < 10; k++) {
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(NUM);
|
||||
start = System.currentTimeMillis();
|
||||
for (int i = 0; i < NUM; i++) {
|
||||
final SdkProto sdkProto = new SdkProto();
|
||||
client.invokeAsync(sdkProto, 5000, new InvokeCallback() {
|
||||
@Override
|
||||
public void operationComplete(ResponseFuture responseFuture) {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
end = System.currentTimeMillis();
|
||||
cast = (end - start);
|
||||
allcast += cast;
|
||||
countDownLatch.await(10, TimeUnit.SECONDS);
|
||||
|
||||
logger.info("invokeAsync test num is: {}, cast time: {} millsec, throughput: {} send/millsec", NUM, cast, (double) NUM / cast);
|
||||
amount += NUM;
|
||||
NUM = NUM + 5000;
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
}
|
||||
|
||||
logger.info("invokeAsync test all num is: {}, all cast time: {} millsec, all throughput: {} send/millsec", amount, allcast, (double) amount / allcast);
|
||||
}
|
||||
}
|
52
did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java
Normal file
52
did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java
Normal file
@@ -0,0 +1,52 @@
|
||||
package cn.ceres.did;
|
||||
|
||||
import cn.ceres.did.client.SdkClient;
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 同步请求压测
|
||||
*
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class DidSdkPressSyncTest {
|
||||
private static final Logger logger = LoggerFactory.getLogger(DidSdkPressSyncTest.class);
|
||||
private static int NUM = 60000;
|
||||
|
||||
@Test
|
||||
public void pressSyncTest() throws Exception {
|
||||
SdkClient client = new SdkClient();
|
||||
client.init();
|
||||
client.start();
|
||||
|
||||
long start;
|
||||
long end;
|
||||
long cast;
|
||||
long amount = 0;
|
||||
long allcast = 0;
|
||||
|
||||
for (int k = 0; k < 20; k++) {
|
||||
start = System.currentTimeMillis();
|
||||
for (int i = 0; i < NUM; i++) {
|
||||
final SdkProto sdkProto = new SdkProto();
|
||||
client.invokeSync(sdkProto, 5000);
|
||||
}
|
||||
|
||||
end = System.currentTimeMillis();
|
||||
cast = (end - start);
|
||||
allcast += cast;
|
||||
|
||||
logger.info("invokeSync test num is: {}, cast time: {} millsec, throughput: {} send/millsec", NUM, cast, (double) NUM / cast);
|
||||
|
||||
amount += NUM;
|
||||
NUM += 5000;
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
}
|
||||
|
||||
logger.info("invokeSync test all num is: {}, all cast time: {} millsec, all throughput: {} send/millsec", amount, allcast, (double) amount / allcast);
|
||||
}
|
||||
}
|
51
did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java
Normal file
51
did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java
Normal file
@@ -0,0 +1,51 @@
|
||||
package cn.ceres.did;
|
||||
|
||||
import cn.ceres.did.client.InvokeCallback;
|
||||
import cn.ceres.did.client.ResponseFuture;
|
||||
import cn.ceres.did.client.SdkClient;
|
||||
import cn.ceres.did.sdk.SdkProto;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class DidSdkTest {
|
||||
private static final int NUM = 100;
|
||||
|
||||
@Test
|
||||
public void didSdkTest() throws Exception {
|
||||
SdkClient client = new SdkClient();
|
||||
client.init();
|
||||
client.start();
|
||||
|
||||
// 测试同步请求,关注rqid是否对应
|
||||
for (int i = 0; i < NUM; i++) {
|
||||
SdkProto sdkProto = new SdkProto();
|
||||
System.out.println(i + " sendProto: " + sdkProto.toString());
|
||||
SdkProto resultProto = client.invokeSync(sdkProto, 2000);
|
||||
System.out.println(i + " resultProto: " + resultProto.toString());
|
||||
}
|
||||
System.out.println("invokeync test finish");
|
||||
|
||||
// 测试异步请求,关注rqid是否对应
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(NUM);
|
||||
for (int i = 0; i < NUM; i++) {
|
||||
final SdkProto sdkProto = new SdkProto();
|
||||
final int finalI = i;
|
||||
client.invokeAsync(sdkProto, 2000, new InvokeCallback() {
|
||||
@Override
|
||||
public void operationComplete(ResponseFuture responseFuture) {
|
||||
System.out.println(finalI + " sendProto: " + sdkProto.toString());
|
||||
countDownLatch.countDown();
|
||||
System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
countDownLatch.await(10, TimeUnit.SECONDS);
|
||||
System.out.println("invokeAsync test finish");
|
||||
|
||||
}
|
||||
}
|
5
did-sdk/target/maven-archiver/pom.properties
Normal file
5
did-sdk/target/maven-archiver/pom.properties
Normal file
@@ -0,0 +1,5 @@
|
||||
#Generated by Maven
|
||||
#Tue Aug 14 15:16:54 CST 2018
|
||||
version=1.0-SNAPSHOT
|
||||
groupId=cn.ceres.did
|
||||
artifactId=did-sdk
|
@@ -0,0 +1,15 @@
|
||||
cn/ceres/did/client/SdkClient$2.class
|
||||
cn/ceres/did/client/InvokeCallback.class
|
||||
cn/ceres/did/client/SdkClient.class
|
||||
cn/ceres/did/client/SdkClient$1.class
|
||||
cn/ceres/did/client/Client.class
|
||||
cn/ceres/did/client/AbstractClient$4.class
|
||||
cn/ceres/did/client/ResponseFuture.class
|
||||
cn/ceres/did/client/AbstractClient.class
|
||||
cn/ceres/did/client/AbstractClient$1.class
|
||||
cn/ceres/did/client/AbstractClient$2.class
|
||||
cn/ceres/did/sdk/SdkProto.class
|
||||
cn/ceres/did/sdk/SdkClientEncoder.class
|
||||
cn/ceres/did/client/SdkClient$SdkClientHandler.class
|
||||
cn/ceres/did/sdk/SdkClientDecoder.class
|
||||
cn/ceres/did/client/AbstractClient$3.class
|
@@ -0,0 +1,8 @@
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/ResponseFuture.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/SdkClient.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientDecoder.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/Client.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/sdk/SdkProto.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/sdk/SdkClientEncoder.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/AbstractClient.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/main/java/cn/ceres/did/client/InvokeCallback.java
|
@@ -0,0 +1,5 @@
|
||||
cn/ceres/did/DidSdkPressAsyncTest$1.class
|
||||
cn/ceres/did/DidSdkPressAsyncTest.class
|
||||
cn/ceres/did/DidSdkTest.class
|
||||
cn/ceres/did/DidSdkTest$1.class
|
||||
cn/ceres/did/DidSdkPressSyncTest.class
|
@@ -0,0 +1,3 @@
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/test/java/cn/ceres/did/DidSdkPressAsyncTest.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/test/java/cn/ceres/did/DidSdkPressSyncTest.java
|
||||
/Users/ehlxr/WorkSpaces/did/did-sdk/src/test/java/cn/ceres/did/DidSdkTest.java
|
Reference in New Issue
Block a user