From dbb95dd9992faa1bd4b849e57c0fe17a3c532db1 Mon Sep 17 00:00:00 2001 From: ehlxr Date: Mon, 8 Feb 2021 23:03:57 +0800 Subject: [PATCH] add LengthFieldBasedFrameDecoder --- .../io/github/ehlxr/did/common/Constants.java | 6 ++ .../io/github/ehlxr/did/common/NettyUtil.java | 23 +++-- .../ehlxr/did/netty/MyProtocolBean.java | 95 +++++++++++++++++++ .../ehlxr/did/netty/MyProtocolDecoder.java | 80 ++++++++++++++++ .../ehlxr/did/netty/MyProtocolEncoder.java | 48 ++++++++++ .../ehlxr/did/client/AbstractClient.java | 11 ++- .../io/github/ehlxr/did/client/SdkClient.java | 11 ++- .../did/client/handler/SdkClientDecoder.java | 8 +- .../did/client/handler/SdkClientHandler.java | 17 +++- .../java/io/github/ehlxr/did/DidSdkTest.java | 2 +- .../ehlxr/did/server/sdk/SdkServer.java | 9 +- .../did/server/sdk/SdkServerDecoder.java | 8 +- .../did/server/sdk/SdkServerHandler.java | 13 ++- 13 files changed, 292 insertions(+), 39 deletions(-) create mode 100644 did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java create mode 100644 did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java create mode 100644 did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java index 96859bf..eb8b3df 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java @@ -48,4 +48,10 @@ public class Constants { * 编码解码 byte 数组固定长度 */ public static int DECODER_FRAMELENGTH = 100; + + public static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大长度 + public static final int LENGTH_FIELD_LENGTH = 4; //长度字段所占的字节数 + public static final int LENGTH_FIELD_OFFSET = 2; //长度偏移 + public static final int LENGTH_ADJUSTMENT = 0; + public static final int INITIAL_BYTES_TO_STRIP = 0; } diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java b/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java index 328cee5..86e1ba7 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java @@ -47,19 +47,18 @@ public class NettyUtil { oos.writeObject(obj); oos.flush(); - byte[] bytes = bos.toByteArray(); - if (bytes.length > Constants.DECODER_FRAMELENGTH) { - logger.error("bytes length should not bigger than {}", Constants.DECODER_FRAMELENGTH); - return null; - } else if (bytes.length < Constants.DECODER_FRAMELENGTH) { - byte[] result = new byte[Constants.DECODER_FRAMELENGTH]; + // if (bytes.length > Constants.DECODER_FRAMELENGTH) { + // logger.error("bytes length should not bigger than {}", Constants.DECODER_FRAMELENGTH); + // return null; + // } else if (bytes.length < Constants.DECODER_FRAMELENGTH) { + // byte[] result = new byte[Constants.DECODER_FRAMELENGTH]; + // + // // 如果长度不足,填充 + // System.arraycopy(bytes, 0, result, 0, bytes.length); + // return result; + // } - // 如果长度不足,填充 - System.arraycopy(bytes, 0, result, 0, bytes.length); - return result; - } - - return bytes; + return bos.toByteArray(); } public static Object toObject(byte[] bts) throws IOException, ClassNotFoundException { diff --git a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java b/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java new file mode 100644 index 0000000..6996382 --- /dev/null +++ b/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java @@ -0,0 +1,95 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.netty; + +import io.github.ehlxr.did.common.NettyUtil; +import io.github.ehlxr.did.common.Try; + +/** + * @author ehlxr + * @since 2021-02-08 22:07. + */ +public class MyProtocolBean { + // 类型(系统编号 0xA 表示A系统,0xB 表示B系统) + private byte type; + + // 信息标志 0xA 表示心跳包 0xB 表示超时包 0xC 业务信息包 + private byte flag; + + // 内容长度 + private int length; + + // 内容 + private byte[] content; + + public MyProtocolBean(byte type, byte flag, int length, byte[] content) { + this.type = type; + this.flag = flag; + this.length = length; + this.content = content; + } + + public byte getType() { + return type; + } + + public void setType(byte type) { + this.type = type; + } + + public byte getFlag() { + return flag; + } + + public void setFlag(byte flag) { + this.flag = flag; + } + + public int getLength() { + return length; + } + + public void setLength(int length) { + this.length = length; + } + + public byte[] getContent() { + return content; + } + + public void setContent(byte[] content) { + this.content = content; + } + + @Override + public String toString() { + return "MyProtocolBean{" + + "type=" + type + + ", flag=" + flag + + ", length=" + length + + ", content=" + Try.of(NettyUtil::toObject).apply(content).get() + + '}'; + } +} \ No newline at end of file diff --git a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java b/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java new file mode 100644 index 0000000..fd497e9 --- /dev/null +++ b/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java @@ -0,0 +1,80 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.netty; + +import io.github.ehlxr.did.netty.MyProtocolBean; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +/** + * @author ehlxr + * @since 2021-02-08 22:09. + */ +public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder { + private static final int HEADER_SIZE = 6; + + /** + * @param maxFrameLength 帧的最大长度 + * @param lengthFieldOffset length字段偏移的地址 + * @param lengthFieldLength length字段所占的字节长 + * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段 + * @param initialBytesToStrip 解析时候跳过多少个长度 + * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异 + */ + + public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, + int lengthAdjustment, int initialBytesToStrip, boolean failFast) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分 + in = (ByteBuf) super.decode(ctx, in); + + if (in == null) { + return null; + } + if (in.readableBytes() < HEADER_SIZE) { + throw new Exception("字节数不足"); + } + //读取type字段 + byte type = in.readByte(); + //读取flag字段 + byte flag = in.readByte(); + //读取length字段 + int length = in.readInt(); + + if (in.readableBytes() != length) { + throw new Exception("标记的长度不符合实际长度"); + } + //读取body + byte[] bytes = new byte[in.readableBytes()]; + in.readBytes(bytes); + + return new MyProtocolBean(type, flag, length, bytes); + } +} \ No newline at end of file diff --git a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java b/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java new file mode 100644 index 0000000..d7a1dc0 --- /dev/null +++ b/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java @@ -0,0 +1,48 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +/** + * @author ehlxr + * @since 2021-02-08 22:12. + */ +public class MyProtocolEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception { + if (msg == null) { + throw new Exception("msg is null"); + } + out.writeByte(msg.getType()); + out.writeByte(msg.getFlag()); + out.writeInt(msg.getLength()); + out.writeBytes(msg.getContent()); + } +} + diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java index 0cbe32e..6a2a528 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java @@ -1,6 +1,7 @@ package io.github.ehlxr.did.client; import io.github.ehlxr.did.common.*; +import io.github.ehlxr.did.netty.MyProtocolBean; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -89,7 +90,10 @@ public abstract class AbstractClient implements Client { REPONSE_MAP.put(rqid, responseFuture); logger.debug("write {} to channel", sdkProto); - channel.writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> { + + byte[] bytes = NettyUtil.toBytes(sdkProto); + MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, bytes.length, bytes); + channel.writeAndFlush(myProtocolBean).addListener((ChannelFutureListener) channelFuture -> { if (channelFuture.isSuccess()) { //发送成功后立即跳出 return; @@ -132,7 +136,10 @@ public abstract class AbstractClient implements Client { Try.of(() -> { logger.debug("write {} to channel", sdkProto); - channelFuture.channel().writeAndFlush(sdkProto).addListener(channelFuture -> { + + byte[] bytes = NettyUtil.toBytes(sdkProto); + MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, bytes.length, bytes); + channelFuture.channel().writeAndFlush(myProtocolBean).addListener(channelFuture -> { if (channelFuture.isSuccess()) { return; } diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java index cfd2bbb..1c6cd3c 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java @@ -1,7 +1,7 @@ package io.github.ehlxr.did.client; -import io.github.ehlxr.did.client.handler.SdkClientDecoder; -import io.github.ehlxr.did.client.handler.SdkClientEncoder; +import io.github.ehlxr.did.netty.MyProtocolDecoder; +import io.github.ehlxr.did.netty.MyProtocolEncoder; import io.github.ehlxr.did.client.handler.SdkClientHandler; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Try; @@ -52,8 +52,11 @@ public class SdkClient extends AbstractClient { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() - .addLast(new SdkClientDecoder(Constants.DECODER_FRAMELENGTH)) // 如果长度不够会等待 - .addLast(new SdkClientEncoder()) + // .addLast(new SdkClientDecoder(Constants.DECODER_FRAMELENGTH)) // 如果长度不够会等待 + // .addLast(new SdkClientEncoder()) + .addLast(new MyProtocolEncoder()) + .addLast(new MyProtocolDecoder(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET, + Constants.LENGTH_FIELD_LENGTH, Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false)) .addLast(new SdkClientHandler()); } }); diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java index b0c6a72..450feb1 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java @@ -47,12 +47,12 @@ public class SdkClientDecoder extends FixedLengthFrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { return Try.of(() -> { - ByteBuf decode = (ByteBuf) super.decode(ctx, in); + ByteBuf buf = (ByteBuf) super.decode(ctx, in); - byte[] bytes = new byte[decode.readableBytes()]; - decode.readBytes(bytes); + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); - decode.release(); + buf.release(); return NettyUtil.toObject(bytes); }).trap(e -> logger.error("decode error", e)).get(); } diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java index 5d51cc5..d136024 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java @@ -28,6 +28,8 @@ import io.github.ehlxr.did.client.Client; import io.github.ehlxr.did.client.ResponseFuture; import io.github.ehlxr.did.common.NettyUtil; import io.github.ehlxr.did.common.SdkProto; +import io.github.ehlxr.did.common.Try; +import io.github.ehlxr.did.netty.MyProtocolBean; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; @@ -37,12 +39,17 @@ import org.slf4j.LoggerFactory; * @author ehlxr * @since 2021-01-20 14:43. */ -public class SdkClientHandler extends SimpleChannelInboundHandler { +public class SdkClientHandler extends SimpleChannelInboundHandler { private final Logger logger = LoggerFactory.getLogger(SdkClientHandler.class); @Override - protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) { - logger.debug("sdk client handler receive sdkProto {}", sdkProto); + protected void channelRead0(ChannelHandlerContext ctx, MyProtocolBean protocolBean) { + logger.debug("sdk client handler receive protocolBean {}", protocolBean); + + SdkProto sdkProto = Try.of(p -> + (SdkProto) NettyUtil.toObject(p.getContent())) + .apply(protocolBean) + .get(SdkProto.newBuilder().build()); final int rqid = sdkProto.getRqid(); final ResponseFuture responseFuture = Client.REPONSE_MAP.get(rqid); @@ -59,8 +66,8 @@ public class SdkClientHandler extends SimpleChannelInboundHandler { responseFuture.putResponse(sdkProto); } } else { - logger.warn("receive response {}, but not matched any request, address is {}", - sdkProto, NettyUtil.parseRemoteAddr(ctx.channel())); + logger.error("receive response {}, but not matched any request, address is {}", + protocolBean, NettyUtil.parseRemoteAddr(ctx.channel())); } } diff --git a/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java b/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java index 9205e27..c94d8a0 100644 --- a/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java +++ b/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java @@ -38,7 +38,7 @@ public class DidSdkTest { // 测试异步请求 final CountDownLatch countDownLatch = new CountDownLatch(NUM); - IntStream.range(0, NUM).forEach(i -> + IntStream.range(0, NUM).parallel().forEach(i -> Try.of(() -> client.invokeAsync(responseFuture -> { System.out.println(responseFuture.getSdkProto()); countDownLatch.countDown(); diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java index 986349d..03461d5 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java @@ -3,6 +3,8 @@ package io.github.ehlxr.did.server.sdk; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Try; import io.github.ehlxr.did.core.SnowFlake; +import io.github.ehlxr.did.netty.MyProtocolDecoder; +import io.github.ehlxr.did.netty.MyProtocolEncoder; import io.github.ehlxr.did.server.BaseServer; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -49,8 +51,11 @@ public class SdkServer extends BaseServer { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(defLoopGroup, - new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待 - new SdkServerEncoder(), + // new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待 + // new SdkServerEncoder(), + new MyProtocolEncoder(), + new MyProtocolDecoder(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET, + Constants.LENGTH_FIELD_LENGTH, Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false), new SdkServerHandler(snowFlake) ); } diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java index 19b8cff..684a98a 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java @@ -22,12 +22,12 @@ public class SdkServerDecoder extends FixedLengthFrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { return Try.of(() -> { - ByteBuf decode = (ByteBuf) super.decode(ctx, in); + ByteBuf buf = (ByteBuf) super.decode(ctx, in); - byte[] bytes = new byte[decode.readableBytes()]; - decode.readBytes(bytes); + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); - decode.release(); + buf.release(); return NettyUtil.toObject(bytes); }).trap(e -> logger.error("decode error", e)).get(); } diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java index 3a630c0..645f839 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java @@ -1,6 +1,7 @@ package io.github.ehlxr.did.server.sdk; import io.github.ehlxr.did.common.Constants; +import io.github.ehlxr.did.netty.MyProtocolBean; import io.github.ehlxr.did.common.NettyUtil; import io.github.ehlxr.did.common.SdkProto; import io.github.ehlxr.did.core.SnowFlake; @@ -19,7 +20,7 @@ import java.util.concurrent.TimeUnit; * * @author ehlxr */ -public class SdkServerHandler extends SimpleChannelInboundHandler { +public class SdkServerHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(SdkServerHandler.class); /** * 通过信号量来控制流量 @@ -32,21 +33,23 @@ public class SdkServerHandler extends SimpleChannelInboundHandler { } @Override - protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception { - logger.debug("sdk server handler receive sdkProto {}", sdkProto); + protected void channelRead0(ChannelHandlerContext ctx, MyProtocolBean protocolBean) throws Exception { + logger.debug("sdk server handler receive MyProtocolBean {}", protocolBean); if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { + SdkProto sdkProto = (SdkProto) NettyUtil.toObject(protocolBean.getContent()); sdkProto.setDid(snowFlake.nextId()); + protocolBean.setContent(NettyUtil.toBytes(sdkProto)); semaphore.release(); } else { logger.error("tryAcquire timeout after {}ms, {} threads waiting to acquire, {} permits available in this semaphore", Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); } - logger.debug("sdk server handler write sdkProto {} to channel", sdkProto); + logger.debug("sdk server handler write protocolBean {} to channel", protocolBean); ctx.channel(). - writeAndFlush(sdkProto). + writeAndFlush(protocolBean). addListener(ChannelFutureListener.CLOSE_ON_FAILURE); }