直接从Sentinel 源码demo ExceptionRatioCircuitBreakerDemo看起

直接看他的main函数

public static void main(String[] args) throws Exception {
initDegradeRule();
...... final int concurrency = 8;
for (int i = 0; i < concurrency; i++) {
Thread entryThread = new Thread(() -> {
while (true) {
Entry entry = null;
try {
entry = SphU.entry(KEY);
sleep(ThreadLocalRandom.current().nextInt(5, 10));
pass.addAndGet(1); // Error probability is 45%
if (ThreadLocalRandom.current().nextInt(0, 100) > 55) {
// biz code raise an exception.
throw new RuntimeException("oops");
}
} catch (BlockException e) {
block.addAndGet(1);
sleep(ThreadLocalRandom.current().nextInt(5, 10));
} catch (Throwable t) {
bizException.incrementAndGet();
// It's required to record exception here manually.
Tracer.traceEntry(t, entry);
} finally {
total.addAndGet(1);
if (entry != null) {
entry.exit();
}
}
}
});
entryThread.setName("sentinel-simulate-traffic-task-" + i);
entryThread.start();
}
}

从源码中可以看出,比如限流那一套多了一个在catch Throwable的操作 Tracer.traceEntry(t, entry);

看一下其中的源码

    public static void traceEntry(Throwable e, Entry entry) {
if (!shouldTrace(e)) {
return;
}
traceEntryInternal(e, entry);
} private static void traceEntryInternal(/*@NeedToTrace*/ Throwable e, Entry entry) {
if (entry == null) {
return;
} entry.setError(e);
} // Entry.java
public void setError(Throwable error) {
this.error = error;
}

从源码可以看出 这里仅仅是将error赋给entry的成员error。这里是为了后面的finally逻辑做铺垫

异常被触发并执行完catch逻辑后,会执行finally的逻辑,会调用entry.exit();方法。
//CtEntry.java

    protected void exitForContext(Context context, int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
// Null context should exit without clean-up.
if (context instanceof NullContext) {
return;
} if (context.getCurEntry() != this) {
String curEntryNameInContext = context.getCurEntry() == null ? null
: context.getCurEntry().getResourceWrapper().getName();
// Clean previous call stack.
CtEntry e = (CtEntry) context.getCurEntry();
while (e != null) {
e.exit(count, args);
e = (CtEntry) e.parent;
}
String errorMessage = String.format("The order of entry exit can't be paired with the order of entry"
+ ", current entry in context: <%s>, but expected: <%s>", curEntryNameInContext,
resourceWrapper.getName());
throw new ErrorEntryFreeException(errorMessage);
} else {
// Go through the onExit hook of all slots.
if (chain != null) {
//最终会调用到chain的exit
chain.exit(context, resourceWrapper, count, args);
}
// Go through the existing terminate handlers (associated to this invocation).
callExitHandlersAndCleanUp(context); // Restore the call stack.
context.setCurEntry(parent);
if (parent != null) {
((CtEntry) parent).child = null;
}
if (parent == null) {
// Default context (auto entered) will be exited automatically.
if (ContextUtil.isDefaultContext(context)) {
ContextUtil.exit();
}
}
// Clean the reference of context in current entry to avoid duplicate exit.
clearEntryContext();
}
}
}
最终会执行exitForContext,并调用到chain.exit(context, resourceWrapper, count, args);, 于是会调用到DegradeSlotexit
    @Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
} if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
//这里是最关键的一步
circuitBreaker.onRequestComplete(context);
}
} fireExit(context, r, count, args);
}
这里的 circleBreaker 是 ExceptionCircuitBreaker 对象,最终会调用
    @Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
// 这里和上面的entry缓存error就关联上了,发现错误并计数
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);
//核心函数,根据统计,继续状态切换 close -> open (--> open -> half-open)
handleStateChangeWhenThresholdExceeded(error);
}
 private void handleStateChangeWhenThresholdExceeded(Throwable error) {

       ......
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}
通过counter计算出单位统计时间的错误量,判断是否超过锚点,调用transformToOpen -> fromCloseToOpen
    protected boolean fromCloseToOpen(double snapshotValue) {
State prev = State.CLOSED;
if (currentState.compareAndSet(prev, State.OPEN)) {
updateNextRetryTimestamp(); notifyObservers(prev, State.OPEN, snapshotValue);
return true;
}
return false;
}

保证线程安全,通过CAS (compareAndSet) 将状态改为OPEN。

这是本资源对应的熔断器状态变为OPEN,之后流量进来时

//DegradeSlot.java

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
//调用 breaker的tryPass方法
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
//AbstractCircuitBreaker.java
@Override
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
//状态为Open,会进来这里
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}

会先检查,断开时间是否已到,如果到了,会切换到半开状态,然后返回false,不通过并抛出DegradeException

之后的第一个请求进来如果没有抛出异常,则将状态从halfOpen 改为Close,进行放行状态,这个过程在 entry.exit 时调用handleStateChangeWhenThresholdExceeded时完成

最新文章

  1. Win10环境下安装Vmware+Ubuntu14 注意点
  2. Windows Services
  3. jquery的一些用法
  4. java.util.Random深入理解
  5. html字符实体对照表
  6. oracle在imp订单具体解释
  7. javascript的闭包与一次重构的感受
  8. codeforces 757F Team Rocket Rises Again
  9. JAVA 将图片转换为Base64编码
  10. FusionCharts MSBar3D图
  11. [JZOJ5522] 图
  12. Vue项目模板--和--webpack自动化构建工具的---项目打包压缩使用
  13. REST(Representational State Transfer表述性状态转移)
  14. 浅谈servlet与jsp的关系
  15. 通过lua栈了解lua与c的交互
  16. git存储用户名与密码
  17. react native 初识生命周期
  18. OCMock 3 参考
  19. CCCC L2-005. 集合相似度
  20. uni-app - 如何打包

热门文章

  1. 面试突击61:说一下MySQL事务隔离级别?
  2. Java数组的基本操作
  3. Mysql错误:The server time zone value is unrecognized or represents more than one time zone
  4. typescript+webpack构建一个js库
  5. 一文讲明白K8S各核心架构组件
  6. Tapdata Cloud 2.1.5来啦:新增支持Amazon RDS数据库,错误日志查询更便捷,Agent部署细节再优化
  7. 方法的调用和JDK9的JShell简单使用
  8. Solution -「HDU」Professor Ben
  9. python subprocess相关操作
  10. SqlServer获取当前日期的详细写法