Optimized code

This commit is contained in:
ehlxr 2021-01-19 18:39:42 +08:00
parent d34eddd492
commit e06dcecd75
9 changed files with 61 additions and 38 deletions

View File

@ -27,6 +27,16 @@ public abstract class AbstractClient implements Client {
ChannelFuture channelFuture; ChannelFuture channelFuture;
Bootstrap bootstrap; Bootstrap bootstrap;
int timeoutMillis = 2000;
public int getTimeoutMillis() {
return timeoutMillis;
}
public void setTimeoutMillis(int timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
public void init() { public void init() {
asyncResponse = new ConcurrentHashMap<>(16); asyncResponse = new ConcurrentHashMap<>(16);
workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() {
@ -168,4 +178,8 @@ public abstract class AbstractClient implements Client {
throw new Exception(NettyUtil.parseRemoteAddr(channel)); throw new Exception(NettyUtil.parseRemoteAddr(channel));
} }
} }
public long invoke() throws Exception {
return invoke(timeoutMillis);
}
} }

View File

@ -15,4 +15,8 @@ public interface Client {
void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception; void invokeAsync(SdkProto proto, long timeoutMillis, InvokeCallback invokeCallback) throws Exception;
void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception; void invokeOneWay(SdkProto proto, long timeoutMillis) throws Exception;
default long invoke(long timeoutMillis) throws Exception {
return invokeSync(new SdkProto(), timeoutMillis).getDid();
}
} }

View File

@ -33,8 +33,8 @@ public class SdkClient extends AbstractClient {
public void start() { public void start() {
bootstrap.group(workGroup) bootstrap.group(workGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.TCP_NODELAY, true) // .option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true) // .option(ChannelOption.SO_KEEPALIVE, true)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
@ -46,7 +46,9 @@ public class SdkClient extends AbstractClient {
}); });
try { try {
channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync(); channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host,
port == 0 ? Constants.SDKS_PORT : port).sync();
channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> { channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> {
logger.warn("client channel close.", channelFuture.cause()); logger.warn("client channel close.", channelFuture.cause());
shutdown(); shutdown();

View File

@ -1,7 +1,5 @@
package cn.ceres.did; package cn.ceres.did;
import cn.ceres.did.client.InvokeCallback;
import cn.ceres.did.client.ResponseFuture;
import cn.ceres.did.client.SdkClient; import cn.ceres.did.client.SdkClient;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.sdk.SdkProto;
import org.junit.After; import org.junit.After;
@ -23,7 +21,7 @@ public class DidSdkPressTest {
@Before @Before
public void init() { public void init() {
client = new SdkClient("127.0.0.1",16831); client = new SdkClient("127.0.0.1", 16831);
// client = new SdkClient(); // client = new SdkClient();
client.init(); client.init();
client.start(); client.start();
@ -50,12 +48,7 @@ public class DidSdkPressTest {
start = System.currentTimeMillis(); start = System.currentTimeMillis();
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
final SdkProto sdkProto = new SdkProto(); final SdkProto sdkProto = new SdkProto();
client.invokeAsync(sdkProto, 5000, new InvokeCallback() { client.invokeAsync(sdkProto, 5000, responseFuture -> countDownLatch.countDown());
@Override
public void operationComplete(ResponseFuture responseFuture) {
countDownLatch.countDown();
}
});
} }
// countDownLatch.await(10, TimeUnit.SECONDS); // countDownLatch.await(10, TimeUnit.SECONDS);

View File

@ -2,6 +2,7 @@ package cn.ceres.did;
import cn.ceres.did.client.SdkClient; import cn.ceres.did.client.SdkClient;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.sdk.SdkProto;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -12,14 +13,18 @@ import java.util.concurrent.TimeUnit;
*/ */
public class DidSdkTest { public class DidSdkTest {
private static final int NUM = 10; private static final int NUM = 10;
SdkClient client;
@Test @Before
public void didSdkTest() throws Exception { public void init() {
SdkClient client = new SdkClient("127.0.0.1", 16831); client = new SdkClient("127.0.0.1", 16831);
// SdkClient client = new SdkClient(); // SdkClient client = new SdkClient();
client.init(); client.init();
client.start(); client.start();
}
@Test
public void didSdkTest() throws Exception {
// 测试同步请求关注rqid是否对应 // 测试同步请求关注rqid是否对应
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
SdkProto sdkProto = new SdkProto(); SdkProto sdkProto = new SdkProto();
@ -44,4 +49,12 @@ public class DidSdkTest {
System.out.println("invokeAsync test finish"); System.out.println("invokeAsync test finish");
} }
@Test
public void testInvoke() throws Exception {
System.out.println(client.invoke());
client.setTimeoutMillis(3000);
System.out.println(client.invoke());
}
} }

View File

@ -11,8 +11,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseEncoder;
import java.net.InetSocketAddress;
/** /**
* Http服务器使用Netty中的Http协议栈 * Http服务器使用Netty中的Http协议栈
* 实现中支持多条请求路径对于不存在的请求路径返回404状态码 * 实现中支持多条请求路径对于不存在的请求路径返回404状态码
@ -33,12 +31,11 @@ public class HttpServer extends BaseServer {
super.init(); super.init();
serverBootstrap.group(bossGroup, workGroup) serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, false) // .option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.TCP_NODELAY, true) // .option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_BACKLOG, 1024)
.localAddress(new InetSocketAddress(port)) // .localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(defLoopGroup, ch.pipeline().addLast(defLoopGroup,
@ -49,15 +46,17 @@ public class HttpServer extends BaseServer {
); );
} }
}); });
} }
@Override @Override
public void start() { public void start() {
try { try {
channelFuture = serverBootstrap.bind().sync(); channelFuture = serverBootstrap.bind(port).sync();
InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); logger.info("HttpServer start success, port is:{}", port);
logger.info("HttpServer start success, port is:{}", addr.getPort());
// channelFuture = serverBootstrap.bind().sync();
// InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress();
// logger.info("HttpServer start success, port is:{}", addr.getPort());
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("HttpServer start fail,", e); logger.error("HttpServer start fail,", e);
} }

View File

@ -8,8 +8,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/** /**
* @author ehlxr * @author ehlxr
*/ */
@ -26,12 +24,11 @@ public class SdkServer extends BaseServer {
super.init(); super.init();
serverBootstrap.group(bossGroup, workGroup) serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true) // .option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true) // .option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_BACKLOG, 1024)
.localAddress(new InetSocketAddress(port)) // .localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(defLoopGroup, ch.pipeline().addLast(defLoopGroup,
@ -46,9 +43,12 @@ public class SdkServer extends BaseServer {
@Override @Override
public void start() { public void start() {
try { try {
channelFuture = serverBootstrap.bind().sync(); channelFuture = serverBootstrap.bind(port).sync();
InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress(); logger.info("SdkServer start success, port is:{}", port);
logger.info("SdkServer start success, port is:{}", addr.getPort());
// channelFuture = serverBootstrap.bind().sync();
// InetSocketAddress addr = (InetSocketAddress) channelFuture.channel().localAddress();
// logger.info("SdkServer start success, port is:{}", addr.getPort());
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.error("SdkServer start fail,", e); logger.error("SdkServer start fail,", e);
} }

View File

@ -32,11 +32,10 @@ public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception {
// if (msg instanceof SdkProto) {
// SdkProto sdkProto = (SdkProto) msg;
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
try { try {
sdkProto.setDid(snowFlake.nextId()); sdkProto.setDid(snowFlake.nextId());
ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release()); ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release());
} catch (Exception e) { } catch (Exception e) {
semaphore.release(); semaphore.release();
@ -50,7 +49,6 @@ public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
logger.warn(info); logger.warn(info);
throw new Exception(info); throw new Exception(info);
} }
// }
} }
@Override @Override

View File

@ -23,7 +23,7 @@
<maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
<netty.version>4.1.6.Final</netty.version> <netty.version>4.1.58.Final</netty.version>
<logback.version>1.1.7</logback.version> <logback.version>1.1.7</logback.version>
<junit.version>4.11</junit.version> <junit.version>4.11</junit.version>
</properties> </properties>