diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java index 3844ebf..74c6d1d 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java @@ -45,6 +45,7 @@ public class Constants { * 编码解码 byte 数组固定长度 */ public static int DECODER_FRAMELENGTH = 100; + private Constants() { } diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java b/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java index 86e1ba7..284e141 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/NettyUtil.java @@ -5,7 +5,6 @@ import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; import java.net.SocketAddress; /** @@ -40,31 +39,4 @@ public class NettyUtil { logger.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, future.isSuccess())); } - - public static byte[] toBytes(Object obj) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(bos); - oos.writeObject(obj); - oos.flush(); - - // if (bytes.length > Constants.DECODER_FRAMELENGTH) { - // logger.error("bytes length should not bigger than {}", Constants.DECODER_FRAMELENGTH); - // return null; - // } else if (bytes.length < Constants.DECODER_FRAMELENGTH) { - // byte[] result = new byte[Constants.DECODER_FRAMELENGTH]; - // - // // 如果长度不足,填充 - // System.arraycopy(bytes, 0, result, 0, bytes.length); - // return result; - // } - - return bos.toByteArray(); - } - - public static Object toObject(byte[] bts) throws IOException, ClassNotFoundException { - ByteArrayInputStream bis = new ByteArrayInputStream(bts); - ObjectInputStream ois = new ObjectInputStream(bis); - - return ois.readObject(); - } } diff --git a/did-core/pom.xml b/did-core/pom.xml new file mode 100644 index 0000000..eabaa92 --- /dev/null +++ b/did-core/pom.xml @@ -0,0 +1,43 @@ + + + + did-parent + io.github.ehlxr + 1.0.2-SNAPSHOT + + 4.0.0 + + did-core + + + + io.github.ehlxr + did-common + + + + io.protostuff + protostuff-core + + + io.protostuff + protostuff-runtime + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + + com.github.github + site-maven-plugin + + + + \ No newline at end of file diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java b/did-core/src/main/java/io/github/ehlxr/did/SdkProto.java similarity index 74% rename from did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java rename to did-core/src/main/java/io/github/ehlxr/did/SdkProto.java index 1dc9e88..0120af1 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java +++ b/did-core/src/main/java/io/github/ehlxr/did/SdkProto.java @@ -1,4 +1,6 @@ -package io.github.ehlxr.did.common; +package io.github.ehlxr.did; + +import io.github.ehlxr.did.common.JsonUtils; import java.io.Serializable; import java.util.concurrent.atomic.AtomicInteger; @@ -20,13 +22,6 @@ public class SdkProto implements Serializable { private long did; public SdkProto() { - rqid = REQUEST_ID.incrementAndGet(); - did = 0; - } - - public SdkProto(int rqid, long did) { - this.rqid = rqid; - this.did = did; } public static SdkProtoBuilder newBuilder() { @@ -37,10 +32,9 @@ public class SdkProto implements Serializable { return rqid; } - public void setRqid(int rqid) { - if (rqid > 0) { - this.rqid = rqid; - } + public int rqid() { + rqid = REQUEST_ID.incrementAndGet(); + return rqid; } public long getDid() { @@ -57,17 +51,11 @@ public class SdkProto implements Serializable { } public static final class SdkProtoBuilder { - private int rqid; private long did; private SdkProtoBuilder() { } - public SdkProtoBuilder rqid(int rqid) { - this.rqid = rqid; - return this; - } - public SdkProtoBuilder did(long did) { this.did = did; return this; @@ -75,7 +63,6 @@ public class SdkProto implements Serializable { public SdkProto build() { SdkProto sdkProto = new SdkProto(); - sdkProto.setRqid(rqid); sdkProto.setDid(did); return sdkProto; } diff --git a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java b/did-core/src/main/java/io/github/ehlxr/did/adapter/Message.java similarity index 51% rename from did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java rename to did-core/src/main/java/io/github/ehlxr/did/adapter/Message.java index 6996382..b9f1827 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolBean.java +++ b/did-core/src/main/java/io/github/ehlxr/did/adapter/Message.java @@ -22,35 +22,53 @@ * THE SOFTWARE. */ -package io.github.ehlxr.did.netty; +package io.github.ehlxr.did.adapter; -import io.github.ehlxr.did.common.NettyUtil; import io.github.ehlxr.did.common.Try; +import io.github.ehlxr.did.serializer.SerializerHolder; + +import java.io.Serializable; /** * @author ehlxr * @since 2021-02-08 22:07. */ -public class MyProtocolBean { - // 类型(系统编号 0xA 表示A系统,0xB 表示B系统) +public class Message implements Serializable { + private static final long serialVersionUID = 2419320297359425651L; + + /** + * 类型(例如:0xA 表示 A 系统;0xB 表示 B 系统) + */ private byte type; - // 信息标志 0xA 表示心跳包 0xB 表示超时包 0xC 业务信息包 + /** + * 信息标志(例如:0xA 表示心跳包;0xB 表示超时包;0xC 业务信息包) + */ private byte flag; - // 内容长度 + /** + * 内容长度(通过 content 计算而来) + */ private int length; - // 内容 - private byte[] content; + /** + * 消息内容 + */ + private T content; - public MyProtocolBean(byte type, byte flag, int length, byte[] content) { + public Message(byte type, byte flag, T content) { this.type = type; this.flag = flag; - this.length = length; this.content = content; } + public Message() { + } + + public static MessageBuilder newBuilder() { + return new MessageBuilder<>(); + } + public byte getType() { return type; } @@ -68,28 +86,60 @@ public class MyProtocolBean { } public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; + return getContent().length; } public byte[] getContent() { - return content; + return Try.of(SerializerHolder.get()::serializer).apply(content).get(); } - public void setContent(byte[] content) { + public void setContent(T content) { this.content = content; } + public T content(Class clazz) { + return SerializerHolder.get().deserializer(getContent(), clazz); + } + @Override public String toString() { - return "MyProtocolBean{" + + return "Message{" + "type=" + type + ", flag=" + flag + ", length=" + length + - ", content=" + Try.of(NettyUtil::toObject).apply(content).get() + + ", content=" + content + '}'; } + + public static final class MessageBuilder { + private byte type; + private byte flag; + private T content; + + private MessageBuilder() { + } + + public MessageBuilder type(byte type) { + this.type = type; + return this; + } + + public MessageBuilder flag(byte flag) { + this.flag = flag; + return this; + } + + public MessageBuilder content(T content) { + this.content = content; + return this; + } + + public Message build() { + Message message = new Message<>(); + message.setType(type); + message.setFlag(flag); + message.setContent(content); + return message; + } + } } \ No newline at end of file diff --git a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java b/did-core/src/main/java/io/github/ehlxr/did/adapter/MessageDecoder.java similarity index 52% rename from did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java rename to did-core/src/main/java/io/github/ehlxr/did/adapter/MessageDecoder.java index ef17dc2..0f16b5c 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolDecoder.java +++ b/did-core/src/main/java/io/github/ehlxr/did/adapter/MessageDecoder.java @@ -22,58 +22,83 @@ * THE SOFTWARE. */ -package io.github.ehlxr.did.netty; +package io.github.ehlxr.did.adapter; +import io.github.ehlxr.did.SdkProto; +import io.github.ehlxr.did.common.Constants; +import io.github.ehlxr.did.common.NettyUtil; +import io.github.ehlxr.did.serializer.SerializerHolder; import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author ehlxr * @since 2021-02-08 22:09. */ -public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder { +public class MessageDecoder extends LengthFieldBasedFrameDecoder { private static final int HEADER_SIZE = 6; + private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class); /** * @param maxFrameLength 帧的最大长度 - * @param lengthFieldOffset length字段偏移的地址 - * @param lengthFieldLength length字段所占的字节长 + * @param lengthFieldOffset length 字段偏移的地址 + * @param lengthFieldLength length 字段所占的字节长 * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段 * @param initialBytesToStrip 解析时候跳过多少个长度 - * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异 + * @param failFast true,当 frame 长度超过 maxFrameLength 时立即报 TooLongFrameException 异常, + * false,读取完整个帧再报异 */ - public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, - int lengthAdjustment, int initialBytesToStrip, boolean failFast) { + public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, + int lengthAdjustment, int initialBytesToStrip, boolean failFast) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); } + public MessageDecoder() { + super(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET, Constants.LENGTH_FIELD_LENGTH, + Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false); + } + @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分 + // 在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要 body 部分 in = (ByteBuf) super.decode(ctx, in); if (in == null) { return null; } if (in.readableBytes() < HEADER_SIZE) { - throw new Exception("字节数不足"); + throw new IllegalArgumentException("decode failed 'cause of insufficient readable bytes"); } - //读取type字段 + // 读取 type 字段 byte type = in.readByte(); - //读取flag字段 + // 读取 flag 字段 byte flag = in.readByte(); - //读取length字段 + // 读取 length 字段 int length = in.readInt(); if (in.readableBytes() != length) { - throw new Exception("标记的长度不符合实际长度"); + throw new IllegalArgumentException("the length of the readable bytes does not match the actual length"); } - //读取body + // 读取 body byte[] bytes = new byte[in.readableBytes()]; in.readBytes(bytes); - return new MyProtocolBean(type, flag, length, bytes); + return Message.newBuilder() + .type(type) + .flag(flag) + .content(SerializerHolder.get().deserializer(bytes, SdkProto.class)) + .build(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + Channel channel = ctx.channel(); + logger.error("channel {} will be closed, 'cause of ", NettyUtil.parseRemoteAddr(channel), cause); + NettyUtil.closeChannel(channel); } } \ No newline at end of file diff --git a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java b/did-core/src/main/java/io/github/ehlxr/did/adapter/MessageEncoder.java similarity index 80% rename from did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java rename to did-core/src/main/java/io/github/ehlxr/did/adapter/MessageEncoder.java index d7a1dc0..ab557ec 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/netty/MyProtocolEncoder.java +++ b/did-core/src/main/java/io/github/ehlxr/did/adapter/MessageEncoder.java @@ -22,23 +22,25 @@ * THE SOFTWARE. */ -package io.github.ehlxr.did.netty; +package io.github.ehlxr.did.adapter; +import io.github.ehlxr.did.SdkProto; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import java.util.Objects; + /** * @author ehlxr * @since 2021-02-08 22:12. */ -public class MyProtocolEncoder extends MessageToByteEncoder { +public class MessageEncoder extends MessageToByteEncoder> { @Override - protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception { - if (msg == null) { - throw new Exception("msg is null"); - } + protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) { + Objects.requireNonNull(msg, "encode failed 'cause of recive message is null"); + out.writeByte(msg.getType()); out.writeByte(msg.getFlag()); out.writeInt(msg.getLength()); diff --git a/did-core/src/main/java/io/github/ehlxr/did/extension/Activate.java b/did-core/src/main/java/io/github/ehlxr/did/extension/Activate.java new file mode 100644 index 0000000..ed6efff --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/extension/Activate.java @@ -0,0 +1,80 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.extension; + +import java.lang.annotation.*; + +/** + * Activate + *

+ * 对于可以被框架中自动激活加载扩展,此Annotation用于配置扩展被自动激活加载条件。 + * 比如,过滤扩展,有多个实现,使用Activate Annotation的扩展可以根据条件被自动加载。 + *

+ *

+ * 底层框架SPI提供者通过{@link ExtensionLoader}的{@link ExtensionLoader#getActivateExtensions}方法 + * 获得条件的扩展。 + * + * @see SPI + * @see ExtensionLoader + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface Activate { + /** + * Group过滤条件。 + *
+ * 包含{@link ExtensionLoader#getActivateExtensions}的group参数给的值,则返回扩展。 + *
+ * 如没有Group设置,则不过滤。 + */ + String[] group() default {}; + + /** + * Key过滤条件。包含{@link ExtensionLoader#getActivateExtensions}的URL的参数Key中有,则返回扩展。 + *

+ * 示例:
+ * 注解的值 @Activate("cache,validatioin"), + * 则{@link ExtensionLoader#getActivateExtensions}的URL的参数有cacheKey,或是validatioin则返回扩展。 + *
+ * 如没有设置,则不过滤。 + */ + String[] value() default {}; + + /** + * 排序信息,可以不提供。 + */ + String[] before() default {}; + + /** + * 排序信息,可以不提供。 + */ + String[] after() default {}; + + /** + * 排序信息,可以不提供。 + */ + int order() default 0; +} \ No newline at end of file diff --git a/did-core/src/main/java/io/github/ehlxr/did/extension/ActivateComparator.java b/did-core/src/main/java/io/github/ehlxr/did/extension/ActivateComparator.java new file mode 100644 index 0000000..6d86f41 --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/extension/ActivateComparator.java @@ -0,0 +1,90 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.extension; + +import java.util.Comparator; + +/** + * Order Comparetor + * + * @author ehlxr + */ +public class ActivateComparator implements Comparator { + public static final Comparator COMPARATOR = new ActivateComparator(); + + @Override + public int compare(Object o1, Object o2) { + if (o1 == null && o2 == null) { + return 0; + } + if (o1 == null) { + return -1; + } + if (o2 == null) { + return 1; + } + if (o1.equals(o2)) { + return 0; + } + Activate a1 = o1.getClass().getAnnotation(Activate.class); + Activate a2 = o2.getClass().getAnnotation(Activate.class); + if ((a1.before().length > 0 || a1.after().length > 0 + || a2.before().length > 0 || a2.after().length > 0) + && o1.getClass().getInterfaces().length > 0 + && o1.getClass().getInterfaces()[0].isAnnotationPresent(SPI.class)) { + ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(o1.getClass().getInterfaces()[0]); + if (a1.before().length > 0 || a1.after().length > 0) { + String n2 = extensionLoader.getExtensionName(o2.getClass()); + for (String before : a1.before()) { + if (before.equals(n2)) { + return -1; + } + } + for (String after : a1.after()) { + if (after.equals(n2)) { + return 1; + } + } + } + if (a2.before().length > 0 || a2.after().length > 0) { + String n1 = extensionLoader.getExtensionName(o1.getClass()); + for (String before : a2.before()) { + if (before.equals(n1)) { + return 1; + } + } + for (String after : a2.after()) { + if (after.equals(n1)) { + return -1; + } + } + } + } + int n1 = a1.order(); + int n2 = a2 == null ? 0 : a2.order(); + return n1 > n2 ? 1 : -1; + } + +} diff --git a/did-core/src/main/java/io/github/ehlxr/did/extension/Adaptive.java b/did-core/src/main/java/io/github/ehlxr/did/extension/Adaptive.java new file mode 100644 index 0000000..76a6d21 --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/extension/Adaptive.java @@ -0,0 +1,40 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.extension; + +import java.lang.annotation.*; + +/** + * 在{@link ExtensionLoader}生成Extension的Adaptive Instance时,为{@link ExtensionLoader}提供信息。 + * 使用上类似于工厂方法 + * + * @see ExtensionLoader + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface Adaptive { + String[] value() default {}; +} \ No newline at end of file diff --git a/did-core/src/main/java/io/github/ehlxr/did/extension/ExtensionLoader.java b/did-core/src/main/java/io/github/ehlxr/did/extension/ExtensionLoader.java new file mode 100644 index 0000000..7da5478 --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/extension/ExtensionLoader.java @@ -0,0 +1,441 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.extension; + +import io.netty.util.internal.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.Constructor; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.regex.Pattern; + +/** + * 加载和管理扩展。 + *

+ *

    + *
  • 管理的扩展实例是单例。 + *
  • Wrapper实例每次获得扩展实例重新创建,并Wrap到扩展实例上。 + *
+ */ +public class ExtensionLoader { + private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class); + + private static final String DUBBO_DIRECTORY = "META-INF/extensions/"; + private static final String DUBBO_INTERNAL_DIRECTORY = "META-INF/extensions/internal/"; + + private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*,+\\s*"); + + private static final ConcurrentMap, ExtensionLoader> EXTENSION_LOADERS = new ConcurrentHashMap<>(); + private static final ConcurrentMap, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(); + + private final Class type; + private final ConcurrentMap> cachedInstances = new ConcurrentHashMap<>(); + private final ConcurrentMap, String> cachedNames = new ConcurrentHashMap<>(); + private final Holder>> cachedClasses = new Holder<>(); + private final Map, List> cachedActivates = new ConcurrentHashMap<>(); + private final Map exceptions = new ConcurrentHashMap<>(); + private String defaultExtension; + private Set> cachedWrapperClasses; + private volatile Class adaptiveClass = null; + + private ExtensionLoader(Class type) { + this.type = type; + } + + /** + * {@link ExtensionLoader}的工厂方法。 + * + * @param type 扩展点接口类型 + * @param 扩展点类型 + * @return {@link ExtensionLoader}实例 + * @throws IllegalArgumentException 参数为null; + * 或是扩展点接口上没有{@link SPI}注解。 + * @since 0.1.0 + */ + @SuppressWarnings("unchecked") + public static ExtensionLoader getExtensionLoader(Class type) { + if (type == null) { + throw new IllegalArgumentException("SPI type == null"); + } + if (!type.isInterface()) { + throw new IllegalArgumentException("SPI type(" + type.getName() + ") is not interface!"); + } + if (!withExtensionAnnotation(type)) { + throw new IllegalArgumentException("type(" + type.getName() + + ") is not a extension, because WITHOUT @SPI Annotation!"); + } + + ExtensionLoader loader = (ExtensionLoader) EXTENSION_LOADERS.get(type); + if (loader == null) { + EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<>(type)); + loader = (ExtensionLoader) EXTENSION_LOADERS.get(type); + } + return loader; + } + + private static boolean withExtensionAnnotation(Class type) { + return type.isAnnotationPresent(SPI.class); + } + + /** + * Get the String of Throwable, like the output of {@link Throwable#printStackTrace()}. + * + * @param throwable the input throwable. + */ + private static String throwable2String(Throwable throwable) { + StringWriter w = new StringWriter(1024); + try (PrintWriter p = new PrintWriter(w)) { + throwable.printStackTrace(p); + return w.toString(); + } + } + + public static T getExtension(Class clazz, String value) { + return getExtensionLoader(clazz).getExtension(value); + } + + public T getExtension(String name) { + if (StringUtil.isNullOrEmpty(name)) { + throw new IllegalArgumentException("SPI name == null"); + } + + Holder holder = cachedInstances.get(name); + if (holder == null) { + cachedInstances.putIfAbsent(name, new Holder<>()); + holder = cachedInstances.get(name); + } + + Object instance = holder.get(); + if (instance == null) { + synchronized (cachedInstances) { + instance = holder.get(); + if (instance == null) { + instance = createExtension(name); + holder.set(instance); + } + } + } + return (T) instance; + } + + /** + * 返回缺省的扩展。 + * + * @throws IllegalStateException 指定的扩展没有设置缺省扩展点 + * @since 0.1.0 + */ + public T getDefaultExtension() { + loadExtensionClasses0(); + + if (null == defaultExtension || defaultExtension.length() == 0) { + throw new IllegalStateException("No default extension on extension " + type.getName()); + } + return getExtension(defaultExtension); + } + + /** + * 获取扩展点实现的所有扩展点名。 + * + * @since 0.1.0 + */ + public Set getSupportedExtensions() { + Map> classes = getExtensionClasses(); + return Collections.unmodifiableSet(new HashSet<>(classes.keySet())); + } + + /** + * get active extension points + * + * @return extension list which are activated. + */ + public List getActivateExtensions() { + getExtensionClasses(); + + // TODO {group value} filter + + List activates = (List) cachedActivates.get(type); + activates.sort(ActivateComparator.COMPARATOR); + return activates; + } + + public String getExtensionName(Class spi) { + getExtensionClasses(); + return cachedNames.get(spi); + } + + private T createExtension(String name) { + Class clazz = getExtensionClasses().get(name); + if (clazz == null) { + throw findExtensionClassLoadException(name); + } + try { + T instance = (T) EXTENSION_INSTANCES.get(clazz); + if (instance == null) { + EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); + instance = (T) EXTENSION_INSTANCES.get(clazz); + } + + // TODO 支持组件自动注入 + + // TODO 支持wrapper类包装实现aop相似的功能 + + return instance; + } catch (Throwable t) { + throw new IllegalStateException("SPI instance(name: " + name + ", class: " + + type + ") could not be instantiated: " + t.getMessage(), t); + } + } + + /** + * Thread-safe. + */ + private Map> getExtensionClasses() { + Map> classes = cachedClasses.get(); + if (classes == null) { + synchronized (cachedClasses) { + classes = cachedClasses.get(); + if (classes == null) { + loadExtensionClasses0(); + classes = cachedClasses.get(); + } + } + } + return classes; + } + + private IllegalStateException findExtensionClassLoadException(String name) { + for (Map.Entry entry : exceptions.entrySet()) { + if (entry.getKey().toLowerCase().contains(name.toLowerCase())) { + return entry.getValue(); + } + } + + int i = 1; + StringBuilder buf = new StringBuilder("No such extension " + type.getName() + " by name " + name); + for (Map.Entry entry : exceptions.entrySet()) { + if (i == 1) { + buf.append(", possible causes: "); + } + + buf.append("\r\n("); + buf.append(i++); + buf.append(") "); + buf.append(entry.getKey()); + buf.append(":\r\n"); + buf.append(throwable2String(entry.getValue())); + } + return new IllegalStateException(buf.toString()); + } + + private void loadExtensionClasses0() { + final SPI annotation = type.getAnnotation(SPI.class); + if (annotation != null) { + String value = annotation.value(); + if ((value = value.trim()).length() > 0) { + String[] names = NAME_SEPARATOR.split(value); + if (names.length > 1) { + throw new IllegalStateException("more than 1 default extension name on extension " + + type.getName() + ": " + Arrays.toString(names)); + } + if (names.length == 1 && names[0].trim().length() > 0) { + defaultExtension = names[0].trim(); + } + } + } + + Map> extensionClasses = new HashMap<>(); + loadFile(extensionClasses, DUBBO_DIRECTORY); + loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); + cachedClasses.set(extensionClasses); + } + + private void loadFile(Map> extensionClasses, String dir) { + String fileName = dir + type.getName(); + try { + Enumeration urls; + + ClassLoader classLoader = ExtensionLoader.class.getClassLoader(); + if (classLoader != null) { + urls = classLoader.getResources(fileName); + } else { + urls = ClassLoader.getSystemResources(fileName); + } + + if (urls != null) { + while (urls.hasMoreElements()) { + URL url = urls.nextElement(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) { + String line; + while ((line = reader.readLine()) != null) { + // delete comments + final int ci = line.indexOf('#'); + if (ci >= 0) { + line = line.substring(0, ci); + } + line = line.trim(); + if (line.length() == 0) { + continue; + } + + try { + String name = null; + + int i = line.indexOf('='); + + if (i > 0) { + name = line.substring(0, i).trim(); + line = line.substring(i + 1).trim(); + } + + if (line.length() > 0) { + Class clazz = Class.forName(line, true, classLoader).asSubclass(type); + if (!type.isAssignableFrom(clazz)) { + throw new IllegalStateException("Error when load extension class(interface: " + + type.getName() + ", class line: " + clazz.getName() + "), class " + + clazz.getName() + "is not subtype of interface."); + } + + if (clazz.isAnnotationPresent(Adaptive.class)) { + if (adaptiveClass == null) { + adaptiveClass = clazz; + } else if (!adaptiveClass.equals(clazz)) { + throw new IllegalStateException("More than 1 adaptive class found: " + + adaptiveClass.getName() + + ", " + clazz.getName()); + } + } else { + try { + clazz.getConstructor(type); + Set> wrappers = cachedWrapperClasses; + if (wrappers == null) { + cachedWrapperClasses = new HashSet<>(); + wrappers = cachedWrapperClasses; + } + wrappers.add(clazz); + } catch (NoSuchMethodException e) { + Constructor constructor = clazz.getConstructor(); + if (name == null || name.length() == 0) { + if (clazz.getSimpleName().length() > type.getSimpleName().length() + && clazz.getSimpleName().endsWith(type.getSimpleName())) { + name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); + } else { + throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); + } + } + + Activate activate = clazz.getAnnotation(Activate.class); + + if (activate != null) { + List activates = cachedActivates.get(type); + if (activates == null) { + synchronized (cachedActivates) { + activates = new ArrayList<>(); + } + } + + activates.add(constructor.newInstance()); + cachedActivates.put(type, activates); + } + + if (!cachedNames.containsKey(clazz)) { + cachedNames.put(clazz, name); + } + + Class c = extensionClasses.get(name); + if (c == null) { + extensionClasses.put(name, clazz); + } else if (c != clazz) { + throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + name + " on " + c.getName() + " and " + clazz.getName()); + } + } + } + } + } catch (Throwable t) { + IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); + exceptions.put(line, e); + } + } // end of while read lines + } catch (Throwable t) { + logger.error("Exception when load extension class(interface: " + + type + ", class file: " + url + ") in " + url, t); + } + } + } + } catch (Throwable t) { + logger.error("Exception when load extension class(interface: " + + type + ", description file: " + fileName + ").", t); + } + } + + @Override + public String toString() { + return this.getClass().getName() + "[" + type.getName() + "]"; + } + + /** + * Holds a value of type T. + * + * @since 0.1.0 + */ + private static final class Holder { + /** + * The value contained in the holder. + */ + private volatile T value; + + /** + * Creates a new holder with a null value. + */ + Holder() { + } + + /** + * Create a new holder with the specified value. + * + * @param value The value to be stored in the holder. + */ + public Holder(T value) { + this.value = value; + } + + public T get() { + return value; + } + + public void set(T value) { + this.value = value; + } + } +} diff --git a/did-core/src/main/java/io/github/ehlxr/did/extension/SPI.java b/did-core/src/main/java/io/github/ehlxr/did/extension/SPI.java new file mode 100644 index 0000000..2c1eed5 --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/extension/SPI.java @@ -0,0 +1,49 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.extension; + +import java.lang.annotation.*; + +/** + * 把一个接口标识成扩展点。 + *

+ * 没有此注释的接口{@link ExtensionLoader}会拒绝接管。 + * + * @see ExtensionLoader + * @since 0.1.0 + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface SPI { + + /** + * the default extension name. + * + * @since 0.1.0 + */ + String value() default ""; + +} diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java b/did-core/src/main/java/io/github/ehlxr/did/serializer/JdkSerializer.java similarity index 53% rename from did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java rename to did-core/src/main/java/io/github/ehlxr/did/serializer/JdkSerializer.java index a15e00f..531c27b 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientEncoder.java +++ b/did-core/src/main/java/io/github/ehlxr/did/serializer/JdkSerializer.java @@ -22,39 +22,41 @@ * THE SOFTWARE. */ -package io.github.ehlxr.did.client.handler; +package io.github.ehlxr.did.serializer; -import io.github.ehlxr.did.client.SdkClient; -import io.github.ehlxr.did.common.NettyUtil; -import io.github.ehlxr.did.common.SdkProto; import io.github.ehlxr.did.common.Try; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; /** * @author ehlxr - * @since 2021-01-20 14:43. + * @since 2021-02-09 11:07. */ -public class SdkClientEncoder extends MessageToByteEncoder { - private final Logger logger = LoggerFactory.getLogger(SdkClient.class); - +public class JdkSerializer implements Serializer { @Override - protected void encode(ChannelHandlerContext ctx, SdkProto sdkProto, ByteBuf out) { - Try.of(() -> { - out.writeBytes(NettyUtil.toBytes(sdkProto)); - }).trap(e -> logger.error("encode error", e)).run(); + public byte[] serializer(T obj) { + return Try.of(o -> { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + oos.flush(); + + return bos.toByteArray(); + }).trap(Throwable::printStackTrace).apply(obj).get(); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - Channel channel = ctx.channel(); - logger.error(String.format("SdkServerEncoder channel [%s] error and will be closed", - NettyUtil.parseRemoteAddr(channel)), cause); + public T deserializer(byte[] bytes, Class clazz) { + return Try.of(bs -> { + ByteArrayInputStream bis = new ByteArrayInputStream(bs); + ObjectInputStream ois = new ObjectInputStream(bis); + + Object o = ois.readObject(); + return clazz.isInstance(o) ? clazz.cast(o) : null; + }).trap(Throwable::printStackTrace).apply(bytes).get(); - NettyUtil.closeChannel(channel); } } diff --git a/did-core/src/main/java/io/github/ehlxr/did/serializer/ProtobufSerializer.java b/did-core/src/main/java/io/github/ehlxr/did/serializer/ProtobufSerializer.java new file mode 100644 index 0000000..376546e --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/serializer/ProtobufSerializer.java @@ -0,0 +1,77 @@ +/* + * The MIT License (MIT) + * + * Copyright © 2020 xrv + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package io.github.ehlxr.did.serializer; + +import io.github.ehlxr.did.common.Try; +import io.protostuff.LinkedBuffer; +import io.protostuff.ProtostuffIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author ehlxr + * @since 2021-02-09 11:07. + */ +public class ProtobufSerializer implements Serializer { + private static final Map, Schema> CACHED_SCHEMA = new ConcurrentHashMap<>(); + + @Override + public byte[] serializer(T obj) { + LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); + + return Try.of(b -> { + //noinspection unchecked + Class clazz = (Class) obj.getClass(); + //noinspection unchecked + Schema schema = (Schema) CACHED_SCHEMA.get(clazz); + if (schema == null) { + schema = RuntimeSchema.getSchema(clazz); + CACHED_SCHEMA.put(clazz, schema); + } + + return ProtostuffIOUtil.toByteArray(obj, schema, buffer); + }).apply(buffer).trap(e -> { + throw new IllegalStateException(e.getMessage(), e); + }).andFinally(b -> { + ((LinkedBuffer) b).clear(); + }).get(); + } + + @Override + public T deserializer(byte[] bytes, Class clazz) { + return Try.of(bs -> { + Schema schema = RuntimeSchema.getSchema(clazz); + T message = schema.newMessage(); + + ProtostuffIOUtil.mergeFrom(bs, message, schema); + return message; + }).trap(e -> { + throw new IllegalStateException(e.getMessage(), e); + }).apply(bytes).get(); + } +} diff --git a/did-core/src/main/java/io/github/ehlxr/did/serializer/Serializer.java b/did-core/src/main/java/io/github/ehlxr/did/serializer/Serializer.java new file mode 100644 index 0000000..08100ac --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/serializer/Serializer.java @@ -0,0 +1,27 @@ +package io.github.ehlxr.did.serializer; + +import io.github.ehlxr.did.extension.SPI; + +/** + * @author ehlxr + * @since 2021-02-08 22:12. + */ +@SPI("jdk") +public interface Serializer { + /** + * 将 obj 序列化成 byte 数组 + * + * @param obj 要序列化对象 + * @return byte 数组 + */ + byte[] serializer(T obj); + + /** + * 将 byte 数组反序列化成 class 是 clazz 的 obj 对象 + * + * @param bytes byte 数组 + * @param clazz obj 对象 class + * @return obj 对象 + */ + T deserializer(byte[] bytes, Class clazz); +} \ No newline at end of file diff --git a/did-core/src/main/java/io/github/ehlxr/did/serializer/SerializerHolder.java b/did-core/src/main/java/io/github/ehlxr/did/serializer/SerializerHolder.java new file mode 100644 index 0000000..c9c9bc7 --- /dev/null +++ b/did-core/src/main/java/io/github/ehlxr/did/serializer/SerializerHolder.java @@ -0,0 +1,25 @@ +package io.github.ehlxr.did.serializer; + +import io.github.ehlxr.did.extension.ExtensionLoader; + +/** + * @author ehlxr + * @since 2021-02-08 22:12. + */ +public final class SerializerHolder { + volatile private static Serializer serializer = null; + + private SerializerHolder() { + } + + public static Serializer get() { + if (serializer == null) { + synchronized (SerializerHolder.class) { + if (serializer == null) { + serializer = ExtensionLoader.getExtensionLoader(Serializer.class).getDefaultExtension(); + } + } + } + return serializer; + } +} diff --git a/did-core/src/main/resources/META-INF/extensions/internal/io.github.ehlxr.did.serializer.Serializer b/did-core/src/main/resources/META-INF/extensions/internal/io.github.ehlxr.did.serializer.Serializer new file mode 100644 index 0000000..a389703 --- /dev/null +++ b/did-core/src/main/resources/META-INF/extensions/internal/io.github.ehlxr.did.serializer.Serializer @@ -0,0 +1,2 @@ +jdk=io.github.ehlxr.did.serializer.JdkSerializer +protobuf=io.github.ehlxr.did.serializer.ProtobufSerializer \ No newline at end of file diff --git a/did-sdk/pom.xml b/did-sdk/pom.xml index 01d5551..9f23fe7 100644 --- a/did-sdk/pom.xml +++ b/did-sdk/pom.xml @@ -15,7 +15,7 @@ io.github.ehlxr - did-common + did-core junit diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java index 9e6df66..f9d77ed 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java @@ -1,7 +1,11 @@ package io.github.ehlxr.did.client; -import io.github.ehlxr.did.common.*; -import io.github.ehlxr.did.netty.MyProtocolBean; +import io.github.ehlxr.did.SdkProto; +import io.github.ehlxr.did.adapter.Message; +import io.github.ehlxr.did.common.Constants; +import io.github.ehlxr.did.common.NettyUtil; +import io.github.ehlxr.did.common.Result; +import io.github.ehlxr.did.common.Try; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -83,27 +87,25 @@ public abstract class AbstractClient implements Client { final Channel channel = channelFuture.channel(); if (channel.isOpen() && channel.isActive()) { final SdkProto sdkProto = new SdkProto(); - final int rqid = sdkProto.getRqid(); + final int rqid = sdkProto.rqid(); return Try.of(() -> { final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, null, null); REPONSE_MAP.put(rqid, responseFuture); logger.debug("write {} to channel", sdkProto); - - byte[] bytes = NettyUtil.toBytes(sdkProto); - MyProtocolBean myProtocolBean = new MyProtocolBean((byte) 0xA, (byte) 0xC, bytes.length, bytes); - channel.writeAndFlush(myProtocolBean).addListener((ChannelFutureListener) channelFuture -> { - if (channelFuture.isSuccess()) { - //发送成功后立即跳出 - return; - } - // 代码执行到此说明发送失败,需要释放资源 - REPONSE_MAP.remove(rqid); - responseFuture.putResponse(null); - responseFuture.setCause(channelFuture.cause()); - logger.error("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel)); - }); + channel.writeAndFlush(Message.newBuilder().type((byte) 0xA).flag((byte) 0xC).content(sdkProto).build()) + .addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + //发送成功后立即跳出 + return; + } + // 代码执行到此说明发送失败,需要释放资源 + REPONSE_MAP.remove(rqid); + responseFuture.putResponse(null); + responseFuture.setCause(channelFuture.cause()); + logger.error("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel)); + }); // 阻塞等待响应 SdkProto proto = responseFuture.waitResponse(timeoutMillis); @@ -129,31 +131,29 @@ public abstract class AbstractClient implements Client { final Channel channel = channelFuture.channel(); if (channel.isOpen() && channel.isActive()) { final SdkProto sdkProto = new SdkProto(); - final int rqid = sdkProto.getRqid(); + final int rqid = sdkProto.rqid(); if (asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) { final ResponseFuture responseFuture = new ResponseFuture(timeoutMillis, invokeCallback, asyncSemaphore); REPONSE_MAP.put(rqid, responseFuture); Try.of(() -> { logger.debug("write {} to channel", sdkProto); + channelFuture.channel().writeAndFlush(Message.newBuilder().type((byte) 0xA).flag((byte) 0xC).content(sdkProto).build()) + .addListener(channelFuture -> { + if (channelFuture.isSuccess()) { + return; + } - byte[] bytes = NettyUtil.toBytes(sdkProto); - MyProtocolBean myProtocolBean = new MyProtocolBean((byte) 0xA, (byte) 0xC, bytes.length, bytes); - channelFuture.channel().writeAndFlush(myProtocolBean).addListener(channelFuture -> { - if (channelFuture.isSuccess()) { - return; - } + // 代码执行到些说明发送失败,需要释放资源 + logger.error("send a request command to channel <{}> failed.", + NettyUtil.parseRemoteAddr(channel), channelFuture.cause()); - // 代码执行到些说明发送失败,需要释放资源 - logger.error("send a request command to channel <{}> failed.", - NettyUtil.parseRemoteAddr(channel), channelFuture.cause()); - - REPONSE_MAP.remove(rqid); - responseFuture.setCause(channelFuture.cause()); - responseFuture.putResponse(null); - responseFuture.executeInvokeCallback(); - responseFuture.release(); - }); + REPONSE_MAP.remove(rqid); + responseFuture.setCause(channelFuture.cause()); + responseFuture.putResponse(null); + responseFuture.executeInvokeCallback(); + responseFuture.release(); + }); }).trap(e -> { responseFuture.release(); logger.error("send a request to channel <{}> Exception", NettyUtil.parseRemoteAddr(channel), e); diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/Client.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/Client.java index dc7e16f..7cdf582 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/Client.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/Client.java @@ -1,8 +1,8 @@ package io.github.ehlxr.did.client; +import io.github.ehlxr.did.SdkProto; import io.github.ehlxr.did.common.Result; -import io.github.ehlxr.did.common.SdkProto; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/ResponseFuture.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/ResponseFuture.java index 31d77ea..d3569fe 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/ResponseFuture.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/ResponseFuture.java @@ -1,6 +1,6 @@ package io.github.ehlxr.did.client; -import io.github.ehlxr.did.common.SdkProto; +import io.github.ehlxr.did.SdkProto; import io.github.ehlxr.did.common.Try; import java.util.concurrent.CountDownLatch; diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java index 979503d..4987b12 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java @@ -1,10 +1,10 @@ package io.github.ehlxr.did.client; +import io.github.ehlxr.did.adapter.MessageDecoder; +import io.github.ehlxr.did.adapter.MessageEncoder; import io.github.ehlxr.did.client.handler.SdkClientHandler; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Try; -import io.github.ehlxr.did.netty.MyProtocolDecoder; -import io.github.ehlxr.did.netty.MyProtocolEncoder; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -52,11 +52,8 @@ public class SdkClient extends AbstractClient { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline() - // .addLast(new SdkClientDecoder(Constants.DECODER_FRAMELENGTH)) // 如果长度不够会等待 - // .addLast(new SdkClientEncoder()) - .addLast(new MyProtocolEncoder()) - .addLast(new MyProtocolDecoder(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET, - Constants.LENGTH_FIELD_LENGTH, Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false)) + .addLast(new MessageEncoder()) + .addLast(new MessageDecoder()) .addLast(new SdkClientHandler()); } }); diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java deleted file mode 100644 index 450feb1..0000000 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientDecoder.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * The MIT License (MIT) - * - * Copyright © 2020 xrv - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ - -package io.github.ehlxr.did.client.handler; - -import io.github.ehlxr.did.common.NettyUtil; -import io.github.ehlxr.did.common.Try; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.FixedLengthFrameDecoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ehlxr - * @since 2021-01-20 14:42. - */ -public class SdkClientDecoder extends FixedLengthFrameDecoder { - private final Logger logger = LoggerFactory.getLogger(SdkClientDecoder.class); - - public SdkClientDecoder(int frameLength) { - super(frameLength); - } - - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { - return Try.of(() -> { - ByteBuf buf = (ByteBuf) super.decode(ctx, in); - - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - - buf.release(); - return NettyUtil.toObject(bytes); - }).trap(e -> logger.error("decode error", e)).get(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - Channel channel = ctx.channel(); - logger.error("SdkClientDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); - NettyUtil.closeChannel(channel); - } -} \ No newline at end of file diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java index d136024..b3306b8 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/handler/SdkClientHandler.java @@ -24,12 +24,12 @@ package io.github.ehlxr.did.client.handler; +import io.github.ehlxr.did.SdkProto; +import io.github.ehlxr.did.adapter.Message; import io.github.ehlxr.did.client.Client; import io.github.ehlxr.did.client.ResponseFuture; import io.github.ehlxr.did.common.NettyUtil; -import io.github.ehlxr.did.common.SdkProto; import io.github.ehlxr.did.common.Try; -import io.github.ehlxr.did.netty.MyProtocolBean; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.slf4j.Logger; @@ -39,17 +39,16 @@ import org.slf4j.LoggerFactory; * @author ehlxr * @since 2021-01-20 14:43. */ -public class SdkClientHandler extends SimpleChannelInboundHandler { +public class SdkClientHandler extends SimpleChannelInboundHandler> { private final Logger logger = LoggerFactory.getLogger(SdkClientHandler.class); @Override - protected void channelRead0(ChannelHandlerContext ctx, MyProtocolBean protocolBean) { - logger.debug("sdk client handler receive protocolBean {}", protocolBean); + protected void channelRead0(ChannelHandlerContext ctx, Message message) { + logger.debug("sdk client handler receive message {}", message); - SdkProto sdkProto = Try.of(p -> - (SdkProto) NettyUtil.toObject(p.getContent())) - .apply(protocolBean) - .get(SdkProto.newBuilder().build()); + SdkProto sdkProto = Try., SdkProto>of(m -> m.content(SdkProto.class)) + .apply(message) + .get(); final int rqid = sdkProto.getRqid(); final ResponseFuture responseFuture = Client.REPONSE_MAP.get(rqid); @@ -67,7 +66,7 @@ public class SdkClientHandler extends SimpleChannelInboundHandler - Try.of(() -> client.invokeAsync(responseFuture -> { - System.out.println(responseFuture.getSdkProto()); - countDownLatch.countDown(); - })).trap(Throwable::printStackTrace).run()); - - //noinspection ResultOfMethodCallIgnored - countDownLatch.await(10, TimeUnit.SECONDS); - System.out.println("invokeAsync test finish"); + // final CountDownLatch countDownLatch = new CountDownLatch(NUM); + // IntStream.range(0, NUM).parallel().forEach(i -> + // Try.of(() -> client.invokeAsync(responseFuture -> { + // System.out.println(responseFuture.getSdkProto()); + // countDownLatch.countDown(); + // })).trap(Throwable::printStackTrace).run()); + // + // //noinspection ResultOfMethodCallIgnored + // countDownLatch.await(10, TimeUnit.SECONDS); + // System.out.println("invokeAsync test finish"); } @Test diff --git a/did-server/pom.xml b/did-server/pom.xml index 0d005d3..112daee 100644 --- a/did-server/pom.xml +++ b/did-server/pom.xml @@ -16,7 +16,7 @@ io.github.ehlxr - did-common + did-core diff --git a/did-server/src/main/java/io/github/ehlxr/did/ServerStarter.java b/did-server/src/main/java/io/github/ehlxr/did/ServerStarter.java index 0f33b96..676828a 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/ServerStarter.java +++ b/did-server/src/main/java/io/github/ehlxr/did/ServerStarter.java @@ -1,7 +1,6 @@ package io.github.ehlxr.did; import io.github.ehlxr.did.common.Constants; -import io.github.ehlxr.did.core.SnowFlake; import io.github.ehlxr.did.server.Server; import io.github.ehlxr.did.server.http.HttpServer; import io.github.ehlxr.did.server.sdk.SdkServer; diff --git a/did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java b/did-server/src/main/java/io/github/ehlxr/did/SnowFlake.java similarity index 99% rename from did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java rename to did-server/src/main/java/io/github/ehlxr/did/SnowFlake.java index 44efdba..08f05b5 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java +++ b/did-server/src/main/java/io/github/ehlxr/did/SnowFlake.java @@ -1,4 +1,4 @@ -package io.github.ehlxr.did.core; +package io.github.ehlxr.did; /** * twitter 的 snowflake 算法 -- java 实现 diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/BaseServer.java b/did-server/src/main/java/io/github/ehlxr/did/server/BaseServer.java index 4953f9f..5c64a40 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/BaseServer.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/BaseServer.java @@ -1,7 +1,7 @@ package io.github.ehlxr.did.server; +import io.github.ehlxr.did.SnowFlake; import io.github.ehlxr.did.common.Try; -import io.github.ehlxr.did.core.SnowFlake; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServer.java b/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServer.java index c4d1299..669bb2a 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServer.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServer.java @@ -1,8 +1,8 @@ package io.github.ehlxr.did.server.http; +import io.github.ehlxr.did.SnowFlake; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Try; -import io.github.ehlxr.did.core.SnowFlake; import io.github.ehlxr.did.server.BaseServer; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServerHandler.java b/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServerHandler.java index e798a94..6c458ba 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServerHandler.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/http/HttpServerHandler.java @@ -1,10 +1,10 @@ package io.github.ehlxr.did.server.http; +import io.github.ehlxr.did.SdkProto; +import io.github.ehlxr.did.SnowFlake; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.NettyUtil; import io.github.ehlxr.did.common.Result; -import io.github.ehlxr.did.common.SdkProto; -import io.github.ehlxr.did.core.SnowFlake; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java index 03461d5..65d532f 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServer.java @@ -1,10 +1,10 @@ package io.github.ehlxr.did.server.sdk; +import io.github.ehlxr.did.SnowFlake; +import io.github.ehlxr.did.adapter.MessageDecoder; +import io.github.ehlxr.did.adapter.MessageEncoder; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Try; -import io.github.ehlxr.did.core.SnowFlake; -import io.github.ehlxr.did.netty.MyProtocolDecoder; -import io.github.ehlxr.did.netty.MyProtocolEncoder; import io.github.ehlxr.did.server.BaseServer; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; @@ -51,11 +51,8 @@ public class SdkServer extends BaseServer { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(defLoopGroup, - // new SdkServerDecoder(Constants.DECODER_FRAMELENGTH),// 如果长度不够会等待 - // new SdkServerEncoder(), - new MyProtocolEncoder(), - new MyProtocolDecoder(Constants.MAX_FRAME_LENGTH, Constants.LENGTH_FIELD_OFFSET, - Constants.LENGTH_FIELD_LENGTH, Constants.LENGTH_ADJUSTMENT, Constants.INITIAL_BYTES_TO_STRIP, false), + new MessageEncoder(), + new MessageDecoder(), new SdkServerHandler(snowFlake) ); } diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java deleted file mode 100644 index 684a98a..0000000 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.github.ehlxr.did.server.sdk; - -import io.github.ehlxr.did.common.NettyUtil; -import io.github.ehlxr.did.common.Try; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.FixedLengthFrameDecoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ehlxr - */ -public class SdkServerDecoder extends FixedLengthFrameDecoder { - private static final Logger logger = LoggerFactory.getLogger(SdkServerDecoder.class); - - SdkServerDecoder(int frameLength) { - super(frameLength); - } - - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { - return Try.of(() -> { - ByteBuf buf = (ByteBuf) super.decode(ctx, in); - - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - - buf.release(); - return NettyUtil.toObject(bytes); - }).trap(e -> logger.error("decode error", e)).get(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - Channel channel = ctx.channel(); - logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); - NettyUtil.closeChannel(channel); - } -} diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java deleted file mode 100644 index 7bddafb..0000000 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerEncoder.java +++ /dev/null @@ -1,33 +0,0 @@ -package io.github.ehlxr.did.server.sdk; - -import io.github.ehlxr.did.common.NettyUtil; -import io.github.ehlxr.did.common.SdkProto; -import io.github.ehlxr.did.common.Try; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ehlxr - */ -public class SdkServerEncoder extends MessageToByteEncoder { - private static final Logger logger = LoggerFactory.getLogger(SdkServerEncoder.class); - - - @Override - protected void encode(ChannelHandlerContext channelHandlerContext, SdkProto sdkProto, ByteBuf out) { - Try.of(() -> { - out.writeBytes(NettyUtil.toBytes(sdkProto)); - }).trap(e -> logger.error("encode error", e)).run(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - Channel channel = ctx.channel(); - logger.error("SdkServerEncoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); - NettyUtil.closeChannel(channel); - } -} diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java index a5750be..7b00f08 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java @@ -1,10 +1,10 @@ package io.github.ehlxr.did.server.sdk; +import io.github.ehlxr.did.SdkProto; +import io.github.ehlxr.did.SnowFlake; +import io.github.ehlxr.did.adapter.Message; import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.NettyUtil; -import io.github.ehlxr.did.common.SdkProto; -import io.github.ehlxr.did.core.SnowFlake; -import io.github.ehlxr.did.netty.MyProtocolBean; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; * * @author ehlxr */ -public class SdkServerHandler extends SimpleChannelInboundHandler { +public class SdkServerHandler extends SimpleChannelInboundHandler> { private static final Logger logger = LoggerFactory.getLogger(SdkServerHandler.class); /** * 通过信号量来控制流量 @@ -33,23 +33,23 @@ public class SdkServerHandler extends SimpleChannelInboundHandler message) throws Exception { + logger.debug("sdk server handler receive message {}", message); if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { - SdkProto sdkProto = (SdkProto) NettyUtil.toObject(protocolBean.getContent()); + SdkProto sdkProto = message.content(SdkProto.class); sdkProto.setDid(snowFlake.nextId()); - protocolBean.setContent(NettyUtil.toBytes(sdkProto)); + message.setContent(sdkProto); semaphore.release(); } else { logger.error("tryAcquire timeout after {}ms, {} threads waiting to acquire, {} permits available in this semaphore", Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); } - logger.debug("sdk server handler write protocolBean {} to channel", protocolBean); + logger.debug("sdk server handler write message {} to channel", message); ctx.channel(). - writeAndFlush(protocolBean). + writeAndFlush(message). addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } diff --git a/pom.xml b/pom.xml index 3093e89..aee9a3c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,7 @@ did-server did-sdk did-common + did-core did-parent @@ -28,6 +29,7 @@ 1.1.7 4.13.1 2.10.3 + 1.7.2 @@ -53,6 +55,11 @@ did-common ${did.version} + + io.github.ehlxr + did-core + ${did.version} + com.fasterxml.jackson.core jackson-annotations @@ -64,6 +71,17 @@ jackson-databind ${jackson.version} + + + io.protostuff + protostuff-core + ${protostuff.version} + + + io.protostuff + protostuff-runtime + ${protostuff.version} +