Optimized code

This commit is contained in:
ehlxr 2021-01-19 14:45:49 +08:00
parent 2cc021a74f
commit d34eddd492
11 changed files with 76 additions and 111 deletions

View File

@ -23,13 +23,4 @@
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -27,10 +27,6 @@
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId> <artifactId>maven-shade-plugin</artifactId>

View File

@ -1,7 +1,7 @@
package cn.ceres.did.client; package cn.ceres.did.client;
import cn.ceres.did.sdk.SdkProto;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.sdk.SdkProto;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author ehlxr * @author ehlxr
*/ */
public abstract class AbstractClient implements Client { public abstract class AbstractClient implements Client {
private Logger logger = LoggerFactory.getLogger(AbstractClient.class); private final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
private final Semaphore asyncSemaphore = new Semaphore(100000); private final Semaphore asyncSemaphore = new Semaphore(100000);
private final Semaphore onewaySemaphore = new Semaphore(100000); private final Semaphore onewaySemaphore = new Semaphore(100000);
@ -30,7 +30,7 @@ public abstract class AbstractClient implements Client {
public void init() { public void init() {
asyncResponse = new ConcurrentHashMap<>(16); asyncResponse = new ConcurrentHashMap<>(16);
workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() { workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() {
private AtomicInteger index = new AtomicInteger(0); private final AtomicInteger index = new AtomicInteger(0);
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
@ -57,20 +57,17 @@ public abstract class AbstractClient implements Client {
try { try {
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null); final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, null, null);
asyncResponse.put(rqid, responseFuture); asyncResponse.put(rqid, responseFuture);
channel.writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
@Override if (channelFuture.isSuccess()) {
public void operationComplete(ChannelFuture channelFuture) { //发送成功后立即跳出
if (channelFuture.isSuccess()) { responseFuture.setIsSendStateOk(true);
//发送成功后立即跳出 return;
responseFuture.setIsSendStateOk(true);
return;
}
// 代码执行到此说明发送失败需要释放资源
asyncResponse.remove(rqid);
responseFuture.putResponse(null);
responseFuture.setCause(channelFuture.cause());
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
} }
// 代码执行到此说明发送失败需要释放资源
asyncResponse.remove(rqid);
responseFuture.putResponse(null);
responseFuture.setCause(channelFuture.cause());
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
}); });
// 阻塞等待响应 // 阻塞等待响应
SdkProto resultProto = responseFuture.waitResponse(timeoutMillis); SdkProto resultProto = responseFuture.waitResponse(timeoutMillis);
@ -104,27 +101,24 @@ public abstract class AbstractClient implements Client {
final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore); final ResponseFuture responseFuture = new ResponseFuture(rqid, timeoutMillis, invokeCallback, asyncSemaphore);
asyncResponse.put(rqid, responseFuture); asyncResponse.put(rqid, responseFuture);
try { try {
channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
@Override if (channelFuture.isSuccess()) {
public void operationComplete(ChannelFuture channelFuture) { responseFuture.setIsSendStateOk(true);
if (channelFuture.isSuccess()) { return;
responseFuture.setIsSendStateOk(true);
return;
}
// 代码执行到些说明发送失败需要释放资源
asyncResponse.remove(rqid);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
try {
responseFuture.executeInvokeCallback();
} catch (Exception e) {
logger.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
responseFuture.release();
}
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause());
} }
// 代码执行到些说明发送失败需要释放资源
asyncResponse.remove(rqid);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
try {
responseFuture.executeInvokeCallback();
} catch (Exception e) {
logger.warn("excute callback in writeAndFlush addListener, and callback throw", e);
} finally {
responseFuture.release();
}
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.", channelFuture.cause());
}); });
} catch (Exception e) { } catch (Exception e) {
responseFuture.release(); responseFuture.release();
@ -151,13 +145,10 @@ public abstract class AbstractClient implements Client {
boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); boolean acquired = onewaySemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) { if (acquired) {
try { try {
channelFuture.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { channelFuture.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
@Override onewaySemaphore.release();
public void operationComplete(ChannelFuture channelFuture) { if (!channelFuture.isSuccess()) {
onewaySemaphore.release(); logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
if (!channelFuture.isSuccess()) {
logger.warn("send a request command to channel <" + NettyUtil.parseRemoteAddr(channel) + "> failed.");
}
} }
}); });
} catch (Exception e) { } catch (Exception e) {
@ -167,7 +158,8 @@ public abstract class AbstractClient implements Client {
asyncResponse.remove(rqid); asyncResponse.remove(rqid);
} }
} else { } else {
String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d",
timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits());
logger.warn(info); logger.warn(info);
throw new Exception(info); throw new Exception(info);
} }

View File

@ -1,10 +1,10 @@
package cn.ceres.did.client; package cn.ceres.did.client;
import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.sdk.SdkClientDecoder; import cn.ceres.did.sdk.SdkClientDecoder;
import cn.ceres.did.sdk.SdkClientEncoder; import cn.ceres.did.sdk.SdkClientEncoder;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.sdk.SdkProto;
import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
@ -17,7 +17,7 @@ import java.net.InetSocketAddress;
* @author ehlxr * @author ehlxr
*/ */
public class SdkClient extends AbstractClient { public class SdkClient extends AbstractClient {
private Logger logger = LoggerFactory.getLogger(SdkClient.class); private final Logger logger = LoggerFactory.getLogger(SdkClient.class);
private String host; private String host;
private int port; private int port;
@ -47,12 +47,9 @@ public class SdkClient extends AbstractClient {
try { try {
channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync(); channelFuture = bootstrap.connect((host == null || "".equals(host)) ? Constants.DEFAULT_HOST : host, port == 0 ? Constants.SDKS_PORT : port).sync();
channelFuture.channel().closeFuture().addListener(new ChannelFutureListener() { channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> {
@Override logger.warn("client channel close.", channelFuture.cause());
public void operationComplete(ChannelFuture channelFuture) { shutdown();
logger.warn("client channel close.", channelFuture.cause());
shutdown();
}
}); });
InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress(); InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress();

View File

@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @author ehlxr * @author ehlxr
*/ */
public class SdkProto { public class SdkProto {
private static final AtomicInteger requestId = new AtomicInteger(0); private static final AtomicInteger REQUEST_ID = new AtomicInteger(0);
/** /**
* 请求的ID * 请求的ID
@ -18,7 +18,7 @@ public class SdkProto {
private long did; private long did;
public SdkProto() { public SdkProto() {
rqid = requestId.incrementAndGet(); rqid = REQUEST_ID.incrementAndGet();
did = 0; did = 0;
} }

View File

@ -1,7 +1,5 @@
package cn.ceres.did; package cn.ceres.did;
import cn.ceres.did.client.InvokeCallback;
import cn.ceres.did.client.ResponseFuture;
import cn.ceres.did.client.SdkClient; import cn.ceres.did.client.SdkClient;
import cn.ceres.did.sdk.SdkProto; import cn.ceres.did.sdk.SdkProto;
import org.junit.Test; import org.junit.Test;
@ -36,13 +34,10 @@ public class DidSdkTest {
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
final SdkProto sdkProto = new SdkProto(); final SdkProto sdkProto = new SdkProto();
final int finalI = i; final int finalI = i;
client.invokeAsync(sdkProto, 2000, new InvokeCallback() { client.invokeAsync(sdkProto, 2000, responseFuture -> {
@Override System.out.println(finalI + " sendProto: " + sdkProto.toString());
public void operationComplete(ResponseFuture responseFuture) { countDownLatch.countDown();
System.out.println(finalI + " sendProto: " + sdkProto.toString()); System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString());
countDownLatch.countDown();
System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto().toString());
}
}); });
} }
countDownLatch.await(10, TimeUnit.SECONDS); countDownLatch.await(10, TimeUnit.SECONDS);

View File

@ -23,10 +23,6 @@
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId> <artifactId>maven-shade-plugin</artifactId>

View File

@ -1,9 +1,9 @@
package cn.ceres.did; package cn.ceres.did;
import cn.ceres.did.common.Constants;
import cn.ceres.did.core.SnowFlake; import cn.ceres.did.core.SnowFlake;
import cn.ceres.did.server.http.HttpServer; import cn.ceres.did.server.http.HttpServer;
import cn.ceres.did.server.sdk.SdkServer; import cn.ceres.did.server.sdk.SdkServer;
import cn.ceres.did.common.Constants;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -40,13 +40,10 @@ public class ServerStarter {
sdkServer.init(); sdkServer.init();
sdkServer.start(); sdkServer.start();
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@Override httpServer.shutdown();
public void run() { sdkServer.shutdown();
httpServer.shutdown(); System.exit(0);
sdkServer.shutdown(); }));
System.exit(0);
}
});
} }
} }

View File

@ -1,8 +1,8 @@
package cn.ceres.did.server.sdk; package cn.ceres.did.server.sdk;
import cn.ceres.did.common.Constants; import cn.ceres.did.common.Constants;
import cn.ceres.did.server.BaseServer;
import cn.ceres.did.core.SnowFlake; import cn.ceres.did.core.SnowFlake;
import cn.ceres.did.server.BaseServer;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
@ -18,7 +18,7 @@ public class SdkServer extends BaseServer {
public SdkServer(SnowFlake snowFlake) { public SdkServer(SnowFlake snowFlake) {
this.snowFlake = snowFlake; this.snowFlake = snowFlake;
this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.valueOf(Constants.getEnv("SDKS_PORT")); this.port = "".equals(Constants.getEnv("SDKS_PORT")) ? Constants.SDKS_PORT : Integer.parseInt(Constants.getEnv("SDKS_PORT"));
} }
@Override @Override

View File

@ -3,7 +3,10 @@ package cn.ceres.did.server.sdk;
import cn.ceres.did.common.Constants; import cn.ceres.did.common.Constants;
import cn.ceres.did.common.NettyUtil; import cn.ceres.did.common.NettyUtil;
import cn.ceres.did.core.SnowFlake; import cn.ceres.did.core.SnowFlake;
import io.netty.channel.*; import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,12 +37,7 @@ public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
try { try {
sdkProto.setDid(snowFlake.nextId()); sdkProto.setDid(snowFlake.nextId());
ctx.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() { ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release());
@Override
public void operationComplete(ChannelFuture channelFuture) {
semaphore.release();
}
});
} catch (Exception e) { } catch (Exception e) {
semaphore.release(); semaphore.release();
logger.error("SdkServerhandler error", e); logger.error("SdkServerhandler error", e);

27
pom.xml
View File

@ -16,22 +16,34 @@
<name>did</name> <name>did</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!--目标编译的 Java 版本可以通过以下属性指定,不用配置 maven-compiler-plugin 插件-->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<netty.version>4.1.6.Final</netty.version>
<logback.version>1.1.7</logback.version>
<junit.version>4.11</junit.version>
</properties>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>4.1.6.Final</version> <version>${netty.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
<version>1.1.7</version> <version>${logback.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<version>4.11</version> <version>${junit.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
@ -40,15 +52,6 @@
<build> <build>
<pluginManagement> <pluginManagement>
<plugins> <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId> <artifactId>maven-shade-plugin</artifactId>