add protobuf serializer, spi extension
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2021-02-09 17:36:11 +08:00
parent 9d72433199
commit 5db4dbeba5
37 changed files with 1123 additions and 343 deletions

View File

@@ -15,7 +15,7 @@
<dependencies>
<dependency>
<groupId>io.github.ehlxr</groupId>
<artifactId>did-common</artifactId>
<artifactId>did-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@@ -1,7 +1,11 @@
package io.github.ehlxr.did.client;
import io.github.ehlxr.did.common.*;
import io.github.ehlxr.did.netty.MyProtocolBean;
import io.github.ehlxr.did.SdkProto;
import io.github.ehlxr.did.adapter.Message;
import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.Try;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -83,27 +87,25 @@ public abstract class AbstractClient implements Client {
final Channel channel = channelFuture.channel();
if (channel.isOpen() && channel.isActive()) {
final SdkProto sdkProto = new SdkProto();
final int rqid = sdkProto.getRqid();
final int rqid = sdkProto.rqid();
return Try.of(() -> {
final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, null, null);
REPONSE_MAP.put(rqid, responseFuture);
logger.debug("write {} to channel", sdkProto);
byte[] bytes = NettyUtil.toBytes(sdkProto);
MyProtocolBean myProtocolBean = new MyProtocolBean((byte) 0xA, (byte) 0xC, bytes.length, bytes);
channel.writeAndFlush(myProtocolBean).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
//发送成功后立即跳出
return;
}
// 代码执行到此说明发送失败,需要释放资源
REPONSE_MAP.remove(rqid);
responseFuture.putResponse(null);
responseFuture.setCause(channelFuture.cause());
logger.error("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel));
});
channel.writeAndFlush(Message.newBuilder().type((byte) 0xA).flag((byte) 0xC).content(sdkProto).build())
.addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
//发送成功后立即跳出
return;
}
// 代码执行到此说明发送失败,需要释放资源
REPONSE_MAP.remove(rqid);
responseFuture.putResponse(null);
responseFuture.setCause(channelFuture.cause());
logger.error("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel));
});
// 阻塞等待响应
SdkProto proto = responseFuture.waitResponse(timeoutMillis);
@@ -129,31 +131,29 @@ public abstract class AbstractClient implements Client {
final Channel channel = channelFuture.channel();
if (channel.isOpen() && channel.isActive()) {
final SdkProto sdkProto = new SdkProto();
final int rqid = sdkProto.getRqid();
final int rqid = sdkProto.rqid();
if (asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, invokeCallback, asyncSemaphore);
REPONSE_MAP.put(rqid, responseFuture);
Try.of(() -> {
logger.debug("write {} to channel", sdkProto);
channelFuture.channel().writeAndFlush(Message.newBuilder().type((byte) 0xA).flag((byte) 0xC).content(sdkProto).build())
.addListener(channelFuture -> {
if (channelFuture.isSuccess()) {
return;
}
byte[] bytes = NettyUtil.toBytes(sdkProto);
MyProtocolBean myProtocolBean = new MyProtocolBean((byte) 0xA, (byte) 0xC, bytes.length, bytes);
channelFuture.channel().writeAndFlush(myProtocolBean).addListener(channelFuture -> {
if (channelFuture.isSuccess()) {
return;
}
// 代码执行到些说明发送失败,需要释放资源
logger.error("send a request command to channel <{}> failed.",
NettyUtil.parseRemoteAddr(channel), channelFuture.cause());
// 代码执行到些说明发送失败,需要释放资源
logger.error("send a request command to channel <{}> failed.",
NettyUtil.parseRemoteAddr(channel), channelFuture.cause());
REPONSE_MAP.remove(rqid);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
responseFuture.executeInvokeCallback();
responseFuture.release();
});
REPONSE_MAP.remove(rqid);
responseFuture.setCause(channelFuture.cause());
responseFuture.putResponse(null);
responseFuture.executeInvokeCallback();
responseFuture.release();
});
}).trap(e -> {
responseFuture.release();
logger.error("send a request to channel <{}> Exception", NettyUtil.parseRemoteAddr(channel), e);

View File

@@ -1,8 +1,8 @@
package io.github.ehlxr.did.client;
import io.github.ehlxr.did.SdkProto;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

View File

@@ -1,6 +1,6 @@
package io.github.ehlxr.did.client;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.SdkProto;
import io.github.ehlxr.did.common.Try;
import java.util.concurrent.CountDownLatch;

View File

@@ -1,10 +1,10 @@
package io.github.ehlxr.did.client;
import io.github.ehlxr.did.adapter.MessageDecoder;
import io.github.ehlxr.did.adapter.MessageEncoder;
import io.github.ehlxr.did.client.handler.SdkClientHandler;
import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.Try;
import io.github.ehlxr.did.netty.MyProtocolDecoder;
import io.github.ehlxr.did.netty.MyProtocolEncoder;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
@@ -52,11 +52,8 @@ public class SdkClient extends AbstractClient {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline()
// .addLast(new SdkClientDecoder(Constants.DECODER_FRAMELENGTH)) // 如果长度不够会等待
// .addLast(new SdkClientEncoder())
.addLast(new MyProtocolEncoder())
.addLast(new MyProtocolDecoder(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET,
Constants.LENGTH_FIELD_LENGTH, Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false))
.addLast(new MessageEncoder())
.addLast(new MessageDecoder())
.addLast(new SdkClientHandler());
}
});

View File

@@ -1,66 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright © 2020 xrv <xrg@live.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package io.github.ehlxr.did.client.handler;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.Try;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ehlxr
* @since 2021-01-20 14:42.
*/
public class SdkClientDecoder extends FixedLengthFrameDecoder {
private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class);
public SdkClientDecoder(int frameLength) {
super(frameLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
return Try.of(() -> {
ByteBuf buf = (ByteBuf) super.decode(ctx, in);
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
buf.release();
return NettyUtil.toObject(bytes);
}).trap(e -> logger.error("decode error", e)).get();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel channel = ctx.channel();
logger.error("SdkClientDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause);
NettyUtil.closeChannel(channel);
}
}

View File

@@ -1,60 +0,0 @@
/*
* The MIT License (MIT)
*
* Copyright © 2020 xrv <xrg@live.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package io.github.ehlxr.did.client.handler;
import io.github.ehlxr.did.client.SdkClient;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.common.Try;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author ehlxr
* @since 2021-01-20 14:43.
*/
public class SdkClientEncoder extends MessageToByteEncoder<SdkProto> {
private final Logger logger = LoggerFactory.getLogger(SdkClient.class);
@Override
protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) {
Try.of(() -> {
out.writeBytes(NettyUtil.toBytes(sdkProto));
}).trap(e -> logger.error("encode error", e)).run();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel channel = ctx.channel();
logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed",
NettyUtil.parseRemoteAddr(channel)), cause);
NettyUtil.closeChannel(channel);
}
}

View File

@@ -24,12 +24,12 @@
package io.github.ehlxr.did.client.handler;
import io.github.ehlxr.did.SdkProto;
import io.github.ehlxr.did.adapter.Message;
import io.github.ehlxr.did.client.Client;
import io.github.ehlxr.did.client.ResponseFuture;
import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.common.Try;
import io.github.ehlxr.did.netty.MyProtocolBean;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
@@ -39,17 +39,16 @@ import org.slf4j.LoggerFactory;
* @author ehlxr
* @since 2021-01-20 14:43.
*/
public class SdkClientHandler extends SimpleChannelInboundHandler<MyProtocolBean> {
public class SdkClientHandler extends SimpleChannelInboundHandler<Message<SdkProto>> {
private final Logger logger = LoggerFactory.getLogger(SdkClientHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyProtocolBean protocolBean) {
logger.debug("sdk client handler receive protocolBean {}", protocolBean);
protected void channelRead0(ChannelHandlerContext ctx, Message<SdkProto> message) {
logger.debug("sdk client handler receive message {}", message);
SdkProto sdkProto = Try.<MyProtocolBean, SdkProto>of(p ->
(SdkProto) NettyUtil.toObject(p.getContent()))
.apply(protocolBean)
.get(SdkProto.newBuilder().build());
SdkProto sdkProto = Try.<Message<SdkProto>, SdkProto>of(m -> m.content(SdkProto.class))
.apply(message)
.get();
final int rqid = sdkProto.getRqid();
final ResponseFuture responseFuture = Client.REPONSE_MAP.get(rqid);
@@ -67,7 +66,7 @@ public class SdkClientHandler extends SimpleChannelInboundHandler<MyProtocolBean
}
} else {
logger.error("receive response {}, but not matched any request, address is {}",
protocolBean, NettyUtil.parseRemoteAddr(ctx.channel()));
message, NettyUtil.parseRemoteAddr(ctx.channel()));
}
}

View File

@@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
/**
* 异步请求压测
* 压测
*
* @author ehlxr
*/

View File

@@ -1,13 +1,10 @@
package io.github.ehlxr.did;
import io.github.ehlxr.did.client.SdkClient;
import io.github.ehlxr.did.common.Try;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
@@ -37,16 +34,16 @@ public class DidSdkTest {
System.out.println("invokeync test finish");
// 测试异步请求
final CountDownLatch countDownLatch = new CountDownLatch(NUM);
IntStream.range(0, NUM).parallel().forEach(i ->
Try.of(() -> client.invokeAsync(responseFuture -> {
System.out.println(responseFuture.getSdkProto());
countDownLatch.countDown();
})).trap(Throwable::printStackTrace).run());
//noinspection ResultOfMethodCallIgnored
countDownLatch.await(10, TimeUnit.SECONDS);
System.out.println("invokeAsync test finish");
// final CountDownLatch countDownLatch = new CountDownLatch(NUM);
// IntStream.range(0, NUM).parallel().forEach(i ->
// Try.of(() -> client.invokeAsync(responseFuture -> {
// System.out.println(responseFuture.getSdkProto());
// countDownLatch.countDown();
// })).trap(Throwable::printStackTrace).run());
//
// //noinspection ResultOfMethodCallIgnored
// countDownLatch.await(10, TimeUnit.SECONDS);
// System.out.println("invokeAsync test finish");
}
@Test