HystrixCommand在执行的过程中如何探测超时,本篇主要对此进行介绍说明。

1.主入口:executeCommandAndObserve

#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
···省略部分代码···
Observable<R> execution; //判断是否开启超时监测
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
} return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

可以简单的认为lift 里面的对前面的Observable包含,类似装饰者,后面的parent就是指上层的Observable。其中 HystrixObservableTimeoutOperator 就是关键的部分。

2.关键点: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的实现

TimerListener listener = new TimerListener() {

                @Override
public void tick() { if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 标记事件,可以认为是开的hook,这里暂忽略
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); //取消原Obserable的订阅
s.unsubscribe(); final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { @Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
}
} //获取配置的超时时间配置
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};

这段代码的意思就是,给当前command的超时状态置为超时,如果设置成功就抛出HystrixTimeoutException异常,紧接着被command的 doOnErron接收走 fallback逻辑

fallback
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); ................................. final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
//此处catch到超时异常
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
} return handleFailureViaFallback(e);
}
}
}; ................................. return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

同时s.unsubscribe()通知正在执行的线程,终止任务。如何终止呢?

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的参数就是HystrixContextScheduler, Rxjava里 scheduler具体干活的是 worker,我们先看下Hystrix自定义scheduler的结构示意图

那么我们直奔主题,直接看 ThreadPoolWorker

//ThreadPoolWorker.schedule
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
} ScheduledAction sa = new ScheduledAction(action); subscription.add(sa);
sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor)); return sa;
}

1.开始的时候判断observable是否被订阅
2.被订阅后,将任务 submit到线程池
3.FutureCompleterWithConfigurableInterrupt scheduler在执行的时候,增加了observable的中断探测

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor; private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
} @Override
public void unsubscribe() {
executor.remove(f);
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
} .....省略代码.......
}

当observable 取消订阅时,就会把当前任务移除,并中断任务

到这里只是讲说了超时后的处理,如何认定执行超时呢?

3.匠心之巧

这里有个很巧妙的设计,再探HystrixObservableTimeoutOperator

final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

#com.netflix.hystrix.util.HystrixTimer#addTimerListener
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
// add the listener Runnable r = new Runnable() { @Override
public void run() {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
}; ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
return new TimerReference(listener, f);
}

利用了ScheduledThreadPoolExecutor,延迟执行,延迟时间就是我们设定的超时时间,我们再看下

#HystrixObservableTimeoutOperator
Subscriber<R> parent = new Subscriber<R>() { @Override
public void onCompleted() {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
} @Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
} ..... ..... ..... ..... ..... ..... ..... ..... ..... private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
} };

这里parent就是指上层的obserable,这里可以抽象的认为是我们的HystrixCommand执行线程, 当command执行线程执行完成的时候或异常的时候,会执行 tl.clear(), 也就是Future.cancel()会中断 TimerListener 的ScheduledFuture 线程,迫使超时机制失效。

// tl.clear()
private static class TimerReference extends SoftReference<TimerListener> {
private final ScheduledFuture<?> f;
.... .... .... .... ....
@Override
public void clear() {
super.clear();
// stop this ScheduledFuture from any further executions
f.cancel(false);
}
}

4.回归文字

HystrixCommand里有个 TimedOutStatus 超时状态

现在可以认为有两个线程,一个是hystrixCommand任务执行线程,一个是等着给hystrixCommand判定超时的线程,现在两个线程看谁能先把hystrixCommand的状态置换,只要任何一个线程对hystrixCommand打上标就意味着超时判定结束。

系列文章推荐

作者:青芒v5
链接:https://www.jianshu.com/p/60074fe1bd86
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

最新文章

  1. 如何获取hibernate代理类代理的实际对象实例?
  2. 数据采集器移动手持打印POS终端(PDA)商超抄单方案-PDA抄单无线开单+进销存软件
  3. [CentOS] 打造vim环境
  4. jQuery ui datepicker 日历转中文
  5. boost总结之variant
  6. 使用javah生成.h文件, 出现无法访问android.app,Activity的错误的解决
  7. 正确合理的建立MYSQL数据库索引
  8. C# WinForm 拖动无边框窗体 改变无边框窗体尺寸
  9. Spring 通过来AOP 实现前置,环绕,异常通知,注解(转)
  10. 微信JS分享功能--微信JS系列文章(二)
  11. tomcat The specified JRE installation does not exist
  12. UVALive - 3027 Corporative Network (并查集)
  13. python 简单实现淘宝关键字商品爬取
  14. 以编程方式使用 Microsoft Office Visio 2003 ActiveX 控件
  15. 永久注册Oracle工具PL/SQL
  16. AI 偏微分方程
  17. Kaggle(1):数据挖掘的基本流程
  18. bzoj4020: 未来程序&#183;改
  19. Java 范例 - 线程
  20. paxos协议

热门文章

  1. English--音标拼读
  2. 【转载】Gradle学习 第七章:Java快速入门
  3. Android调用系统相机和相册并解决data为空,OOM,图片角度不对的问题
  4. Elasticsearch高版本安装head插件
  5. nbu虚拟机恢复样例(之后补图)
  6. 【JavaScript】BOM对象——Window对象&amp;History对象&amp;Location 对象
  7. 搭建代理服务器时的笔记,request使用笔记
  8. 开始Golang之旅了
  9. 2019 Nowcoder Multi-University Training Contest 4 E Explorer
  10. windows10下Docker开启nginx服务访问页面没有反应