From e929cd180f73b2cea3e7cd31d800ba7f19d28406 Mon Sep 17 00:00:00 2001 From: ehlxr Date: Sun, 7 Feb 2021 22:11:59 +0800 Subject: [PATCH] update at 2021-02-07 22:11:59 by ehlxr --- .drone.yml | 10 +- .drone2.yml | 10 +- did-common/pom.xml | 2 +- .../io/github/ehlxr/did/common/Constants.java | 15 +- .../io/github/ehlxr/did/common/JsonUtils.java | 159 ++++++++++++++++++ .../io/github/ehlxr/did/common/Result.java | 18 +- .../io/github/ehlxr/did/common/SdkProto.java | 21 +-- did-sdk/pom.xml | 2 +- .../ehlxr/did/client/AbstractClient.java | 35 ++-- .../io/github/ehlxr/did/client/SdkClient.java | 11 +- .../java/io/github/ehlxr/did/DidSdkTest.java | 4 +- did-server/pom.xml | 2 +- .../io/github/ehlxr/did/core/SnowFlake.java | 28 +-- .../did/server/sdk/SdkServerDecoder.java | 4 +- .../did/server/sdk/SdkServerHandler.java | 3 +- pom.xml | 2 +- 16 files changed, 230 insertions(+), 96 deletions(-) create mode 100644 did-common/src/main/java/io/github/ehlxr/did/common/JsonUtils.java diff --git a/.drone.yml b/.drone.yml index 5fa7a09..4bfb491 100644 --- a/.drone.yml +++ b/.drone.yml @@ -12,9 +12,9 @@ steps: - name: m2_cache # The Volume's name path: /root/.m2 # The path in the container commands: - - mvn clean install -DskipTests -e -U - - cp ./did-server/target/did-server*.jar ./docker - - echo -n "$(date -d @${DRONE_BUILD_CREATED} '+%Y%m%d_%H%M%S')_${DRONE_BUILD_NUMBER}, $(grep '.*' pom.xml | head -1 | awk -F '[>,<]' '{print $3}'), latest" > .tags + - mvn clean install -DskipTests -e -U + - cp ./did-server/target/did-server*.jar ./docker + - echo -n "$(date -d @${DRONE_BUILD_CREATED} '+%Y%m%d_%H%M%S')_${DRONE_BUILD_NUMBER}, $(grep '.*' pom.xml | head -1 | awk -F '[>,<]' '{print $3}'), latest" > .tags - name: docker image: plugins/docker @@ -50,8 +50,8 @@ steps: type: markdown when: status: - - failure - - success + - failure + - success volumes: - name: m2_cache # The name use in this pipeline, diff --git a/.drone2.yml b/.drone2.yml index b4939ba..f75e87c 100644 --- a/.drone2.yml +++ b/.drone2.yml @@ -9,9 +9,9 @@ steps: - name: build image: maven commands: - - mvn clean install -DskipTests -e -U - - cp ./did-server/target/did-server*.jar ./docker - - echo -n "$(date -d @${DRONE_BUILD_CREATED} '+%Y%m%d_%H%M%S')_${DRONE_BUILD_NUMBER}, $(grep '.*' pom.xml | head -1 | awk -F '[>,<]' '{print $3}'), latest" > .tags + - mvn clean install -DskipTests -e -U + - cp ./did-server/target/did-server*.jar ./docker + - echo -n "$(date -d @${DRONE_BUILD_CREATED} '+%Y%m%d_%H%M%S')_${DRONE_BUILD_NUMBER}, $(grep '.*' pom.xml | head -1 | awk -F '[>,<]' '{print $3}'), latest" > .tags - name: docker image: plugins/docker @@ -47,5 +47,5 @@ steps: type: markdown when: status: - - failure - - success \ No newline at end of file + - failure + - success \ No newline at end of file diff --git a/did-common/pom.xml b/did-common/pom.xml index 85c8b55..b76a4c4 100644 --- a/did-common/pom.xml +++ b/did-common/pom.xml @@ -1,6 +1,6 @@ - did-parent diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java index 4e38063..1cb14a1 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/Constants.java @@ -7,18 +7,12 @@ import java.util.Map; */ public class Constants { private static final Map SYS_ENV = System.getenv(); - - public static String getEnv(String key) { - return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); - } - public static String SERVER_HOST = "localhost"; /** * HTTP 协议和 SDK 协议服务器默认端口 */ public static int HTTP_PORT = 16830; public static int SDK_PORT = 16831; - /** * 数据中心默认标识 ID,取值范围:0~31 * 机器或进程默认标识 ID,取值范围:0~31 @@ -27,27 +21,26 @@ public class Constants { */ public static long DATACENTER_ID = 1; public static long MACHINES_ID = 1; - /** * Server 流量控制,表示每秒处理的并发数 */ public static int HANDLE_HTTP_TPS = 10000; public static int HANDLE_SDK_TPS = 50000; - /** * sdk client 流量控制,表示每秒处理的并发数 */ public static int SDK_CLIENT_ASYNC_TPS = 100000; public static int SDK_CLIENT_ONEWAY_TPS = 100000; - public static int ACQUIRE_TIMEOUTMILLIS = 5000; - /** * sdk client 默认超时时间 */ public static int SDK_CLIENT_TIMEOUTMILLIS = 2000; - private Constants() { } + + public static String getEnv(String key) { + return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key); + } } diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/JsonUtils.java b/did-common/src/main/java/io/github/ehlxr/did/common/JsonUtils.java new file mode 100644 index 0000000..45f3841 --- /dev/null +++ b/did-common/src/main/java/io/github/ehlxr/did/common/JsonUtils.java @@ -0,0 +1,159 @@ +package io.github.ehlxr.did.common; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.databind.node.MissingNode; +import io.netty.util.internal.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.SimpleDateFormat; + +/** + * JSON 处理类 + * + * @author ehlxr + * @since 2020/5/6. + */ +@SuppressWarnings({"unused", "unchecked"}) +public class JsonUtils { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class); + + static { + // 对象的所有字段全部列入 + OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.ALWAYS); + // 取消默认转换 timestamps 形式 + OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + // 忽略空 bean 转 JSON 的错误 + OBJECT_MAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + // 所有的日期格式都统一为以下的样式:yyyy-MM-dd HH:mm:ss + OBJECT_MAPPER.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); + // 忽略在 JSON 字符串中存在,但是在 java 对象中不存在对应属性的情况 + OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public static ObjectMapper om() { + return OBJECT_MAPPER; + } + + /** + * 对象转为 JsonNode 实例 + * + * @param obj 要转换的对象 + * @param 要转换的对象类型 + * @return {@link JsonNode}实例 + */ + public static JsonNode obj2JsonNode(T obj) { + try { + return OBJECT_MAPPER.readTree(obj2String(obj)); + } catch (JsonProcessingException e) { + logger.error("", e); + return MissingNode.getInstance(); + } + } + + /** + * 对象转为 JSON 字符串 + * + * @param obj 要转换的对象 + * @param 要转换的对象类型 + * @return JSON 字符串 + */ + public static String obj2String(T obj) { + if (obj == null) { + return ""; + } + try { + return obj instanceof String ? (String) obj : OBJECT_MAPPER.writeValueAsString(obj); + } catch (JsonProcessingException e) { + logger.error("", e); + return ""; + } + } + + /** + * 对象转为格式化的 JSON 字符串 + * + * @param obj 要转换的对象 + * @param 要转换的对象类型 + * @return 格式化的 JSON 字符串 + */ + public static String obj2StringPretty(T obj) { + if (obj == null) { + return ""; + } + try { + return obj instanceof String ? + (String) obj : + OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj); + } catch (JsonProcessingException e) { + logger.error("", e); + return ""; + } + } + + /** + * 字符串转换为自定义对象 + * + * @param str 要转换的字符串 + * @param clazz 自定义对象的 class 对象 + * @param 自定义对象类型 + * @return 自定义对象 + */ + public static T string2Obj(String str, Class clazz) { + if (StringUtil.isNullOrEmpty(str) || clazz == null) { + throw new RuntimeException("json string to obj param should not empty"); + } + try { + return clazz.equals(String.class) ? (T) str : OBJECT_MAPPER.readValue(str, clazz); + } catch (JsonProcessingException e) { + logger.error("", e); + return null; + } + } + + /** + * 字符串转换为自定义对象 + * + * @param str 要转换的字符串 + * @param typeReference 集合对象 typeReference + * @param 集合对象类型 + * @return 自定义对象 + */ + public static T string2Obj(String str, TypeReference typeReference) { + if (StringUtil.isNullOrEmpty(str) || typeReference == null) { + throw new RuntimeException("json string to obj param should not empty"); + } + try { + return typeReference.getType().equals(String.class) ? + (T) str : + OBJECT_MAPPER.readValue(str, typeReference); + } catch (JsonProcessingException e) { + logger.error("", e); + return null; + } + } + + /** + * 字符串转换为自定义对象 + * + * @param str 要转换的字符串 + * @param collectionClazz 集合 class + * @param elementClazzes 集合对象 class + * @param 集合对象类型 + * @return 自定义对象 + */ + public static T string2Obj(String str, Class collectionClazz, Class... elementClazzes) { + JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametricType(collectionClazz, elementClazzes); + try { + return OBJECT_MAPPER.readValue(str, javaType); + } catch (JsonProcessingException e) { + logger.error("", e); + return null; + } + } +} + diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/Result.java b/did-common/src/main/java/io/github/ehlxr/did/common/Result.java index 673f749..db527fd 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/Result.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/Result.java @@ -1,13 +1,9 @@ package io.github.ehlxr.did.common; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; import io.netty.util.internal.StringUtil; import java.io.Serializable; -import java.text.SimpleDateFormat; import java.util.Objects; /** @@ -16,6 +12,7 @@ import java.util.Objects; * @author ehlxr * @since 2020/3/18. */ +@SuppressWarnings("unused") @JsonInclude(JsonInclude.Include.NON_NULL) public class Result implements Serializable { private static final long serialVersionUID = -2758720512348727698L; @@ -133,18 +130,7 @@ public class Result implements Serializable { @Override public String toString() { - try { - ObjectMapper om = new ObjectMapper(); - // 取消时间的转化格式, 默认是时间戳 - om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); - // 设置时间格式 - om.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); - om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); - om.configure(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED, false); - return om.writeValueAsString(this); - } catch (JsonProcessingException e) { - return ""; - } + return JsonUtils.obj2String(this); } } diff --git a/did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java b/did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java index 3da7da6..d9fb36e 100644 --- a/did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java +++ b/did-common/src/main/java/io/github/ehlxr/did/common/SdkProto.java @@ -27,12 +27,18 @@ public class SdkProto { this.did = did; } + public static SdkProtoBuilder newBuilder() { + return new SdkProtoBuilder(); + } + public int getRqid() { return rqid; } - public static SdkProtoBuilder newBuilder() { - return new SdkProtoBuilder(); + public void setRqid(int rqid) { + if (rqid > 0) { + this.rqid = rqid; + } } public long getDid() { @@ -45,16 +51,7 @@ public class SdkProto { @Override public String toString() { - return "SdkProto{" + - "rqid=" + rqid + - ", did=" + did + - '}'; - } - - public void setRqid(int rqid) { - if (rqid > 0) { - this.rqid = rqid; - } + return JsonUtils.obj2String(this); } public static final class SdkProtoBuilder { diff --git a/did-sdk/pom.xml b/did-sdk/pom.xml index 5d2a5ea..01d5551 100644 --- a/did-sdk/pom.xml +++ b/did-sdk/pom.xml @@ -1,6 +1,6 @@ - did-parent diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java index ec4abea..f05909d 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/AbstractClient.java @@ -84,7 +84,7 @@ public abstract class AbstractClient implements Client { @Override public Result invokeSync(long timeoutMillis) { final Channel channel = channelFuture.channel(); - if (channel.isActive()) { + if (channel.isOpen() && channel.isActive()) { final SdkProto sdkProto = new SdkProto(); final int rqid = sdkProto.getRqid(); try { @@ -115,7 +115,7 @@ public abstract class AbstractClient implements Client { } } else { NettyUtil.closeChannel(channel); - return Result.fail(NettyUtil.parseRemoteAddr(channel)); + return Result.fail("channel " + NettyUtil.parseRemoteAddr(channel) + "is not active!"); } } @@ -133,37 +133,36 @@ public abstract class AbstractClient implements Client { if (channelFuture.isSuccess()) { return; } + // 代码执行到些说明发送失败,需要释放资源 + logger.error("send a request command to channel <{}> failed.", + NettyUtil.parseRemoteAddr(channel), channelFuture.cause()); + REPONSE_MAP.remove(rqid); responseFuture.setCause(channelFuture.cause()); responseFuture.putResponse(null); - try { - responseFuture.executeInvokeCallback(); - } catch (Exception e) { - logger.warn("excute callback in writeAndFlush addListener, and callback throw", e); - } finally { - responseFuture.release(); - } - logger.warn("send a request command to channel <{}> failed.", - NettyUtil.parseRemoteAddr(channel), channelFuture.cause()); + responseFuture.executeInvokeCallback(); + responseFuture.release(); }); } catch (Exception e) { responseFuture.release(); - logger.warn("send a request to channel <{}> Exception", - NettyUtil.parseRemoteAddr(channel), e); - throw new Exception(NettyUtil.parseRemoteAddr(channel), e); + String msg = String.format("send a request to channel <%s> Exception", + NettyUtil.parseRemoteAddr(channel)); + + logger.error(msg, e); + throw new Exception(msg, e); } } else { - String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + + String msg = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.asyncSemaphore.getQueueLength(), this.asyncSemaphore.availablePermits()); - logger.warn(info); - throw new Exception(info); + logger.error(msg); + throw new Exception(msg); } } else { NettyUtil.closeChannel(channel); - throw new Exception(NettyUtil.parseRemoteAddr(channel)); + throw new Exception(String.format("channel %s is not active!", NettyUtil.parseRemoteAddr(channel))); } } diff --git a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java index e879a10..8a36e23 100644 --- a/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java +++ b/did-sdk/src/main/java/io/github/ehlxr/did/client/SdkClient.java @@ -57,11 +57,12 @@ public class SdkClient extends AbstractClient { }); try { - channelFuture = bootstrap.connect(host, port).sync(); - - channelFuture.channel().closeFuture().addListener((ChannelFutureListener) channelFuture -> { - logger.warn("client channel close.", channelFuture.cause()); - }); + channelFuture = bootstrap.connect(host, port) + .sync() + .channel() + .closeFuture() + .addListener((ChannelFutureListener) channelFuture -> + logger.warn("client channel close.", channelFuture.cause())); InetSocketAddress address = (InetSocketAddress) channelFuture.channel().remoteAddress(); logger.info("SdkClient start success, host is {}, port is {}", address.getHostName(), address.getPort()); diff --git a/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java b/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java index 485a94e..6d1b180 100644 --- a/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java +++ b/did-sdk/src/test/java/io/github/ehlxr/did/DidSdkTest.java @@ -31,14 +31,14 @@ public class DidSdkTest { @Test public void didSdkTest() throws Exception { - // 测试同步请求,关注rqid是否对应 + // 测试同步请求 for (int i = 0; i < NUM; i++) { Result resultProto = client.invokeSync(); System.out.println(resultProto); } System.out.println("invokeync test finish"); - // 测试异步请求,关注rqid是否对应 + // 测试异步请求 final CountDownLatch countDownLatch = new CountDownLatch(NUM); for (int i = 0; i < NUM; i++) { client.invokeAsync(responseFuture -> { diff --git a/did-server/pom.xml b/did-server/pom.xml index fcbcfd6..0d005d3 100644 --- a/did-server/pom.xml +++ b/did-server/pom.xml @@ -1,6 +1,6 @@ - did-parent diff --git a/did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java b/did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java index c7545f2..44efdba 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java +++ b/did-server/src/main/java/io/github/ehlxr/did/core/SnowFlake.java @@ -91,6 +91,20 @@ public class SnowFlake { this.machineId = machineId; } + public static void main(String[] args) { + SnowFlake snowFlake = new SnowFlake(2, 3); + long start = System.currentTimeMillis(); + for (int i = 0; i < (1 << 18); i++) { + System.out.println(i + ": " + snowFlake.nextId()); + } + long end = System.currentTimeMillis(); + System.out.println(end - start); + } + + public static SnowFlakeBuilder newBuilder() { + return new SnowFlakeBuilder(); + } + /** * 产生下一个ID */ @@ -130,20 +144,6 @@ public class SnowFlake { return System.currentTimeMillis(); } - public static void main(String[] args) { - SnowFlake snowFlake = new SnowFlake(2, 3); - long start = System.currentTimeMillis(); - for (int i = 0; i < (1 << 18); i++) { - System.out.println(i + ": " + snowFlake.nextId()); - } - long end = System.currentTimeMillis(); - System.out.println(end - start); - } - - public static SnowFlakeBuilder newBuilder() { - return new SnowFlakeBuilder(); - } - public static final class SnowFlakeBuilder { private long datacenterId; private long machineId; diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java index 95a589c..11baf8c 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerDecoder.java @@ -31,7 +31,7 @@ public class SdkServerDecoder extends FixedLengthFrameDecoder { } catch (Exception e) { logger.error("decode exception, " + NettyUtil.parseRemoteAddr(ctx.channel()), e); NettyUtil.closeChannel(ctx.channel()); - }finally { + } finally { if (buf != null) { buf.release(); } @@ -42,7 +42,7 @@ public class SdkServerDecoder extends FixedLengthFrameDecoder { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { Channel channel = ctx.channel(); - logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel),cause); + logger.error("SdkServerDecoder channel [{}] error and will be closed", NettyUtil.parseRemoteAddr(channel), cause); NettyUtil.closeChannel(channel); } } diff --git a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java index a87f14c..04e6671 100644 --- a/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java +++ b/did-server/src/main/java/io/github/ehlxr/did/server/sdk/SdkServerHandler.java @@ -38,9 +38,8 @@ public class SdkServerHandler extends SimpleChannelInboundHandler { semaphore.release(); } else { - String info = String.format("SdkServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d", + logger.error("tryAcquire timeout after {}ms, {} threads waiting to acquire, {} permits available in this semaphore", Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); - logger.error(info); } ctx.channel(). diff --git a/pom.xml b/pom.xml index 92d0fc3..3093e89 100644 --- a/pom.xml +++ b/pom.xml @@ -1,6 +1,6 @@ - 4.0.0