Java-Interview-Advanced/docs/high-availability/hystrix-thread-pool-isolati...

115 lines
5.6 KiB
Markdown
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

## 基于 Hystrix 线程池技术实现资源隔离
上一讲提到,如果从 Nginx 开始缓存都失效了Nginx 会直接通过缓存服务调用商品服务获取最新商品数据(我们基于电商项目做个讨论),有可能出现调用延时而把缓存服务资源耗尽的情况。这里,我们就来说说,怎么通过 Hystrix 线程池技术实现资源隔离。
资源隔离,就是说,你如果要把对某一个依赖服务的所有调用请求,全部隔离在同一份资源池内,不会去用其它资源了,这就叫资源隔离。哪怕对这个依赖服务,比如说商品服务,现在同时发起的调用量已经到了 1000但是线程池内就 10 个线程,最多就只会用这 10 个线程去执行,不会说,对商品服务的请求,因为接口调用延时,将 tomcat 内部所有的线程资源全部耗尽。
Hystrix 进行资源隔离,其实是提供了一个抽象,叫做 command。这也是 Hystrix 最最基本的资源隔离技术。
### 利用 HystrixCommand 获取单条数据
我们通过将调用商品服务的操作封装在 HystrixCommand 中,限定一个 key比如下面的 `GetProductInfoCommandGroup`,在这里我们可以简单认为这是一个线程池,每次调用商品服务,就只会用该线程池中的资源,不会再去用其它线程资源了。
```java
public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {
private Long productId;
public GetProductInfoCommand(Long productId) {
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoCommandGroup"));
this.productId = productId;
}
@Override
protected ProductInfo run() {
String url = "http://localhost:8081/getProductInfo?productId=" + productId;
// 调用商品服务接口
String response = HttpClientUtils.sendGetRequest(url);
return JSONObject.parseObject(response, ProductInfo.class);
}
}
```
我们在缓存服务接口中,根据 productId 创建 command 并执行,获取到商品数据。
```java
@RequestMapping("/getProductInfo")
@ResponseBody
public String getProductInfo(Long productId) {
HystrixCommand<ProductInfo> getProductInfoCommand = new GetProductInfoCommand(productId);
// 通过command执行获取最新商品数据
ProductInfo productInfo = getProductInfoCommand.execute();
System.out.println(productInfo);
return "success";
}
```
上面执行的是 execute() 方法,其实是同步的。也可以对 command 调用 queue() 方法,它仅仅是将 command 放入线程池的一个等待队列,就立即返回,拿到一个 Future 对象,后面可以继续做其它一些事情,然后过一段时间对 Future 调用 get() 方法获取数据。这是异步的。
### 利用 HystrixObservableCommand 批量获取数据
只要是获取商品数据,全部都绑定到同一个线程池里面去,我们通过 HystrixObservableCommand 的一个线程去执行,而在这个线程里面,批量把多个 productId 的 productInfo 拉回来。
```java
public class GetProductInfosCommand extends HystrixObservableCommand<ProductInfo> {
private String[] productIds;
public GetProductInfosCommand(String[] productIds) {
// 还是绑定在同一个线程池
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
this.productIds = productIds;
}
@Override
protected Observable<ProductInfo> construct() {
return Observable.unsafeCreate((Observable.OnSubscribe<ProductInfo>) subscriber -> {
for (String productId : productIds) {
// 批量获取商品数据
String url = "http://localhost:8081/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
ProductInfo productInfo = JSONObject.parseObject(response, ProductInfo.class);
subscriber.onNext(productInfo);
}
subscriber.onCompleted();
}).subscribeOn(Schedulers.io());
}
}
```
在缓存服务接口中,根据传来的 id 列表,比如是以 `,` 分隔的 id 串,通过上面的 HystrixObservableCommand执行 Hystrix 的一些 API 方法,获取到所有商品数据。
```java
public String getProductInfos(String productIds) {
String[] productIdArray = productIds.split(",");
HystrixObservableCommand<ProductInfo> getProductInfosCommand = new GetProductInfosCommand(productIdArray);
Observable<ProductInfo> observable = getProductInfosCommand.observe();
observable.subscribe(new Observer<ProductInfo>() {
@Override
public void onCompleted() {
System.out.println("获取完了所有的商品数据");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
/**
* 获取完一条数据,就回调一次这个方法
* @param productInfo
*/
@Override
public void onNext(ProductInfo productInfo) {
System.out.println(productInfo);
}
});
return "success";
}
```
我们回过头来,看看 Hystrix 线程池技术是如何实现资源隔离的。
![hystrix-thread-pool-isolation](/images/hystrix-thread-pool-isolation.png)
从 Nginx 开始,缓存都失效了,那么 Nginx 通过缓存服务去调用商品服务。缓存服务默认的线程大小是 10 个,最多就只有 10 个线程去调用商品服务的接口。即使商品服务接口故障了,最多就只有 10 个线程会 hang 死在调用商品服务接口的路上,缓存服务的 tomcat 内其它的线程还是可以用来调用其它的服务,干其它的事情。