use java8 functional
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2021-02-08 18:15:39 +08:00
parent 169bf6f290
commit d395b987e6
9 changed files with 69 additions and 39 deletions

View File

@@ -43,4 +43,9 @@ public class Constants {
public static String getEnv(String key) { public static String getEnv(String key) {
return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key);
} }
/**
* 编码解码 byte 数组固定长度
*/
public static int DECODER_FRAMELENGTH = 100;
} }

View File

@@ -46,7 +46,20 @@ public class NettyUtil {
ObjectOutputStream oos = new ObjectOutputStream(bos); ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj); oos.writeObject(obj);
oos.flush(); oos.flush();
return bos.toByteArray();
byte[] bytes = bos.toByteArray();
if (bytes.length > Constants.DECODER_FRAMELENGTH) {
logger.error("bytes length should not bigger than {}", Constants.DECODER_FRAMELENGTH);
return null;
} else if (bytes.length < Constants.DECODER_FRAMELENGTH) {
byte[] result = new byte[Constants.DECODER_FRAMELENGTH];
// 如果长度不足,填充
System.arraycopy(bytes, 0, result, 0, bytes.length);
return result;
}
return bytes;
} }
public static Object toObject(byte[] bts) throws IOException, ClassNotFoundException { public static Object toObject(byte[] bts) throws IOException, ClassNotFoundException {

View File

@@ -52,7 +52,7 @@ public class SdkClient extends AbstractClient {
@Override @Override
protected void initChannel(SocketChannel socketChannel) { protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline() socketChannel.pipeline()
.addLast(new SdkClientDecoder()) .addLast(new SdkClientDecoder(Constants.DECODER_FRAMELENGTH)) // 如果长度不够会等待
.addLast(new SdkClientEncoder()) .addLast(new SdkClientEncoder())
.addLast(new SdkClientHandler()); .addLast(new SdkClientHandler());
} }

View File

@@ -29,27 +29,32 @@ import io.github.ehlxr.did.common.Try;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List;
/** /**
* @author ehlxr * @author ehlxr
* @since 2021-01-20 14:42. * @since 2021-01-20 14:42.
*/ */
public class SdkClientDecoder extends ByteToMessageDecoder { public class SdkClientDecoder extends FixedLengthFrameDecoder {
private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class);
@Override public SdkClientDecoder(int frameLength) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { super(frameLength);
Try.of(() -> { }
byte[] bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
out.add(NettyUtil.toObject(bytes)); @Override
}).trap(e -> logger.error("decode error", e)).run(); protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
return Try.of(() -> {
ByteBuf decode = (ByteBuf) super.decode(ctx, in);
byte[] bytes = new byte[decode.readableBytes()];
decode.readBytes(bytes);
decode.release();
return NettyUtil.toObject(bytes);
}).trap(e -> logger.error("decode error", e)).get();
} }
@Override @Override

View File

@@ -44,7 +44,6 @@ public class SdkClientEncoder extends MessageToByteEncoder<SdkProto> {
@Override @Override
protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) { protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) {
System.out.println("-------------");
Try.of(() -> { Try.of(() -> {
out.writeBytes(NettyUtil.toBytes(sdkProto)); out.writeBytes(NettyUtil.toBytes(sdkProto));
}).trap(e -> logger.error("encode error", e)).run(); }).trap(e -> logger.error("encode error", e)).run();

View File

@@ -1,10 +1,13 @@
package io.github.ehlxr.did; package io.github.ehlxr.did;
import io.github.ehlxr.did.client.SdkClient; import io.github.ehlxr.did.client.SdkClient;
import io.github.ehlxr.did.common.Try;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream; import java.util.stream.IntStream;
/** /**
@@ -31,19 +34,19 @@ public class DidSdkTest {
// 测试同步请求 // 测试同步请求
IntStream.range(0, NUM).parallel().forEach(i -> System.out.println(client.invokeSync())); IntStream.range(0, NUM).parallel().forEach(i -> System.out.println(client.invokeSync()));
// System.out.println("invokeync test finish"); System.out.println("invokeync test finish");
// 测试异步请求 // 测试异步请求
// final CountDownLatch countDownLatch = new CountDownLatch(NUM); final CountDownLatch countDownLatch = new CountDownLatch(NUM);
// IntStream.range(0, NUM).forEach(i -> IntStream.range(0, NUM).forEach(i ->
// Try.of(() -> client.invokeAsync(responseFuture -> { Try.of(() -> client.invokeAsync(responseFuture -> {
// System.out.println(responseFuture.getSdkProto()); System.out.println(responseFuture.getSdkProto());
// countDownLatch.countDown(); countDownLatch.countDown();
// })).trap(Throwable::printStackTrace).run()); })).trap(Throwable::printStackTrace).run());
//
// //noinspection ResultOfMethodCallIgnored //noinspection ResultOfMethodCallIgnored
// countDownLatch.await(10, TimeUnit.SECONDS); countDownLatch.await(10, TimeUnit.SECONDS);
// System.out.println("invokeAsync test finish"); System.out.println("invokeAsync test finish");
} }
@Test @Test

View File

@@ -49,8 +49,7 @@ public class SdkServer extends BaseServer {
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(defLoopGroup, ch.pipeline().addLast(defLoopGroup,
new SdkServerDecoder(), new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待
// new SdkServerDecoder(12),
new SdkServerEncoder(), new SdkServerEncoder(),
new SdkServerHandler(snowFlake) new SdkServerHandler(snowFlake)
); );

View File

@@ -5,26 +5,31 @@ import io.github.ehlxr.did.common.Try;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.FixedLengthFrameDecoder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List;
/** /**
* @author ehlxr * @author ehlxr
*/ */
public class SdkServerDecoder extends ByteToMessageDecoder { public class SdkServerDecoder extends FixedLengthFrameDecoder {
private final Logger logger = LoggerFactory.getLogger(SdkServerDecoder.class); private static final Logger logger = LoggerFactory.getLogger(SdkServerDecoder.class);
SdkServerDecoder(int frameLength) {
super(frameLength);
}
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
Try.of(() -> { return Try.of(() -> {
byte[] bytes = new byte[in.readableBytes()]; ByteBuf decode = (ByteBuf) super.decode(ctx, in);
in.readBytes(bytes);
out.add(NettyUtil.toObject(bytes)); byte[] bytes = new byte[decode.readableBytes()];
}).trap(e -> logger.error("decode error", e)).run(); decode.readBytes(bytes);
decode.release();
return NettyUtil.toObject(bytes);
}).trap(e -> logger.error("decode error", e)).get();
} }
@Override @Override
@@ -33,4 +38,4 @@ public class SdkServerDecoder extends ByteToMessageDecoder {
logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause);
NettyUtil.closeChannel(channel); NettyUtil.closeChannel(channel);
} }
} }

View File

@@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory;
public class SdkServerEncoder extends MessageToByteEncoder<SdkProto> { public class SdkServerEncoder extends MessageToByteEncoder<SdkProto> {
private static final Logger logger = LoggerFactory.getLogger(SdkServerEncoder.class); private static final Logger logger = LoggerFactory.getLogger(SdkServerEncoder.class);
@Override @Override
protected void encode(ChannelHandlerContext channelHandlerContext, SdkProto sdkProto, ByteBuf out) { protected void encode(ChannelHandlerContext channelHandlerContext, SdkProto sdkProto, ByteBuf out) {
Try.of(() -> { Try.of(() -> {