Compare commits

...

3 Commits

Author SHA1 Message Date
32f2d93e55 Merge branch 'master' of github.com:ehlxr/did
All checks were successful
continuous-integration/drone/push Build is passing
2021-02-07 18:22:42 +08:00
eca950d081 Optimized code 2021-02-07 18:22:17 +08:00
c869d9cc20 Optimized code 2021-02-07 18:21:08 +08:00
11 changed files with 312 additions and 38 deletions

View File

@ -22,6 +22,15 @@
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,54 @@
package io.github.ehlxr.did.common;
/**
* 自定义状态码
*
* @author ehlxr
* @since 2020/3/18.
*/
public enum Code {
/**
* 成功
*/
SUCCESSFUL(200, "success"),
/**
* 未知异常
*/
UNKNOWN_EXCEPTION(600, "系统异常,请联系管理员");
private static final Code[] CODES = Code.values();
private final int code;
private final String message;
Code(int code, String message) {
this.code = code;
this.message = message;
}
public static Code code(int code) {
for (Code c : CODES) {
if (code == c.getCode()) {
return c;
}
}
return null;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
@Override
public String toString() {
return "Code{" +
"code=" + code +
", message='" + message + '\'' +
'}';
}
}

View File

@ -0,0 +1,150 @@
package io.github.ehlxr.did.common;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.netty.util.internal.StringUtil;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Objects;
/**
* 统一输出结果集
*
* @author ehlxr
* @since 2020/3/18.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class Result<T> implements Serializable {
private static final long serialVersionUID = -2758720512348727698L;
/**
* 响应编码
*/
private int code;
/**
* 消息如错误消息
*/
private String message;
/**
* 数据内容
*/
private T data;
private Result() {
}
private Result(int code, T data, String message) {
this.code = code;
this.data = data;
this.message = message;
}
public static <T> Result<T> success(T data, String message) {
return new Result<>(Code.SUCCESSFUL.getCode(), data, message);
}
public static <T> Result<T> success(T data) {
return success(data, null);
}
public static <T> Result<T> success() {
return success(null);
}
public static <T> Result<T> of(int c, T d, String m) {
return new Result<>(c, d, m);
}
public static <T> Result<T> of(Code c, T d, String m) {
return new Result<>(c.getCode(), d, m);
}
public static <T> Result<T> fail(Code code, String message) {
return of(code.getCode(), null, message);
}
public static <T> Result<T> fail(String message) {
return fail(Code.UNKNOWN_EXCEPTION.getCode(), message);
}
public static <T> Result<T> fail(int code, String message) {
return of(code, null, message);
}
public static <T> Result<T> fail(Code code) {
return fail(code, code.getMessage());
}
public static <T> Result<T> fail(Throwable e) {
return of(Code.UNKNOWN_EXCEPTION.getCode(), null, String.format("%s: %s", e.getClass().getSimpleName(), e.getMessage()));
}
public String getMessage() {
// return StringUtil.isNullOrEmpty(m) ? c.getMessage() : m;
if (StringUtil.isNullOrEmpty(message)) {
Code code;
try {
code = Code.code(this.code);
} catch (Exception e) {
return message;
}
return Objects.isNull(code) ? "" : code.getMessage();
}
return message;
}
public void setMessage(String message) {
this.message = message;
}
public int getCode() {
// return Objects.nonNull(c) ? c.getCode() : Code.UNKNOWN_EXCEPTION.getCode();
return code;
}
public void setCode(int code) {
this.code = code;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Result<?> result = (Result<?>) o;
return code == result.code &&
Objects.equals(message, result.message) &&
Objects.equals(data, result.data);
}
@Override
public String toString() {
try {
ObjectMapper om = new ObjectMapper();
// 取消时间的转化格式, 默认是时间戳
om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
// 设置时间格式
om.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
om.configure(SerializationFeature.WRITE_SINGLE_ELEM_ARRAYS_UNWRAPPED, false);
return om.writeValueAsString(this);
} catch (JsonProcessingException e) {
return "";
}
}
}

View File

@ -31,8 +31,8 @@ public class SdkProto {
return rqid; return rqid;
} }
public void setRqid(int rqid) { public static SdkProtoBuilder newBuilder() {
this.rqid = rqid; return new SdkProtoBuilder();
} }
public long getDid() { public long getDid() {
@ -50,4 +50,35 @@ public class SdkProto {
", did=" + did + ", did=" + did +
'}'; '}';
} }
public void setRqid(int rqid) {
if (rqid > 0) {
this.rqid = rqid;
}
}
public static final class SdkProtoBuilder {
private int rqid;
private long did;
private SdkProtoBuilder() {
}
public SdkProtoBuilder rqid(int rqid) {
this.rqid = rqid;
return this;
}
public SdkProtoBuilder did(long did) {
this.did = did;
return this;
}
public SdkProto build() {
SdkProto sdkProto = new SdkProto();
sdkProto.setRqid(rqid);
sdkProto.setDid(did);
return sdkProto;
}
}
} }

View File

@ -2,6 +2,7 @@ package io.github.ehlxr.did.client;
import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.NettyUtil; import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto; import io.github.ehlxr.did.common.SdkProto;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -81,7 +82,7 @@ public abstract class AbstractClient implements Client {
} }
@Override @Override
public SdkProto invokeSync(long timeoutMillis) throws Exception { public Result<SdkProto> invokeSync(long timeoutMillis) {
final Channel channel = channelFuture.channel(); final Channel channel = channelFuture.channel();
if (channel.isActive()) { if (channel.isActive()) {
final SdkProto sdkProto = new SdkProto(); final SdkProto sdkProto = new SdkProto();
@ -98,23 +99,23 @@ public abstract class AbstractClient implements Client {
REPONSE_MAP.remove(rqid); REPONSE_MAP.remove(rqid);
responseFuture.putResponse(null); responseFuture.putResponse(null);
responseFuture.setCause(channelFuture.cause()); responseFuture.setCause(channelFuture.cause());
logger.warn("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel)); logger.error("send a request command to channel <{}> failed.", NettyUtil.parseRemoteAddr(channel));
}); });
// 阻塞等待响应 // 阻塞等待响应
SdkProto resultProto = responseFuture.waitResponse(timeoutMillis); SdkProto proto = responseFuture.waitResponse(timeoutMillis);
if (null == resultProto) { if (null == proto) {
throw new Exception(NettyUtil.parseRemoteAddr(channel), responseFuture.getCause()); return Result.fail("get result fail, addr is " + NettyUtil.parseRemoteAddr(channel) + responseFuture.getCause());
} }
return resultProto; return Result.success(proto);
} catch (Exception e) { } catch (Exception e) {
logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e); logger.error("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel), e);
throw new Exception(NettyUtil.parseRemoteAddr(channel), e); return Result.fail("invokeSync fail, addr is " + NettyUtil.parseRemoteAddr(channel) + e.getMessage());
} finally { } finally {
REPONSE_MAP.remove(rqid); REPONSE_MAP.remove(rqid);
} }
} else { } else {
NettyUtil.closeChannel(channel); NettyUtil.closeChannel(channel);
throw new Exception(NettyUtil.parseRemoteAddr(channel)); return Result.fail(NettyUtil.parseRemoteAddr(channel));
} }
} }
@ -166,11 +167,11 @@ public abstract class AbstractClient implements Client {
} }
} }
public long invoke() throws Exception { public Result<SdkProto> invoke() {
return invoke(timeoutMillis); return invoke(timeoutMillis);
} }
public SdkProto invokeSync() throws Exception { public Result<SdkProto> invokeSync() {
return invokeSync(timeoutMillis); return invokeSync(timeoutMillis);
} }

View File

@ -1,6 +1,7 @@
package io.github.ehlxr.did.client; package io.github.ehlxr.did.client;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto; import io.github.ehlxr.did.common.SdkProto;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -29,7 +30,7 @@ public interface Client {
* @return {@link SdkProto} * @return {@link SdkProto}
* @throws Exception 调用异常 * @throws Exception 调用异常
*/ */
SdkProto invokeSync(long timeoutMillis) throws Exception; Result<SdkProto> invokeSync(long timeoutMillis);
/** /**
* 异步调用 * 异步调用
@ -47,7 +48,7 @@ public interface Client {
* @return id * @return id
* @throws Exception 调用异常 * @throws Exception 调用异常
*/ */
default long invoke(long timeoutMillis) throws Exception { default Result<SdkProto> invoke(long timeoutMillis) {
return invokeSync(timeoutMillis).getDid(); return invokeSync(timeoutMillis);
} }
} }

View File

@ -1,6 +1,7 @@
package io.github.ehlxr.did; package io.github.ehlxr.did;
import io.github.ehlxr.did.client.SdkClient; import io.github.ehlxr.did.client.SdkClient;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto; import io.github.ehlxr.did.common.SdkProto;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -32,18 +33,17 @@ public class DidSdkTest {
public void didSdkTest() throws Exception { public void didSdkTest() throws Exception {
// 测试同步请求关注rqid是否对应 // 测试同步请求关注rqid是否对应
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
SdkProto resultProto = client.invokeSync(); Result<SdkProto> resultProto = client.invokeSync();
System.out.println(i + " resultProto: " + resultProto); System.out.println(resultProto);
} }
System.out.println("invokeync test finish"); System.out.println("invokeync test finish");
// 测试异步请求关注rqid是否对应 // 测试异步请求关注rqid是否对应
final CountDownLatch countDownLatch = new CountDownLatch(NUM); final CountDownLatch countDownLatch = new CountDownLatch(NUM);
for (int i = 0; i < NUM; i++) { for (int i = 0; i < NUM; i++) {
final int finalI = i;
client.invokeAsync(responseFuture -> { client.invokeAsync(responseFuture -> {
countDownLatch.countDown(); countDownLatch.countDown();
System.out.println(finalI + " resultProto: " + responseFuture.getSdkProto()); System.out.println(responseFuture.getSdkProto());
}); });
} }
countDownLatch.await(10, TimeUnit.SECONDS); countDownLatch.await(10, TimeUnit.SECONDS);
@ -51,7 +51,7 @@ public class DidSdkTest {
} }
@Test @Test
public void testInvoke() throws Exception { public void testInvoke() {
System.out.println(client.invoke()); System.out.println(client.invoke());
client.setTimeoutMillis(3000); client.setTimeoutMillis(3000);

View File

@ -37,7 +37,6 @@ public class HttpServer extends BaseServer {
return new HttpServerBuilder(); return new HttpServerBuilder();
} }
@Override @Override
public void shutdown() { public void shutdown() {
logger.info("HttpServer shutdowning......"); logger.info("HttpServer shutdowning......");
@ -76,10 +75,6 @@ public class HttpServer extends BaseServer {
private HttpServerBuilder() { private HttpServerBuilder() {
} }
public static HttpServerBuilder aHttpServer() {
return new HttpServerBuilder();
}
public HttpServerBuilder snowFlake(SnowFlake snowFlake) { public HttpServerBuilder snowFlake(SnowFlake snowFlake) {
this.snowFlake = snowFlake; this.snowFlake = snowFlake;
return this; return this;

View File

@ -2,6 +2,8 @@ package io.github.ehlxr.did.server.http;
import io.github.ehlxr.did.common.Constants; import io.github.ehlxr.did.common.Constants;
import io.github.ehlxr.did.common.NettyUtil; import io.github.ehlxr.did.common.NettyUtil;
import io.github.ehlxr.did.common.Result;
import io.github.ehlxr.did.common.SdkProto;
import io.github.ehlxr.did.core.SnowFlake; import io.github.ehlxr.did.core.SnowFlake;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -32,22 +34,45 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
Result<?> result;
HttpResponseStatus status;
if (!"/did".equals(request.uri())) {
result = Result.fail(HttpResponseStatus.NOT_FOUND.code(), HttpResponseStatus.NOT_FOUND.reasonPhrase());
response.setStatus(HttpResponseStatus.NOT_FOUND)
.content().writeBytes(result.toString().getBytes());
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
return;
}
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
try { try {
long id = snowFlake.nextId(); long id = snowFlake.nextId();
logger.info("HttpServerHandler id is: {}", id); logger.info("HttpServerHandler id is: {}", id);
response.content().writeBytes(("" + id).getBytes());
status = HttpResponseStatus.OK;
result = Result.success(SdkProto.newBuilder().did(id).build());
} catch (Exception e) { } catch (Exception e) {
semaphore.release(); semaphore.release();
logger.error("HttpServerHandler error", e); logger.error("HttpServerHandler error", e);
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
result = Result.fail(status.code(), status.reasonPhrase());
} }
} else { } else {
String info = String.format("HttpServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d", String info = String.format("HttpServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d",
Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits());
logger.warn(info); logger.warn(info);
throw new Exception(info);
status = HttpResponseStatus.SERVICE_UNAVAILABLE;
result = Result.fail(status.code(), info);
} }
response.setStatus(status)
.content().writeBytes(result.toString().getBytes());
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
} }

View File

@ -34,22 +34,18 @@ public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception {
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) { if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
try { sdkProto.setDid(snowFlake.nextId());
sdkProto.setDid(snowFlake.nextId());
ctx.channel().writeAndFlush(sdkProto).addListener((ChannelFutureListener) channelFuture -> semaphore.release()); semaphore.release();
} catch (Exception e) {
semaphore.release();
logger.error("SdkServerhandler error", e);
}
} else { } else {
sdkProto.setDid(-1);
ctx.channel().writeAndFlush(sdkProto);
String info = String.format("SdkServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d", String info = String.format("SdkServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d",
Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits()); Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits());
logger.warn(info); logger.error(info);
throw new Exception(info);
} }
ctx.channel().
writeAndFlush(sdkProto).
addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
@Override @Override

12
pom.xml
View File

@ -27,6 +27,7 @@
<netty.version>4.1.58.Final</netty.version> <netty.version>4.1.58.Final</netty.version>
<logback.version>1.1.7</logback.version> <logback.version>1.1.7</logback.version>
<junit.version>4.13.1</junit.version> <junit.version>4.13.1</junit.version>
<jackson.version>2.10.3</jackson.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -52,6 +53,17 @@
<artifactId>did-common</artifactId> <artifactId>did-common</artifactId>
<version>${did.version}</version> <version>${did.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>