update at 2021-01-18 16:17:14 by ehlxr
This commit is contained in:
parent
55dd9517b1
commit
1ff60d6808
@ -6,7 +6,7 @@ import java.util.Map;
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class Constants {
|
||||
private static Map<String, String> SYS_ENV = System.getenv();
|
||||
private static final Map<String, String> SYS_ENV = System.getenv();
|
||||
|
||||
public static String getEnv(String key) {
|
||||
return SYS_ENV.get(key) == null ? "" : SYS_ENV.get(key);
|
||||
|
@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkProto {
|
||||
private static AtomicInteger requestId = new AtomicInteger(0);
|
||||
private static final AtomicInteger requestId = new AtomicInteger(0);
|
||||
|
||||
/**
|
||||
* 请求的ID
|
||||
|
@ -20,12 +20,12 @@ public class ServerStarter {
|
||||
long machineId = Constants.MACHINES_ID;
|
||||
|
||||
if (args != null && args.length == 2) {
|
||||
datacenterId = Long.valueOf(args[0]);
|
||||
machineId = Long.valueOf(args[1]);
|
||||
datacenterId = Long.parseLong(args[0]);
|
||||
machineId = Long.parseLong(args[1]);
|
||||
}
|
||||
|
||||
datacenterId = "".equals(Constants.getEnv("DATACENTER_ID")) ? datacenterId : Long.valueOf(Constants.getEnv("DATACENTER_ID"));
|
||||
machineId = "".equals(Constants.getEnv("MACHINES_ID")) ? machineId : Long.valueOf(Constants.getEnv("MACHINES_ID"));
|
||||
datacenterId = "".equals(Constants.getEnv("DATACENTER_ID")) ? datacenterId : Long.parseLong(Constants.getEnv("DATACENTER_ID"));
|
||||
machineId = "".equals(Constants.getEnv("MACHINES_ID")) ? machineId : Long.parseLong(Constants.getEnv("MACHINES_ID"));
|
||||
logger.info("SnowFlake datacenterId is: {}, machineId is: {}", datacenterId, machineId);
|
||||
|
||||
final SnowFlake snowFlake = new SnowFlake(datacenterId, machineId);
|
||||
|
@ -58,11 +58,11 @@ public class SnowFlake {
|
||||
/**
|
||||
* 数据中心
|
||||
*/
|
||||
private long datacenterId;
|
||||
private final long datacenterId;
|
||||
/**
|
||||
* 机器标识
|
||||
*/
|
||||
private long machineId;
|
||||
private final long machineId;
|
||||
/**
|
||||
* 序列号
|
||||
*/
|
||||
|
@ -25,7 +25,7 @@ public abstract class BaseServer implements Server {
|
||||
|
||||
public void init() {
|
||||
defLoopGroup = new DefaultEventLoopGroup(8, new ThreadFactory() {
|
||||
private AtomicInteger index = new AtomicInteger(0);
|
||||
private final AtomicInteger index = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
@ -33,7 +33,7 @@ public abstract class BaseServer implements Server {
|
||||
}
|
||||
});
|
||||
bossGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
|
||||
private AtomicInteger index = new AtomicInteger(0);
|
||||
private final AtomicInteger index = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
@ -41,7 +41,7 @@ public abstract class BaseServer implements Server {
|
||||
}
|
||||
});
|
||||
workGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactory() {
|
||||
private AtomicInteger index = new AtomicInteger(0);
|
||||
private final AtomicInteger index = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
|
@ -21,11 +21,11 @@ import java.net.InetSocketAddress;
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class HttpServer extends BaseServer {
|
||||
private SnowFlake snowFlake;
|
||||
private final SnowFlake snowFlake;
|
||||
|
||||
public HttpServer(SnowFlake snowFlake) {
|
||||
this.snowFlake = snowFlake;
|
||||
this.port = "".equals(Constants.getEnv("HTTP_PORT")) ? Constants.HTTP_PORT : Integer.valueOf(Constants.getEnv("HTTP_PORT"));
|
||||
this.port = "".equals(Constants.getEnv("HTTP_PORT")) ? Constants.HTTP_PORT : Integer.parseInt(Constants.getEnv("HTTP_PORT"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -27,8 +27,8 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
|
||||
/**
|
||||
* 通过信号量来控制流量
|
||||
*/
|
||||
private Semaphore semaphore = new Semaphore(Constants.HANDLE_HTTP_TPS);
|
||||
private SnowFlake snowFlake;
|
||||
private final Semaphore semaphore = new Semaphore(Constants.HANDLE_HTTP_TPS);
|
||||
private final SnowFlake snowFlake;
|
||||
|
||||
public HttpServerHandler(SnowFlake snowFlake) {
|
||||
this.snowFlake = snowFlake;
|
||||
|
@ -14,7 +14,7 @@ import java.net.InetSocketAddress;
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkServer extends BaseServer {
|
||||
private SnowFlake snowFlake;
|
||||
private final SnowFlake snowFlake;
|
||||
|
||||
public SdkServer(SnowFlake snowFlake) {
|
||||
this.snowFlake = snowFlake;
|
||||
|
@ -15,44 +15,44 @@ import java.util.concurrent.TimeUnit;
|
||||
*
|
||||
* @author ehlxr
|
||||
*/
|
||||
public class SdkServerHandler extends SimpleChannelInboundHandler {
|
||||
public class SdkServerHandler extends SimpleChannelInboundHandler<SdkProto> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(SdkServerHandler.class);
|
||||
/**
|
||||
* 通过信号量来控制流量
|
||||
*/
|
||||
private Semaphore semaphore = new Semaphore(Constants.HANDLE_SDKS_TPS);
|
||||
private SnowFlake snowFlake;
|
||||
private final Semaphore semaphore = new Semaphore(Constants.HANDLE_SDKS_TPS);
|
||||
private final SnowFlake snowFlake;
|
||||
|
||||
SdkServerHandler(SnowFlake snowFlake) {
|
||||
this.snowFlake = snowFlake;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
if (msg instanceof SdkProto) {
|
||||
SdkProto sdkProto = (SdkProto) msg;
|
||||
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
|
||||
try {
|
||||
sdkProto.setDid(snowFlake.nextId());
|
||||
ctx.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) {
|
||||
semaphore.release();
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
semaphore.release();
|
||||
logger.error("SdkServerhandler error", e);
|
||||
}
|
||||
} else {
|
||||
sdkProto.setDid(-1);
|
||||
ctx.channel().writeAndFlush(sdkProto);
|
||||
String info = String.format("SdkServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d",
|
||||
Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits());
|
||||
logger.warn(info);
|
||||
throw new Exception(info);
|
||||
protected void channelRead0(ChannelHandlerContext ctx, SdkProto sdkProto) throws Exception {
|
||||
// if (msg instanceof SdkProto) {
|
||||
// SdkProto sdkProto = (SdkProto) msg;
|
||||
if (semaphore.tryAcquire(Constants.ACQUIRE_TIMEOUTMILLIS, TimeUnit.MILLISECONDS)) {
|
||||
try {
|
||||
sdkProto.setDid(snowFlake.nextId());
|
||||
ctx.channel().writeAndFlush(sdkProto).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) {
|
||||
semaphore.release();
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
semaphore.release();
|
||||
logger.error("SdkServerhandler error", e);
|
||||
}
|
||||
} else {
|
||||
sdkProto.setDid(-1);
|
||||
ctx.channel().writeAndFlush(sdkProto);
|
||||
String info = String.format("SdkServerHandler tryAcquire semaphore timeout, %dms, waiting thread " + "nums: %d availablePermit: %d",
|
||||
Constants.ACQUIRE_TIMEOUTMILLIS, this.semaphore.getQueueLength(), this.semaphore.availablePermits());
|
||||
logger.warn(info);
|
||||
throw new Exception(info);
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user