滑动窗口技术是Sentinel比较关键的核心技术,主要用于数据统计

通过分析StatisticSlot来慢慢引出这个概念
 @Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args); // Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
......
}
......
从代码中可以看出,在其他Slot通过后,会调用node进行计数,我们来看node.addPassRequest(count);, 由于我们使用的是FlowQpsDemo
    @Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
直接看this.clusterNode.addPassRequest(count); 因为clusterNode是default模式下主要用得统计数据node,而它继承于StatisticsNode,于是调用的是
//StatisticsNode.java
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); @Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
两个成员类似,我们看其中之一rollingCounterInMinute.addPass,最终会调用
// ArrayMetric.java
this.data = new BucketLeapArray(sampleCount, intervalInMs); @Override
public void addPass(int count) {
//核心方法
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
这里的data 是一个非常核心的成员,主要用于判断将数据放到那个窗口(桶)中,wrap.value().addPass(count)负责将数据写入
这个 BucketLeapArray就实现了滑动窗口
当调用data.currentWindow()寻找当前该写入的windows时,最终会调用以下方法,这个是非常关键的方法
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//1. 根据当前的诗句计算数组的Index
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
//2. 由于是相当于是环形数组,需要计算一下窗口开始时间,用于复用窗口时覆盖用
long windowStart = calculateWindowStart(timeMillis); /*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
//如果进到这里,说明刚启动不久,还有桶还没创建,新建一个
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
//进到这里来,说明还在同一个桶的时间区间内(start相同),直接返回
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
// 进到这里来,说明已经在数据中转了一圈,复用的是旧的窗口,需要重置下旧窗口
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
我们的demo是创建了一个有60个桶的BucketLeapArray,就这样一直顺序向后循环使用。

定义桶结构的类为MetricBucket

public class MetricBucket {
private final LongAdder[] counters; ...
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
...
} public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
} }

可以看出Bucket就是用线程安全类型LongAdder来进行技术逻辑(比AtomicLong性能好一些)

最新文章

  1. C语言内存分配方法。
  2. 关于Hadoop的集群环境下虚拟机采用NAT方式连不上网的解决
  3. height和line-height有什么区别?
  4. System.DateUtils 3. IsPM、IsAM 判断是否为上、下午
  5. 【2016.3.30项目技术记录】]VS2010自动生成MFC单文档框架程序的修改:去除属性框,在CViewTree类中添加鼠标单击响应
  6. winfrom 导入Excel表到access数据库(来自小抽奖系统)
  7. tomcat服务
  8. MapReduce扩展:应用程序如何运行于Hadoop Yarn之上
  9. opnet的sink模块学习 分类: opnet 2014-05-18 10:28 161人阅读 评论(0) 收藏
  10. c++-STL:删除子串
  11. PDO连接mysql数据库加载慢
  12. 解决访问swaggerUI接口文档显示basic-error-controler问题
  13. MATLAB 统计不同区间中元素的个数
  14. 转载 javascript中(function($){...})(jQuery)写法是什么意思
  15. 《软件测试自动化之道》读书笔记 之 SQL 存储过程测试
  16. centos7分区建议
  17. Mysql逻辑分层、存储引擎
  18. sql 经典面试题及答案(选课表)
  19. 《区块链100问》第13集:比特币和Q币有哪些不同?
  20. Wireshark数据抓包教程之Wireshark的基础知识

热门文章

  1. Spring IOC 为什么能降低耦合
  2. mysql备份数据库linux
  3. Java TIF、JPG、PNG等图片转换
  4. Tapdata 在线研讨会:DaaS vs 大数据平台,是竞争还是共处?
  5. JDK的下载与安装和环境变量的配置
  6. python虚拟环境(python+conda)
  7. 华为云Stack南向开放框架,帮助生态伙伴高效入云
  8. 4 zookeeper集群和基本命令
  9. 云表平台VS永中office,谁更胜一筹?
  10. hive SQL 初学者题目,实战题目 字符串函数,日期拼接,开窗函数。。。。