hystrix可以将同一个命令的多次执行合并到一起执行。

public class HelloWorldCommandCollapser extends HystrixCollapser<List<String>,String,String> {
private String name;
public HelloWorldCommandCollapser(String name){
this.name = name;
}
@Override
public String getRequestArgument() {
return name;
}
@Override
protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
return new BatchHystrixCommand(collapsedRequests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
int i =0;
for(CollapsedRequest collapsedRequest:collapsedRequests){
collapsedRequest.setResponse(batchResponse.get(i));
i++;
}
}
private class BatchHystrixCommand extends HystrixCommand{
private Collection<CollapsedRequest<String, String>> collapsedRequests;
public BatchHystrixCommand(Collection<CollapsedRequest<String, String>> collapsedRequests){
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.collapsedRequests =collapsedRequests;
}
@Override
protected Object run() throws Exception {
List<String> result = new ArrayList<String>();
for(CollapsedRequest collapsedRequest:collapsedRequests){
result.add("helloworld:"+collapsedRequest.getArgument());
}
return result;
}
}

  方法调用

HystrixRequestContext context = HystrixRequestContext.initializeContext();
try{
String result1 = new HelloWorldCommandCollapser("one").execute();
String result2 = new HelloWorldCommandCollapser("two").execute();
String result3 = new HelloWorldCommandCollapser("three").execute();
String result4 = new HelloWorldCommandCollapser("four").execute();
String result5 = new HelloWorldCommandCollapser("five").execute();
String result6 = new HelloWorldCommandCollapser("six").execute();
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
System.out.println(result5);
System.out.println(result6);
}finally {
context.shutdown();
}

  继承HystrixCollapser的命令,命令将会被集合到一起,当数量或时间到达设定的触发点时,统一执行。

  getRequestArgument 获取请求参数,命令执行时,实际是将该方法的参数设置到批量执行对象中。

  createCommand 批量执行对象通过该方法获得实际执行批量的命令,并返回结果。

  mapResponseToRequests 批量执行对象获得执行结果后,将结果与请求进行匹配。

  本质原理如下:

  当执行继承HystrixCollapser方法时,命令不会被实际执行,会获取getRequestArgument获得执行参数,添加到批量执行的对象中去。

public Observable<ResponseType> toObservable(Scheduler observeOn) {
return Observable.defer(new Func0<Observable<ResponseType>>() {
@Override
public Observable<ResponseType> call() {
...
RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
...
return response;
}
});
}

  RequestCollapser是批量执行的对象,它有两种作用域,一个是全局范围,一个是一个请求范围内。全局范围通过今天变量实现,一个请求范围通过HystrixRequestVariableHolder实现。  

  当向RequestCollapser添加参数时,当参数到达一定数量时,就会执行批量。

public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
...
while (true) {
final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
...final Observable<ResponseType> response;
if (arg != null) {
response = b.offer(arg);
} else {
response = b.offer( (RequestArgumentType) NULL_SENTINEL);
}
//如果到达一定数量,respose返回null
if (response != null) {
return response;
} else {
//执行批量
createNewBatchAndExecutePreviousIfNeeded(b);
}
}
}

  RequestCollapser内部有一个定时器,每个一定时间就会批量执行并返回结果。  

private class CollapsedTask implements TimerListener {
final Callable<Void> callableWithContextOfParent;
CollapsedTask() {
callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
@Override
public Void call() throws Exception {
...
            RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
            if (currentBatch != null && currentBatch.getSize() > 0) {
              createNewBatchAndExecutePreviousIfNeeded(currentBatch);
            }
            ...
} });
}
@Override
public void tick() {
...
        callableWithContextOfParent.call();
       ...
}
@Override
public int getIntervalTimeInMilliseconds() {
return properties.timerDelayInMilliseconds().get();
}
}

  批量执行

public void executeBatchIfNotAlreadyStarted() {
...
try {
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
try {
Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);//获取批量执行结果
              //批量执行结果映射到执行请求中
commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
               ...
}).doOnCompleted(new Action0() {
               ...
}).subscribe(); } catch (Exception e) {
...
}
} } catch (Exception e) {
...
} finally {
batchLock.writeLock().unlock();
}
}
}

最新文章

  1. canvas :曲线的面积图 加渐变效果
  2. Java Gson 简要笔记
  3. 移动web
  4. 在IBM Bluemix上部署Hyperledger应用
  5. zencart分类页产品页去掉url中的id号
  6. 动作之CCActionInstant(立即动作)家族
  7. .net后台 Silverlight 页面 动态设置 ASPX 页面 控件的Margin值(位置设置)
  8. Asp.net 获取图片列表并打包下载
  9. jQuery使用小结
  10. pycharm .sqlite文件拖动到Database里面为空
  11. gdb调试用法
  12. vue中created、mounted、 computed,watch,method 等方法整理
  13. BZOJ4409 [Usaco2016 Feb]Circular barn 动态规划 斜率优化
  14. MySQL5.7使用错误解决:ERROR 1045 (28000): Access denied for user &#39;root&#39;@&#39;localhost&#39; (using password: NO)【取消或重设root密码】
  15. jQuery移除或禁用html元素点击事件常用方法小结
  16. [potatos][flex][TBC] 语义分析词法分析 flex
  17. 轻松快速实现MySql数据向SQLServer数据转移
  18. VS编程,WPF中两个滚动条 ScrollViewer 同步滚动的一种方法
  19. 【BZOJ5102】[POI2018]Prawnicy 堆
  20. C# 简单生成双色球代码

热门文章

  1. Python实现电脑控制,这个库让你可以控制和监控输入设备
  2. Docker 的前世今生
  3. 第2章 Hive安装
  4. Devops与敏捷二者能否结合?
  5. Java数据结构——循环队列
  6. jQuery源码分析系列(一)初识jQuery
  7. Java中解析wav音频文件信息:音频声道数,采样频率,采样位数、声音尺寸
  8. 【Gin-API系列】Gin中间件之异常处理(六)
  9. stf-多设备管理平台搭建
  10. Centos7.6系统下docker的安装