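1. Custom sink

In Flink, a sink is responsible for the final output of data. To use a custom sink, call addSink on a DataStream instance and pass in an instance of your sink class.

Define a print sink that prints the actual subtask index (the built-in print() shows the subtask index plus 1).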

MyPrintSink

package cn._51doit.flink.day02;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// A print sink that prefixes each record with its 0-based subtask index
public class MyPrintSink<T> extends RichSinkFunction<T> {

    @Override
    public void invoke(T value, Context context) throws Exception {
        // Rich functions can access the runtime context, which knows this subtask's index
        int index = getRuntimeContext().getIndexOfThisSubtask();
        System.out.println(index + " > " + value);
    }
}

MyPrintSinkDemo

package cn._51doit.flink.day02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MyPrintSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from a socket on localhost:8888
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Split each line into words and emit (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        // Group by the word (field 0) and keep a running sum of the counts (field 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = wordAndOne.keyBy(0).sum(1);
        // Use the custom sink instead of print()
        res.addSink(new MyPrintSink<>());
        env.execute();
    }
}
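For comparison, here is a plain-Java sketch of the two output formats (no Flink dependency; the class and method names are hypothetical). defaultPrint approximates what, as far as I know, DataStream#print() writes: the subtask index plus 1 followed by "> ", and no prefix at all when the sink runs with parallelism 1. myPrintSink mirrors the custom sink above.

```java
// Hypothetical sketch contrasting the built-in print() prefix with MyPrintSink's prefix
final class PrintPrefixSketch {
    // Approximates DataStream#print(): 1-based subtask number, only when parallelism > 1
    static String defaultPrint(int subtaskIndex, int parallelism, Object value) {
        String prefix = parallelism > 1 ? (subtaskIndex + 1) + "> " : "";
        return prefix + value;
    }

    // Mirrors MyPrintSink: the raw 0-based index from getIndexOfThisSubtask()
    static String myPrintSink(int subtaskIndex, Object value) {
        return subtaskIndex + " > " + value;
    }

    public static void main(String[] args) {
        System.out.println(defaultPrint(0, 4, "(hello,1)")); // 1> (hello,1)
        System.out.println(myPrintSink(0, "(hello,1)"));     // 0 > (hello,1)
    }
}
```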

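2. StreamingFileSink

 Widely used: it writes results to the local file system or to HDFS and supports exactly-once semantics, provided checkpointing is enabled.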

package cn._51doit.flink.day02;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.concurrent.TimeUnit;

public class StreamFileSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> upper = lines.map(String::toUpperCase);
        String path = "E:\\flink";
        // Checkpoint every 10 s; pending part files are only committed when a checkpoint completes
        env.enableCheckpointing(10000);
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Maximum time a part file stays open before it is rolled
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(30))
                                // Roll the part file if nothing has been written to it for this long
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(10))
                                // Roll once the part file exceeds 1 MB
                                .withMaxPartSize(1024 * 1024)
                                .build())
                .build();
        upper.addSink(sink);
        env.execute();
    }
}
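The three thresholds configured above can be read as a single decision: a part file is rolled as soon as any one of them is reached. The following is a hypothetical, simplified model in plain Java, not Flink's DefaultRollingPolicy class; as I understand it, Flink checks the size on each write and the two time limits periodically, and its exact comparison operators may differ from this sketch.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the three rolling thresholds; not Flink's DefaultRollingPolicy
final class RollingPolicySketch {
    private final long rolloverIntervalMs;
    private final long inactivityIntervalMs;
    private final long maxPartSizeBytes;

    RollingPolicySketch(long rolloverIntervalMs, long inactivityIntervalMs, long maxPartSizeBytes) {
        this.rolloverIntervalMs = rolloverIntervalMs;
        this.inactivityIntervalMs = inactivityIntervalMs;
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    // True if the in-progress part file should be closed and a new one started
    boolean shouldRoll(long partSizeBytes, long createdAtMs, long lastWriteAtMs, long nowMs) {
        return partSizeBytes >= maxPartSizeBytes                  // file is large enough
                || nowMs - createdAtMs >= rolloverIntervalMs      // file has been open too long
                || nowMs - lastWriteAtMs >= inactivityIntervalMs; // nothing written for a while
    }

    public static void main(String[] args) {
        RollingPolicySketch policy = new RollingPolicySketch(
                TimeUnit.SECONDS.toMillis(30), TimeUnit.SECONDS.toMillis(10), 1024 * 1024);
        // 5 KB written, file opened 12 s ago, last write 3 s ago: keep writing
        System.out.println(policy.shouldRoll(5_000, 0, 9_000, 12_000)); // false
        // Same file, but the last write was 10 s ago: inactivity threshold reached
        System.out.println(policy.shouldRoll(5_000, 0, 2_000, 12_000)); // true
    }
}
```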

3. Time

 

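(1) Event Time: the time at which an event was created. It is usually carried as a timestamp inside the event; for example, each collected log record contains the time it was generated. Flink accesses event timestamps through a timestamp assigner.

(2) Ingestion Time: the time at which data enters Flink.

(3) Processing Time: the local system time of the machine running each time-based operator, so it depends on the machine. In the Flink versions used in this article, Processing Time is the default time characteristic.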

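4. Window

Windows fall into two categories:

(1) GlobalWindow (countWindow): a window is produced for every specified number of elements, independent of time.

(2) TimeWindow: windows are produced according to time.

  Depending on how they are constructed, TimeWindows are further divided into three kinds: tumbling windows, sliding windows and session windows.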

4.1 countWindow/countWindowAll

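  countWindow is triggered by the number of elements with the same key in the window; when it fires, results are produced only for the keys whose element count has reached the window size.

(1) Tumbling windows: a count window is tumbling by default.

  • Non-keyed: use countWindowAll; the window fires each time the total number of inputs reaches the window size.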

package cn._51doit.flink.day02.window;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
        // Pass in the window assigner, i.e. the rule for splitting the stream into windows:
        // one window per 3 elements
        AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(3);
        SingleOutputStreamOperator<Integer> result = window.sum(0);
        result.print();
        env.execute();
    }
}
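The firing rule of countWindowAll(3) followed by sum(0) can be modelled without Flink. This plain-Java sketch (hypothetical class name) sums every complete group of `size` inputs and emits nothing for a trailing, incomplete group, which stays in the open window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of countWindowAll(size).sum(0); no Flink dependency
final class TumblingCountSketch {
    static List<Integer> sums(List<Integer> input, int size) {
        List<Integer> results = new ArrayList<>();
        int count = 0;
        int sum = 0;
        for (int n : input) {
            sum += n;
            count++;
            if (count == size) { // the window is full: emit its result and start an empty one
                results.add(sum);
                count = 0;
                sum = 0;
            }
        }
        // Leftover elements (fewer than size) stay in the open window and produce no output
        return results;
    }

    public static void main(String[] args) {
        System.out.println(sums(List.of(1, 2, 3, 4, 5, 6, 7), 3)); // [6, 15]
    }
}
```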
  • Keyed: after keyBy, use countWindow; a key's window fires each time the number of inputs for that key reaches the window size.

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        // Each input line is one word; emit (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // Group by key (field 0)
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // After keyBy, create windows on the KeyedStream with window/countWindow (not windowAll)
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> window = keyed.countWindow(5);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumed = window.sum(1);
        sumed.print();
        env.execute();
    }
}
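The per-key behavior can be illustrated with a small plain-Java model of keyBy(word).countWindow(size).sum(1) on (word, 1) records. It uses a window size of 3 instead of 5 for brevity; the class name is hypothetical. Each key keeps its own counter, so one key's window can fire while other keys' windows are still open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(word).countWindow(size).sum(1); no Flink dependency
final class KeyedCountWindowSketch {
    static List<String> run(List<String> words, int size) {
        Map<String, Integer> openCounts = new HashMap<>(); // elements in each key's open window
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int count = openCounts.merge(word, 1, Integer::sum);
            if (count == size) {
                // Only this key's window is full; since every value is 1, the sum equals the count
                emitted.add("(" + word + "," + count + ")");
                openCounts.remove(word); // tumbling: the key starts a new, empty window
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "a", "a", "b", "a"), 3)); // [(a,3)]
    }
}
```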

(2) Sliding windows

  • Non-keyed: same as (1); only the window assignment rule changes. The changed line is:
AllWindowedStream<Integer, GlobalWindow> window = nums.countWindowAll(3, 2); // window size 3, slide 2


  • The keyed case works the same way, e.g. keyed.countWindow(5, 2).
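As far as I know, countWindowAll(size, slide) (and countWindow(size, slide) on a keyed stream) is built from a global window, a count trigger that fires every `slide` elements and a count evictor that keeps only the newest `size` elements. The plain-Java sketch below models that behavior for countWindowAll(3, 2) followed by sum; the class name is hypothetical. Note that the first firing sees only 2 elements because the buffer is not yet full.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of countWindowAll(size, slide).sum(0); no Flink dependency
final class SlidingCountSketch {
    static List<Integer> sums(List<Integer> input, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (int n : input) {
            buffer.addLast(n);
            if (buffer.size() > size) {
                buffer.removeFirst(); // evictor: keep only the newest `size` elements
            }
            sinceLastFire++;
            if (sinceLastFire == slide) { // trigger: fire every `slide` elements
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                results.add(sum);
                sinceLastFire = 0;
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // countWindowAll(3, 2) on inputs 1..6
        System.out.println(sums(List.of(1, 2, 3, 4, 5, 6), 3, 2)); // [3, 9, 15]
    }
}
```

With slide equal to size the model degenerates into the tumbling count window from (1).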

4.2 TimeWindow

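  A TimeWindow groups all data within a specified time range into one window, and all the data in that window is computed in one go.

4.2.1 Processing Time

(1) Tumbling windows

  In the Flink versions used here, time windows are based on Processing Time by default: each element is assigned to a window according to the system clock of the machine running the window operator at the moment it processes the element.

  • Non-keyed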

ProcessingTumblingWindowAllDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingTumblingWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // Non-keyed stream: create windows with windowAll
        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> window = wordAndOne.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = window.sum(1);
        sum.print();
        env.execute();
    }
}
wordAndOne.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))

This call creates a new processing-time window every 5 seconds; tumbling windows do not overlap.
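Time windows are aligned to the epoch (plus an optional offset), not to the moment the job starts. The plain-Java sketch below shows that arithmetic; it is intended to mirror, as I understand it, the window-start calculation Flink uses for tumbling windows, but the class and method names are hypothetical.

```java
// Hypothetical sketch of tumbling-window alignment; no Flink dependency
final class TumblingWindowSketch {
    // Start of the tumbling window that contains `timestamp`; windows are aligned to epoch + offset
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long size = 5_000; // Time.seconds(5)
        for (long ts : new long[] {0, 4_999, 5_000, 12_345}) {
            long start = windowStart(ts, 0, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
        // 0 -> [0, 5000), 4999 -> [0, 5000), 5000 -> [5000, 10000), 12345 -> [10000, 15000)
    }
}
```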

  • Keyed

  Works the same way: call keyBy first, then window(...) on the resulting KeyedStream.

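(2) Sliding windows

  Sliding windows are created with the same methods as tumbling windows (windowAll or window); only the window assigner changes (SlidingProcessingTimeWindows), and it takes two parameters: the window size and the slide.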
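Because windows overlap when the slide is smaller than the size, one element belongs to several windows (size / slide of them when the size is a multiple of the slide). This plain-Java sketch, with hypothetical names and a zero offset, lists every sliding window that contains a given timestamp by walking back from the latest aligned window start.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding-window assignment; no Flink dependency
final class SlidingWindowSketch {
    // Returns every sliding window [start, end) that contains `timestamp` (epoch millis, offset 0)
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp and aligned to the slide
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 20 s windows sliding every 10 s, as in the demos: an element at 12 s is in two windows
        for (long[] w : assign(12_000, 20_000, 10_000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10000, 30000) then [0, 20000)
    }
}
```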

ProcessingSlidingWindowAllDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingSlidingWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
        // No keyBy (non-keyed stream), so create windows with windowAll.
        // Sliding window: 20 s long, a new window starts every 10 s
        AllWindowedStream<Integer, TimeWindow> window = nums.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)));
        SingleOutputStreamOperator<Integer> sum = window.sum(0);
        sum.print();
        env.execute();
    }
}

ProcessingSlidingWindowDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingSlidingWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // keyBy was called (keyed stream), so create windows with window
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = window.sum(1);
        sum.print();
        env.execute();
    }
}

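(3) Session windows

  A session window is made up of a series of events separated by gaps shorter than a specified timeout, similar to a session in a web application: when no new data arrives for the length of the gap, the current window closes, and the next element starts a new window.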

ProcessingSessionWindowAllDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingSessionWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);
        // Non-keyed stream, so call windowAll.
        // Session window: closes after 5 s without new data
        AllWindowedStream<Integer, TimeWindow> window = nums.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        SingleOutputStreamOperator<Integer> sum = window.sum(0);
        sum.print();
        env.execute();
    }
}

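Here, once the program has received no data for 5 seconds, the current session window fires; the next element that arrives starts a new window.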
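A session window can be pictured as each element opening its own window [ts, ts + gap) and overlapping windows being merged. The plain-Java sketch below (hypothetical names) performs that merge for timestamps that are already sorted; as I understand it, Flink also merges windows that merely touch, which the `<=` comparison reproduces, and Flink handles unsorted input by merging windows as they arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of session-window merging; no Flink dependency
final class SessionWindowSketch {
    // Merges per-element windows [ts, ts + gap) into sessions; timestamps must be sorted ascending
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long end = ts + gap;
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], end); // within the gap: extend the current session
            } else {
                result.add(new long[] {ts, end}); // gap exceeded: start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1_000, 2_000, 3_000, 9_000}, 5_000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 8000) then [9000, 14000)
    }
}
```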

ProcessingSessionWindowDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ProcessingSessionWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // keyBy was called (keyed stream), so create windows with window; each key has its own sessions
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = window.sum(1);
        sum.print();
        env.execute();
    }
}

4.2.2 Event Time

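 The principle is the same as above, except that windows are defined by the time at which each event was produced. Because Flink uses ProcessingTime as its time characteristic by default (in the versions used here; Flink 1.12 deprecated this setting and made EventTime the default), EventTime has to be set explicitly:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use EventTime as the time characteristic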

(1) Tumbling windows

EventTimeTumblingWindowAllDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeTumblingWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses ProcessingTime by default; switch to EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Sample input ("time,number"); the time must be converted to a millisecond timestamp:
        // 2020-03-01 00:00:00,1
        // 2020-03-01 00:00:04,2
        // 2020-03-01 00:00:05,3
        DataStreamSource<String> lines = env.socketTextStream("feng05", 8888);
        // Extract the event time from each record; out-of-orderness delay of 0 s
        SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            @Override
            public long extractTimestamp(String element) {
                String[] fields = element.split(",");
                String dateStr = fields[0];
                try {
                    Date date = sdf.parse(dateStr);
                    return date.getTime();
                } catch (ParseException e) {
                    throw new RuntimeException("Failed to parse time: " + dateStr, e);
                }
            }
        });
        dataStreamWithWaterMark.print();
        SingleOutputStreamOperator<Integer> nums = dataStreamWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                String[] fields = value.split(",");
                String numStr = fields[1];
                return Integer.parseInt(numStr);
            }
        });
        nums.print();
        // No keyBy (non-keyed stream), so call windowAll
        AllWindowedStream<Integer, TimeWindow> window = nums
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Integer> sum = window.sum(0);
        sum.print();
        env.execute();
    }
}
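The demo parses "yyyy-MM-dd HH:mm:ss" with SimpleDateFormat, which interprets the time in the JVM's default time zone. The sketch below swaps in java.time instead (a different technique from the author's, named here plainly): DateTimeFormatter is thread-safe, and the zone is passed explicitly. The class name is hypothetical. It shows that the sample records lie 4000 ms and 5000 ms after the first one, so with 5-second tumbling windows the 00:00:05 record falls into the next window.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical java.time-based timestamp extraction; no Flink dependency
final class EventTimeParseSketch {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses the time field of a "yyyy-MM-dd HH:mm:ss,number" record into epoch milliseconds
    static long extractTimestamp(String element, ZoneId zone) {
        String dateStr = element.split(",")[0];
        return LocalDateTime.parse(dateStr, FORMAT).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long t0 = extractTimestamp("2020-03-01 00:00:00,1", ZoneOffset.UTC);
        long t4 = extractTimestamp("2020-03-01 00:00:04,2", ZoneOffset.UTC);
        long t5 = extractTimestamp("2020-03-01 00:00:05,3", ZoneOffset.UTC);
        System.out.println(t0);      // 1583020800000
        System.out.println(t4 - t0); // 4000
        System.out.println(t5 - t0); // 5000
    }
}
```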


EventTimeTumblingWindowDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTumblingWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses ProcessingTime by default; switch to EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Sample input ("timestamp in ms,word"):
        // 1000,a
        // 3000,b
        // 4000,c
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Extract the event-time field (already a millisecond timestamp); allow 2 s of out-of-orderness
        SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(2)) {
            @Override
            public long extractTimestamp(String element) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[1];
                return Tuple2.of(word, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // Keyed stream, so call window
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = window.sum(1);
        res.print();
        env.execute();
    }
}
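With a keyed event-time window, the watermark is tracked per stream (per parallel subtask), not per key, so a record for one key can make every key's window fire. The plain-Java simulation below (hypothetical names) models keyBy(word).window(TumblingEventTimeWindows.of(5 s)).sum(1) with a 2 s delay. It assumes, as with the BoundedOutOfOrdernessTimestampExtractor used above, that the watermark is the largest timestamp seen minus the delay (the newer WatermarkStrategy.forBoundedOutOfOrderness subtracts one more millisecond, as I recall), and it simplifies by re-evaluating the watermark after every record, whereas Flink emits watermarks periodically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of keyed tumbling event-time windows; no Flink dependency
final class KeyedEventTimeWindowSketch {
    // Input lines are "timestamp,word"; returns "(word,count)" for each fired window and key
    static List<String> run(List<String> lines, long size, long delay) {
        TreeMap<Long, TreeMap<String, Integer>> openWindows = new TreeMap<>(); // start -> word -> count
        List<String> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (String line : lines) {
            String[] fields = line.split(",");
            long ts = Long.parseLong(fields[0]);
            long start = ts - Math.floorMod(ts, size);
            if (start + size - 1 <= watermark) {
                continue; // the window already fired: by default Flink drops such late records
            }
            openWindows.computeIfAbsent(start, k -> new TreeMap<>()).merge(fields[1], 1, Integer::sum);
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - delay;
            // The watermark is shared by all keys: every window whose last timestamp
            // (end - 1) it has reached fires for every key at once
            while (!openWindows.isEmpty() && openWindows.firstKey() + size - 1 <= watermark) {
                Map.Entry<Long, TreeMap<String, Integer>> window = openWindows.pollFirstEntry();
                window.getValue().forEach((word, count) -> fired.add("(" + word + "," + count + ")"));
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // "6999,c" pushes the watermark to 4999, which closes [0, 5000) for keys a and b
        List<String> input = List.of("1000,a", "3000,b", "4000,a", "6999,c");
        System.out.println(run(input, 5_000, 2_000)); // [(a,2), (b,1)]
    }
}
```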

(2) Sliding windows

EventTimeSlidingWindowAllDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSlidingWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses ProcessingTime by default; switch to EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Sample input ("timestamp in ms,number"):
        // 1000,1
        // 2000,2
        // 3000,3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Extract the event-time field (already a millisecond timestamp); delay of 0 s
        SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]);
            }
        });
        SingleOutputStreamOperator<Integer> nums = dataStreamWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                String[] fields = value.split(",");
                String numStr = fields[1];
                return Integer.parseInt(numStr);
            }
        });
        // No keyBy (non-keyed stream), so call windowAll;
        // windowAll on a non-keyed stream returns a non-keyed AllWindowedStream.
        // Sliding window: 10 s long, sliding every 5 s
        AllWindowedStream<Integer, TimeWindow> window = nums
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        SingleOutputStreamOperator<Integer> sum = window.sum(0);
        sum.print();
        env.execute();
    }
}

EventTimeSlidingWindowDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSlidingWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses ProcessingTime by default; switch to EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Sample input ("timestamp in ms,word"):
        // 1000,a
        // 3000,b
        // 4000,c
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Extract the event-time field (already a millisecond timestamp); delay of 0 s
        SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[1];
                return Tuple2.of(word, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // Keyed stream, so call window: 10 s windows sliding every 5 s
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = window.sum(1);
        res.print();
        env.execute();
    }
}

(3) Session windows

EventTimeSessionWindowAllDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSessionWindowAllDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses ProcessingTime by default; switch to EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Sample input ("timestamp in ms,number"):
        // 1000,1
        // 2000,2
        // 3000,3
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Extract the event-time field (already a millisecond timestamp); delay of 0 s
        SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]);
            }
        });
        SingleOutputStreamOperator<Integer> nums = dataStreamWithWaterMark.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                String[] fields = value.split(",");
                String numStr = fields[1];
                return Integer.parseInt(numStr);
            }
        });
        // No keyBy (non-keyed stream), so call windowAll;
        // windowAll on a non-keyed stream returns a non-keyed AllWindowedStream.
        // Session window: closes after a 5 s gap in event time
        AllWindowedStream<Integer, TimeWindow> window = nums
                .windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));
        SingleOutputStreamOperator<Integer> sum = window.sum(0);
        sum.print();
        env.execute();
    }
}

EventTimeSessionWindowDemo

package cn._51doit.flink.day02.window;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeSessionWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Flink uses ProcessingTime by default; switch to EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Sample input ("timestamp in ms,word"):
        // 1000,a
        // 3000,b
        // 4000,c
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // Extract the event-time field (already a millisecond timestamp); delay of 0 s
        SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(String element) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[0]);
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = dataStreamWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] fields = value.split(",");
                String word = fields[1];
                return Tuple2.of(word, 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
        // Keyed stream, so call window; each key has its own sessions
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyed
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)));
        SingleOutputStreamOperator<Tuple2<String, Integer>> res = window.sum(1);
        res.print();
        env.execute();
    }
}

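5. Watermark

  It takes time for an event to travel from where it is produced, through the source, to an operator. In most cases events reach an operator in the order in which they were produced, but network delays, backpressure and similar causes can still make them arrive out of order. Out of order means that the sequence in which Flink receives events does not strictly follow their Event Time.

  This raises a problem: once events can arrive out of order, a window driven purely by event time cannot know whether all of its data has arrived, yet it cannot wait indefinitely either. There has to be a mechanism that forces the window to be computed at a certain point. That mechanism is the Watermark.

  Watermarks exist to handle out-of-order events; out-of-order events are usually processed correctly by combining watermarks with windows.

  A watermark with timestamp T in the data stream declares that all data with timestamps up to T is assumed to have arrived. Event-time windows are therefore triggered by watermarks.

  A watermark can be understood as a delayed trigger. We set a delay t; the system tracks the largest event time among the data that has arrived, maxEventTime, and assumes that all data with eventTime up to maxEventTime - t has arrived, so the watermark is maxEventTime - t. A window fires once maxEventTime - t reaches the window's last timestamp, i.e. its end time minus 1 ms.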
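The delay rule just described can be sketched in plain Java. This hypothetical class is not a Flink API; it follows the formula in the text (watermark = maxEventTime - t), which is how I understand the BoundedOutOfOrdernessTimestampExtractor used in this article behaves, together with the rule that a window [start, end) fires once the watermark reaches end - 1. Because only the maximum event time matters, an older, out-of-order record never moves the watermark backwards.

```java
// Hypothetical model of a bounded-out-of-orderness watermark; not a Flink class
final class BoundedDelayWatermarkSketch {
    private final long delayMs;
    private long maxEventTime = Long.MIN_VALUE;

    BoundedDelayWatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    // Called for every record; since only the maximum is kept, the watermark never decreases
    void onEvent(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
    }

    long currentWatermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - delayMs;
    }

    // A window [start, end) fires once the watermark reaches its last timestamp, end - 1
    boolean fires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedDelayWatermarkSketch wm = new BoundedDelayWatermarkSketch(2_000);
        wm.onEvent(1_000);
        wm.onEvent(6_000);
        wm.onEvent(3_000); // an older, out-of-order record: the watermark stays put
        System.out.println(wm.currentWatermark()); // 4000
        System.out.println(wm.fires(5_000));       // false: [0, 5000) still waits
        wm.onEvent(7_000);
        System.out.println(wm.currentWatermark()); // 5000
        System.out.println(wm.fires(5_000));       // true
    }
}
```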

The following code assigns timestamps and creates watermarks:

SingleOutputStreamOperator<String> dataStreamWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { // delay of 0 seconds
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public long extractTimestamp(String element) {
        String[] fields = element.split(",");
        String dateStr = fields[0];
        try {
            Date date = sdf.parse(dateStr);
            return date.getTime();
        } catch (ParseException e) {
            throw new RuntimeException("Failed to parse time: " + dateStr, e);
        }
    }
});
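In BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)), the constructor argument is the delay, i.e. the maximum out-of-orderness that is tolerated.

Windows are left-closed and right-open: a 5 s window starting at 0 covers [0, 5000) in milliseconds, so the last timestamp it includes is 4999.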
