摘自Apache Flink官网

最早的streaming 架构是storm的lambda架构

分为三个layer

  • batch layer
  • serving layer
  • speed layer

一、在streaming中Flink支持的通知时间

Flink官网写了个了解streaming和各种时间的博客

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101#F2

1、Processing time:执行时候的机器系统时间。

  • 如果使用时间窗口的话,如果一个应用在9:15开始,那么第一次的结束时间在10:00. 然后是10:00~11:00, 之后都是整点。就第一个点比较特殊

2、Event time:每一个Event在其设备上产生的时间,是在进入Flink之前的时间。

  • 可以从data里面提取出来
  • Event time的程序必须声明怎么产生Event Time Watermarks。
  • Event time处理会发生延时,因为有可能有的Event没有到达
  • 如果所有的events都到达了,那么event time operations会按照预期的执行

3、Ingestion time:events进入Flink的时间

  • 在source算子,每一个记录得到当前算子的时间,基于时间的操作根据这个时间。
  • 记录时间有点开销,因为是在source上,但是非常可靠。因为如果是processing time的话,有可能机器的local time不一样
  • Ingestion time和event time不一样,这个不能处理过期时间

4、watermark:在Flink中Event time程序衡量执行的是watermarks

  • watermark携带了时间戳
  • watermark在source function之后产生
  • 每一个并行的子任务独立的产生watermarks
  • 可以设置迟到时间,来容忍迟到的watermak

注册watermark的代码:

 public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds private long currentMaxTimestamp; @Override
public long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 将数据中的时间戳字段(long 类型,精确到毫秒)赋给 timestamp 变量,此处是 OrderRecord 的 timestamp 字段
long timestamp = record.timestamp;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
} @Override
public Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}

5、Late Elements:迟到元素。即使在watermark(k)已经产生了之后,仍然有迟到元素

  • 设置很长的延迟时间不太实际
  • 默认上Late Elements是drop掉的
  • Flink支持allowedLateness,在被drop前可以容忍的最大延迟时间
  • 如果设置了allowedLateness,当迟到元素到达的时候,会再计算一遍窗口
  • 也可以设置side output将废弃的数据当成side output

6、idling sources: 在一段时间内,watermark没有到来,窗口内的元素就不执行,这就是idling sources

二、生成TimeStamps / Watermarks

1、指派timestamps

这部分通常在实例中的一些filed内进行accessing/extracting the timestamp。

2、生成timestamps 和 watermark的方法

  • Directly in the data source.
  • 通过watermark 和 timestamp generator

3、在source下生成timestamps和watermark

  • 需要使用collectWithTimestamp方法在SourceContext下面
  • watermark需要使用emitWatermark

如果使用了generator那么source生成的watermark和timestamp会被复写

Java Code:

 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo); DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks()); withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);

三、预定义的TimeStamp Extractor和Watermark Emmiter

1、最简单的watermark generator

如果并行数据是升序的,那么最简单的方法是使用 AscendingTimestampExtractor。即便是kafka消息源,如果每个partition的消息是升序的,那么在shuffle阶段,会把每个partition的watermark正确的进行shuffle。

 DataStream<MyEvent> stream = ...

 DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() { @Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});

2、允许延迟的watermark

可以设定固定的延时时间,延迟=迟到时间戳   -  上一个元素的watermark。如果延迟 > lateness,会被忽略。

 DataStream<MyEvent> stream = ...

 DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) { @Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});

最新文章

  1. 企业管理软件ERP演变之一
  2. osgi学习
  3. ArcMap打开越来越慢
  4. Calibrating delay loop... 问题以及解决方法(RealARM开发板)
  5. rust尝鲜
  6. Dictionary Size
  7. mysql优化 - mysql 的 hint
  8. PHP使用自定义key实现对数据加密解密
  9. Django + Axios &amp; Ajax post和get 传参
  10. 基于Mybatis实现一个查库的接口
  11. Selenium操作示例——鼠标悬停显示二级菜单,再点击二级菜单或下拉列表
  12. java实现最小堆
  13. Python——eventlet.websocket
  14. 使用 webpack 优化资源
  15. 【Apache】的运营之道
  16. Markdown的写法
  17. Mac OSX 正确地同时安装Python 2.7 和Python3
  18. CentOS7 yum方式安装 MongoDB 3.4 复制集
  19. 初识Qt鼠标、键盘事件及定时器和随机数
  20. Php与Erlang的Socket通信

热门文章

  1. 【Python 11】汇率兑换4.0(函数)
  2. vue 首页问题
  3. day15-面向对象基础(二)
  4. p1305 新二叉树
  5. html基础和CSS选择器
  6. 函数rand,randn,randi
  7. odoo12.0 在Ubutu 18.04下环境的搭建
  8. Neutron:ML2 Core Plugin
  9. centos 6.8 nginx+mysql+php
  10. OpenStack-Keystone(2)