hystrix总结之请求批量执行
2024-10-09 16:52:23
hystrix可以将同一个命令的多次执行合并到一起执行。
public class HelloWorldCommandCollapser extends HystrixCollapser<List<String>,String,String> {
private String name;
public HelloWorldCommandCollapser(String name){
this.name = name;
}
@Override
public String getRequestArgument() {
return name;
}
@Override
protected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, String>> collapsedRequests) {
return new BatchHystrixCommand(collapsedRequests);
}
@Override
protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> collapsedRequests) {
int i =0;
for(CollapsedRequest collapsedRequest:collapsedRequests){
collapsedRequest.setResponse(batchResponse.get(i));
i++;
}
}
private class BatchHystrixCommand extends HystrixCommand{
private Collection<CollapsedRequest<String, String>> collapsedRequests;
public BatchHystrixCommand(Collection<CollapsedRequest<String, String>> collapsedRequests){
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.collapsedRequests =collapsedRequests;
}
@Override
protected Object run() throws Exception {
List<String> result = new ArrayList<String>();
for(CollapsedRequest collapsedRequest:collapsedRequests){
result.add("helloworld:"+collapsedRequest.getArgument());
}
return result;
}
}
方法调用
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try{
String result1 = new HelloWorldCommandCollapser("one").execute();
String result2 = new HelloWorldCommandCollapser("two").execute();
String result3 = new HelloWorldCommandCollapser("three").execute();
String result4 = new HelloWorldCommandCollapser("four").execute();
String result5 = new HelloWorldCommandCollapser("five").execute();
String result6 = new HelloWorldCommandCollapser("six").execute();
System.out.println(result1);
System.out.println(result2);
System.out.println(result3);
System.out.println(result4);
System.out.println(result5);
System.out.println(result6);
}finally {
context.shutdown();
}
继承HystrixCollapser的命令,命令将会被集合到一起,当数量或时间到达设定的触发点时,统一执行。
getRequestArgument 获取请求参数,命令执行时,实际是将该方法的参数设置到批量执行对象中。
createCommand 批量执行对象通过该方法获得实际执行批量的命令,并返回结果。
mapResponseToRequests 批量执行对象获得执行结果后,将结果与请求进行匹配。
本质原理如下:
当执行继承HystrixCollapser方法时,命令不会被实际执行,会获取getRequestArgument获得执行参数,添加到批量执行的对象中去。
public Observable<ResponseType> toObservable(Scheduler observeOn) {
return Observable.defer(new Func0<Observable<ResponseType>>() {
@Override
public Observable<ResponseType> call() {
...
RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
...
return response;
}
});
}
RequestCollapser是批量执行的对象,它有两种作用域,一个是全局范围,一个是一个请求范围内。全局范围通过今天变量实现,一个请求范围通过HystrixRequestVariableHolder实现。
当向RequestCollapser添加参数时,当参数到达一定数量时,就会执行批量。
public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
...
while (true) {
final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
...final Observable<ResponseType> response;
if (arg != null) {
response = b.offer(arg);
} else {
response = b.offer( (RequestArgumentType) NULL_SENTINEL);
}
//如果到达一定数量,respose返回null
if (response != null) {
return response;
} else {
//执行批量
createNewBatchAndExecutePreviousIfNeeded(b);
}
}
}
RequestCollapser内部有一个定时器,每个一定时间就会批量执行并返回结果。
private class CollapsedTask implements TimerListener {
final Callable<Void> callableWithContextOfParent;
CollapsedTask() {
callableWithContextOfParent = new HystrixContextCallable<Void>(concurrencyStrategy, new Callable<Void>() {
@Override
public Void call() throws Exception {
...
RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
if (currentBatch != null && currentBatch.getSize() > 0) {
createNewBatchAndExecutePreviousIfNeeded(currentBatch);
}
...
} });
}
@Override
public void tick() {
...
callableWithContextOfParent.call();
...
}
@Override
public int getIntervalTimeInMilliseconds() {
return properties.timerDelayInMilliseconds().get();
}
}
批量执行
public void executeBatchIfNotAlreadyStarted() {
...
try {
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
try {
Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);//获取批量执行结果
//批量执行结果映射到执行请求中
commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
...
}).doOnCompleted(new Action0() {
...
}).subscribe(); } catch (Exception e) {
...
}
} } catch (Exception e) {
...
} finally {
batchLock.writeLock().unlock();
}
}
}
最新文章
- canvas :曲线的面积图 加渐变效果
- Java Gson 简要笔记
- 移动web
- 在IBM Bluemix上部署Hyperledger应用
- zencart分类页产品页去掉url中的id号
- 动作之CCActionInstant(立即动作)家族
- .net后台 Silverlight 页面 动态设置 ASPX 页面 控件的Margin值(位置设置)
- Asp.net 获取图片列表并打包下载
- jQuery使用小结
- pycharm .sqlite文件拖动到Database里面为空
- gdb调试用法
- vue中created、mounted、 computed,watch,method 等方法整理
- BZOJ4409 [Usaco2016 Feb]Circular barn 动态规划 斜率优化
- MySQL5.7使用错误解决:ERROR 1045 (28000): Access denied for user &#39;root&#39;@&#39;localhost&#39; (using password: NO)【取消或重设root密码】
- jQuery移除或禁用html元素点击事件常用方法小结
- [potatos][flex][TBC] 语义分析词法分析 flex
- 轻松快速实现MySql数据向SQLServer数据转移
- VS编程,WPF中两个滚动条 ScrollViewer 同步滚动的一种方法
- 【BZOJ5102】[POI2018]Prawnicy 堆
- C# 简单生成双色球代码