use java8 functional

This commit is contained in:
2021-02-08 17:15:36 +08:00
parent e929cd180f
commit 169bf6f290
20 changed files with 536 additions and 149 deletions

View File

@@ -1,9 +1,6 @@
package io.github.ehlxr.did.client;
import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.common.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -14,6 +11,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -59,8 +57,8 @@ public abstract class AbstractClient implements Client {
}
});
bootstrap = new Bootstrap();
bootstrap.group(workGroup)
bootstrap = new Bootstrap()
.group(workGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
// .option(ChannelOption.TCP_NODELAY, true)
// .option(ChannelOption.SO_KEEPALIVE, true)
@@ -70,13 +68,11 @@ public abstract class AbstractClient implements Client {
@Override
public void shutdown() {
logger.info("SDK Client shutdowning......");
try {
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
} catch (Exception e) {
logger.error("Client EventLoopGroup shutdown error.", e);
}
Optional.ofNullable(workGroup).ifPresent(wg ->
Try.<NioEventLoopGroup>of(w -> w.shutdownGracefully().sync())
.trap(e -> logger.error("Client EventLoopGroup shutdown error.", e))
.accept(wg));
logger.info("SDK Client shutdown finish!");
}
@@ -87,9 +83,12 @@ public abstract class AbstractClient implements Client {
if (channel.isOpen() && channel.isActive()) {
final SdkProto sdkProto = new SdkProto();
final int rqid = sdkProto.getRqid();
try {
return Try.of(() -> {
final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, null, null);
REPONSE_MAP.put(rqid, responseFuture);
logger.debug("write {} to channel", sdkProto);
channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
//发送成功后立即跳出
@@ -101,18 +100,20 @@ public abstract class AbstractClient implements Client {
responseFuture.setCause(channelFuture.cause());
logger.error("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel));
});
// 阻塞等待响应
SdkProto proto = responseFuture.waitResponse(timeoutMillis);
if (null == proto) {
return Result.fail("get result fail, addr is " + NettyUtil.parseRemoteAddr(channel) + responseFuture.getCause());
String msg = String.format("get result from addr %s failed, cause by %s",
NettyUtil.parseRemoteAddr(channel), responseFuture.getCause());
logger.error(msg);
return Result.<SdkProto>fail(msg);
}
return Result.success(proto);
} catch (Exception e) {
logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e);
return Result.fail("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel) + e.getMessage());
} finally {
REPONSE_MAP.remove(rqid);
}
}).trap(e -> logger.error("sync invoke {} failed.", NettyUtil.parseRemoteAddr(channel), e))
.andFinally(() -> REPONSE_MAP.remove(rqid))
.get(Result.fail("failed to sync invoke " + NettyUtil.parseRemoteAddr(channel)));
} else {
NettyUtil.closeChannel(channel);
return Result.fail("channel " + NettyUtil.parseRemoteAddr(channel) + "is not active!");
@@ -128,7 +129,9 @@ public abstract class AbstractClient implements Client {
if (asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, invokeCallback, asyncSemaphore);
REPONSE_MAP.put(rqid, responseFuture);
try {
Try.of(() -> {
logger.debug("write {} to channel", sdkProto);
channelFuture.channel().writeAndFlush(sdkProto).addListener(channelFuture -> {
if (channelFuture.isSuccess()) {
return;
@@ -141,18 +144,13 @@ public abstract class AbstractClient implements Client {
REPONSE_MAP.remove(rqid);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
responseFuture.executeInvokeCallback();
responseFuture.release();
});
} catch (Exception e) {
}).trap(e -> {
responseFuture.release();
String msg = String.format("send a request to channel <%s> Exception",
NettyUtil.parseRemoteAddr(channel));
logger.error(msg, e);
throw new Exception(msg, e);
}
logger.error("send a request to channel <{}> Exception", NettyUtil.parseRemoteAddr(channel), e);
}).run();
} else {
String msg = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " +
"nums: %d semaphoreAsyncValue: %d",

View File

@@ -28,7 +28,6 @@ public interface Client {
*
* @param timeoutMillis 超时时间
* @return {@link SdkProto}
* @throws Exception 调用异常
*/
Result<SdkProto> invokeSync(long timeoutMillis);
@@ -46,7 +45,6 @@ public interface Client {
*
* @param timeoutMillis 超时时间
* @return id
* @throws Exception 调用异常
*/
default Result<SdkProto> invoke(long timeoutMillis) {
return invokeSync(timeoutMillis);

View File

@@ -1,6 +1,7 @@
package io.github.ehlxr.did.client;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.common.Try;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@@ -46,8 +47,12 @@ public class ResponseFuture {
}
}
public SdkProto waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
public SdkProto waitResponse(final long timeoutMillis) {
Try.of(() -> {
if (!this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
setCause(new RuntimeException("timeout after wait " + timeoutMillis));
}
}).trap(this::setCause).run();
return this.sdkProto;
}

View File

@@ -4,6 +4,7 @@ import io.github.ehlxr.did.client.handler.SdkClientDecoder;
import io.github.ehlxr.did.client.handler.SdkClientEncoder;
import io.github.ehlxr.did.client.handler.SdkClientHandler;
import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.Try;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
@@ -50,13 +51,14 @@ public class SdkClient extends AbstractClient {
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast("SdkServerDecoder", new SdkClientDecoder(12))
.addLast("SdkServerEncoder", new SdkClientEncoder())
.addLast("SdkClientHandler", new SdkClientHandler());
socketChannel.pipeline()
.addLast(new SdkClientDecoder())
.addLast(new SdkClientEncoder())
.addLast(new SdkClientHandler());
}
});
try {
Try.of(() -> {
channelFuture = bootstrap.connect(host, port)
.sync()
.channel()
@@ -66,10 +68,10 @@ public class SdkClient extends AbstractClient {
InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress();
logger.info("SdkClient start success, host is {}, port is {}", address.getHostName(), address.getPort());
} catch (InterruptedException e) {
}).trap(e -> {
logger.error("SdkClient start error", e);
shutdown();
}
}).run();
}
public static final class SdkClientBuilder {

View File

@@ -25,49 +25,37 @@
package io.github.ehlxr.did.client.handler;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.common.Try;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @author ehlxr
* @since 2021-01-20 14:42.
*/
public class SdkClientDecoder extends FixedLengthFrameDecoder {
public class SdkClientDecoder extends ByteToMessageDecoder {
private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class);
public SdkClientDecoder(int frameLength) {
super(frameLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf buf = null;
try {
buf = (ByteBuf) super.decode(ctx, in);
if (buf == null) {
return null;
}
return new SdkProto(buf.readInt(), buf.readLong());
} catch (Exception e) {
logger.error("SdkClientDecoder decode exception, " + NettyUtil.parseRemoteAddr(ctx.channel()), e);
NettyUtil.closeChannel(ctx.channel());
} finally {
if (buf != null) {
buf.release();
}
}
return null;
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
Try.of(() -> {
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
out.add(NettyUtil.toObject(bytes));
}).trap(e -> logger.error("decode error", e)).run();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel channel = ctx.channel();
logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause);
logger.error("SdkClientDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause);
NettyUtil.closeChannel(channel);
}
}
}

View File

@@ -27,6 +27,7 @@ package io.github.ehlxr.did.client.handler;
import io.github.ehlxr.did.client.SdkClient;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.common.Try;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -41,11 +42,12 @@ import org.slf4j.LoggerFactory;
public class SdkClientEncoder extends MessageToByteEncoder<SdkProto> {
private final Logger logger = LoggerFactory.getLogger(SdkClient.class);
@Override
protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) {
out.writeInt(sdkProto.getRqid());
out.writeLong(sdkProto.getDid());
System.out.println("-------------");
Try.of(() -> {
out.writeBytes(NettyUtil.toBytes(sdkProto));
}).trap(e -> logger.error("encode error", e)).run();
}
@Override

View File

@@ -42,6 +42,8 @@ public class SdkClientHandler extends SimpleChannelInboundHandler<SdkProto> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) {
logger.debug("sdk client handler receive sdkProto {}", sdkProto);
final int rqid = sdkProto.getRqid();
final ResponseFuture responseFuture = Client.REPONSE_MAP.get(rqid);
if (responseFuture != null) {

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date [%thread] %-5level %logger{35}:%line - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<root level="DEBBUG">
<appender-ref ref="STDOUT"/>
</root>
<logger name="io.netty" level="OFF"/>
</configuration>

View File

@@ -1,14 +1,11 @@
package io.github.ehlxr.did;
import io.github.ehlxr.did.client.SdkClient;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
* @author ehlxr
@@ -20,7 +17,7 @@ public class DidSdkTest {
@Before
public void init() {
// client = new SdkClient("127.0.0.1", 16831, 5000);
client = SdkClient.newBuilder().build();
client = SdkClient.newBuilder().timeoutMillis(5000).build();
client.start();
}
@@ -32,27 +29,26 @@ public class DidSdkTest {
@Test
public void didSdkTest() throws Exception {
// 测试同步请求
for (int i = 0; i < NUM; i++) {
Result<SdkProto> resultProto = client.invokeSync();
System.out.println(resultProto);
}
System.out.println("invokeync test finish");
IntStream.range(0, NUM).parallel().forEach(i -> System.out.println(client.invokeSync()));
// System.out.println("invokeync test finish");
// 测试异步请求
final CountDownLatch countDownLatch = new CountDownLatch(NUM);
for (int i = 0; i < NUM; i++) {
client.invokeAsync(responseFuture -> {
countDownLatch.countDown();
System.out.println(responseFuture.getSdkProto());
});
}
countDownLatch.await(10, TimeUnit.SECONDS);
System.out.println("invokeAsync test finish");
// final CountDownLatch countDownLatch = new CountDownLatch(NUM);
// IntStream.range(0, NUM).forEach(i ->
// Try.of(() -> client.invokeAsync(responseFuture -> {
// System.out.println(responseFuture.getSdkProto());
// countDownLatch.countDown();
// })).trap(Throwable::printStackTrace).run());
//
// //noinspection ResultOfMethodCallIgnored
// countDownLatch.await(10, TimeUnit.SECONDS);
// System.out.println("invokeAsync test finish");
}
@Test
public void testInvoke() {
System.out.println(client.invoke());
// System.out.println(client.invoke());
client.setTimeoutMillis(3000);
System.out.println(client.invoke());