From d395b987e62b3869ea1f2cce179064119a035ac2 Mon Sep 17 00:00:00 2001 From: ehlxr Date: Mon, 8 Feb 2021 18:15:39 +0800 Subject: [PATCH] use java8 functional --- .../io/github/ehlxr/did/common/Constants.java | 5 ++++ .../io/github/ehlxr/did/common/NettyUtil.java | 15 +++++++++- .../io/github/ehlxr/did/client/SdkClient.java | 2 +- .../did/client/handler/SdkClientDecoder.java | 27 ++++++++++------- .../did/client/handler/SdkClientEncoder.java | 1 - .../java/io/github/ehlxr/did/DidSdkTest.java | 25 +++++++++------- .../ehlxr/did/server/sdk/SdkServer.java | 3 +- .../did/server/sdk/SdkServerDecoder.java | 29 +++++++++++-------- .../did/server/sdk/SdkServerEncoder.java | 1 + 9 files changed, 69 insertions(+), 39 deletions(-) diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java index 1cb14a1..96859bf 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java @@ -43,4 +43,9 @@ public class Constants { public static String getEnv(String key) { return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); } + + /** + * 编码解码 byte 数组固定长度 + */ + public static int DECODER_FRAMELENGTH = 100; } diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java b/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java index 6c31691..328cee5 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java @@ -46,7 +46,20 @@ public class NettyUtil { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); - return bos.toByteArray(); + + byte[] bytes = bos.toByteArray(); + if (bytes.length > Constants.DECODER_FRAMELENGTH) { + logger.error("bytes length should not bigger than {}", Constants.DECODER_FRAMELENGTH); + return null; + } else if (bytes.length < Constants.DECODER_FRAMELENGTH) { + byte[] result = new byte[Constants.DECODER_FRAMELENGTH]; + + // 如果长度不足,填充 + System.arraycopy(bytes, 0, result, 0, bytes.length); + return result; + } + + return bytes; } public static Object toObject(byte[] bts) throws IOException, ClassNotFoundException { diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java index d752548..cfd2bbb 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java @@ -52,7 +52,7 @@ public class SdkClient extends AbstractClient { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() - .addLast(new SdkClientDecoder()) + .addLast(new SdkClientDecoder(Constants.DECODER_FRAMELENGTH)) // 如果长度不够会等待 .addLast(new SdkClientEncoder()) .addLast(new SdkClientHandler()); } diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java index f0fd622..b0c6a72 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java @@ -29,27 +29,32 @@ import io.github.ehlxr.did.common.Try; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.FixedLengthFrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * @author ehlxr * @since 2021-01-20 14:42. */ -public class SdkClientDecoder extends ByteToMessageDecoder { +public class SdkClientDecoder extends FixedLengthFrameDecoder { private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { - Try.of(() -> { - byte[] bytes = new byte[in.readableBytes()]; - in.readBytes(bytes); + public SdkClientDecoder(int frameLength) { + super(frameLength); + } - out.add(NettyUtil.toObject(bytes)); - }).trap(e -> logger.error("decode error", e)).run(); + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { + return Try.of(() -> { + ByteBuf decode = (ByteBuf) super.decode(ctx, in); + + byte[] bytes = new byte[decode.readableBytes()]; + decode.readBytes(bytes); + + decode.release(); + return NettyUtil.toObject(bytes); + }).trap(e -> logger.error("decode error", e)).get(); } @Override diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java index 6bac0d8..a15e00f 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java @@ -44,7 +44,6 @@ public class SdkClientEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) { - System.out.println("-------------"); Try.of(() -> { out.writeBytes(NettyUtil.toBytes(sdkProto)); }).trap(e -> logger.error("encode error", e)).run(); diff --git a/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java b/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java index bb49aaa..9205e27 100644 --- a/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java +++ b/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java @@ -1,10 +1,13 @@ package io.github.ehlxr.did; import io.github.ehlxr.did.client.SdkClient; +import io.github.ehlxr.did.common.Try; import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; /** @@ -31,19 +34,19 @@ public class DidSdkTest { // 测试同步请求 IntStream.range(0, NUM).parallel().forEach(i -> System.out.println(client.invokeSync())); - // System.out.println("invokeync test finish"); + System.out.println("invokeync test finish"); // 测试异步请求 - // final CountDownLatch countDownLatch = new CountDownLatch(NUM); - // IntStream.range(0, NUM).forEach(i -> - // Try.of(() -> client.invokeAsync(responseFuture -> { - // System.out.println(responseFuture.getSdkProto()); - // countDownLatch.countDown(); - // })).trap(Throwable::printStackTrace).run()); - // - // //noinspection ResultOfMethodCallIgnored - // countDownLatch.await(10, TimeUnit.SECONDS); - // System.out.println("invokeAsync test finish"); + final CountDownLatch countDownLatch = new CountDownLatch(NUM); + IntStream.range(0, NUM).forEach(i -> + Try.of(() -> client.invokeAsync(responseFuture -> { + System.out.println(responseFuture.getSdkProto()); + countDownLatch.countDown(); + })).trap(Throwable::printStackTrace).run()); + + //noinspection ResultOfMethodCallIgnored + countDownLatch.await(10, TimeUnit.SECONDS); + System.out.println("invokeAsync test finish"); } @Test diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java index 55c9639..986349d 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java @@ -49,8 +49,7 @@ public class SdkServer extends BaseServer { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(defLoopGroup, - new SdkServerDecoder(), - // new SdkServerDecoder(12), + new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待 new SdkServerEncoder(), new SdkServerHandler(snowFlake) ); diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java index f10b282..19b8cff 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java @@ -5,26 +5,31 @@ import io.github.ehlxr.did.common.Try; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.FixedLengthFrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - /** * @author ehlxr */ -public class SdkServerDecoder extends ByteToMessageDecoder { - private final Logger logger = LoggerFactory.getLogger(SdkServerDecoder.class); +public class SdkServerDecoder extends FixedLengthFrameDecoder { + private static final Logger logger = LoggerFactory.getLogger(SdkServerDecoder.class); + + SdkServerDecoder(int frameLength) { + super(frameLength); + } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { - Try.of(() -> { - byte[] bytes = new byte[in.readableBytes()]; - in.readBytes(bytes); + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { + return Try.of(() -> { + ByteBuf decode = (ByteBuf) super.decode(ctx, in); - out.add(NettyUtil.toObject(bytes)); - }).trap(e -> logger.error("decode error", e)).run(); + byte[] bytes = new byte[decode.readableBytes()]; + decode.readBytes(bytes); + + decode.release(); + return NettyUtil.toObject(bytes); + }).trap(e -> logger.error("decode error", e)).get(); } @Override @@ -33,4 +38,4 @@ public class SdkServerDecoder extends ByteToMessageDecoder { logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); NettyUtil.closeChannel(channel); } -} \ No newline at end of file +} diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java index 35bab76..7bddafb 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory; public class SdkServerEncoder extends MessageToByteEncoder { private static final Logger logger = LoggerFactory.getLogger(SdkServerEncoder.class); + @Override protected void encode(ChannelHandlerContext channelHandlerContext, SdkProto sdkProto, ByteBuf out) { Try.of(() -> {