add LengthFieldBasedFrameDecoder
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2021-02-08 23:03:57 +08:00
parent d395b987e6
commit dbb95dd999
13 changed files with 292 additions and 39 deletions

View File

@@ -3,6 +3,8 @@ package io.github.ehlxr.did.server.sdk;
import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.Try;
import io.github.ehlxr.did.core.SnowFlake;
import io.github.ehlxr.did.netty.MyProtocolDecoder;
import io.github.ehlxr.did.netty.MyProtocolEncoder;
import io.github.ehlxr.did.server.BaseServer;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
@@ -49,8 +51,11 @@ public class SdkServer extends BaseServer {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(defLoopGroup,
new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待
new SdkServerEncoder(),
// new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待
// new SdkServerEncoder(),
new MyProtocolEncoder(),
new MyProtocolDecoder(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET,
Constants.LENGTH_FIELD_LENGTH, Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false),
new SdkServerHandler(snowFlake)
);
}

View File

@@ -22,12 +22,12 @@ public class SdkServerDecoder extends FixedLengthFrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
return Try.of(() -> {
ByteBuf decode = (ByteBuf) super.decode(ctx, in);
ByteBuf buf = (ByteBuf) super.decode(ctx, in);
byte[] bytes = new byte[decode.readableBytes()];
decode.readBytes(bytes);
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
decode.release();
buf.release();
return NettyUtil.toObject(bytes);
}).trap(e -> logger.error("decode error", e)).get();
}

View File

@@ -1,6 +1,7 @@
package io.github.ehlxr.did.server.sdk;
import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.netty.MyProtocolBean;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.core.SnowFlake;
@@ -19,7 +20,7 @@ import java.util.concurrent.TimeUnit;
*
* @author ehlxr
*/
public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
public class SdkServerHandler extends SimpleChannelInboundHandler<MyProtocolBean> {
private static final Logger logger = LoggerFactory.getLogger(SdkServerHandler.class);
/**
* 通过信号量来控制流量
@@ -32,21 +33,23 @@ public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception {
logger.debug("sdk server handler receive sdkProto {}", sdkProto);
protected void channelRead0(ChannelHandlerContext ctx, MyProtocolBean protocolBean) throws Exception {
logger.debug("sdk server handler receive MyProtocolBean {}", protocolBean);
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
SdkProto sdkProto = (SdkProto) NettyUtil.toObject(protocolBean.getContent());
sdkProto.setDid(snowFlake.nextId());
protocolBean.setContent(NettyUtil.toBytes(sdkProto));
semaphore.release();
} else {
logger.error("tryAcquire timeout after {}ms, {} threads waiting to acquire, {} permits available in this semaphore",
Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits());
}
logger.debug("sdk server handler write sdkProto {} to channel", sdkProto);
logger.debug("sdk server handler write protocolBean {} to channel", protocolBean);
ctx.channel().
writeAndFlush(sdkProto).
writeAndFlush(protocolBean).
addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}