1. 总览

    • Window 是flink处理无限流的核心,Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。

    • Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。

    • 而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

    • Flink 提供了非常完善的窗口机制。

    • 在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。

    • 当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。

    • 在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

    • 窗口可以是基于时间驱动的(Time Window,例如:每30秒钟)

    • 也可以是基于数据驱动的(Count Window,例如:每一百个元素)

    • 同时基于不同事件驱动的窗口又可以分成以下几类:

      • 翻滚窗口 (Tumbling Window, 无重叠)
      • 滑动窗口 (Sliding Window, 有重叠)
      • 会话窗口 (Session Window, 活动间隙)
      • 全局窗口 (略)
    • Flink要操作窗口,先得将StreamSource 转成WindowedStream

      Window操作 其作用
      Window Keyed Streaming → WindowedStream 可以在已经分区的KeyedStream上定义Windows,即K,V格式的数据。
      WindowAll DataStream → AllWindowedStream 对常规的DataStream上定义Window,即非K,V格式的数据
      Window Apply WindowedStream → AllWindowedStream AllWindowedStream → DataStream 将函数应用于整个窗口中的数据
      Window Reduce WindowedStream → DataStream 对窗口里的数据进行”reduce”减少聚合统计
      Aggregations on windows WindowedStream → DataStream 对窗口里的数据进行聚合操作: sum(), max(), min()
  2. Tumbling Window(翻滚窗口)

    • 翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口

    • 翻滚窗具有固定的尺寸,不重叠。

    • 例图:

      • 代码

        package com.ronnie.flink.stream.window;
        
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.datastream.KeyedStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.datastream.WindowedStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.api.windowing.time.Time;
        import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
        import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat;
        import java.util.Random; /**
        * 翻滚窗口:窗口不可重叠
        * 1、基于时间驱动
        * 2、基于事件驱动
        */
        public class TumblingWindow { public static void main(String[] args) {
        //设置执行环境,类似spark中初始化sparkContext
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long timeMillis = System.currentTimeMillis(); int random = new Random().nextInt(10); System.out.println("value: " + value + " random: " + random + "timestamp: " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<String, Integer>(value, random);
        }
        }); KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0); // 基于时间驱动,每隔10s划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10)); // 基于事件驱动, 每相隔3个事件(即三个相同key的数据), 划分一个窗口进行计算
        // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3); // apply是窗口的应用函数,即apply里的函数将应用在此窗口的数据上。
        timeWindow.apply(new MyTimeWindowFunction()).print();
        // countWindow.apply(new MyCountWindowFunction()).print(); try {
        // 转换算子都是lazy init的, 最后要显式调用 执行程序
        env.execute();
        } catch (Exception e) {
        e.printStackTrace();
        } }
        }
    • 基于时间驱动

      • 场景1:我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。

        package com.shsxt.flink.stream.window;
        
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
        import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
        import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; public class MyTimeWindowFunction implements WindowFunction<Tuple2<String,Integer>, String, Tuple, TimeWindow> { @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); int sum = 0; for(Tuple2<String,Integer> tuple2 : input){
        sum +=tuple2.f1;
        } long start = window.getStart();
        long end = window.getEnd(); out.collect("key:" + tuple.getField(0) + " value: " + sum + "| window_start :"
        + format.format(start) + " window_end :" + format.format(end)
        ); }
        }
    • 基于事件驱动

      • 场景2:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满100个”相同”元素了,就会对窗口进行计算。

        package com.ronnie.flink.stream.window;
        
        import org.apache.flink.api.java.tuple.Tuple;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
        import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
        import org.apache.flink.util.Collector; import java.text.SimpleDateFormat; public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> { @Override
        public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); int sum = 0; for (Tuple2<String, Integer> tuple2 : input){
        sum += tuple2.f1;
        }
        //无用的时间戳,默认值为: Long.MAX_VALUE,因为基于事件计数的情况下,不关心时间。
        long maxTimestamp = window.maxTimestamp(); out.collect("key:" + tuple.getField(0) + " value: " + sum + "| maxTimeStamp :"
        + maxTimestamp + "," + format.format(maxTimestamp)
        );
        }
        }
  3. Sliding Window(滑动窗口)

    • 滑动窗口和翻滚窗口类似,区别在于:滑动窗口可以有重叠的部分。

    • 在滑窗中,一个元素可以对应多个窗口。

    • 例图:

    • 基于时间的滑动窗口

      • 场景: 我们可以每30秒计算一次最近一分钟用户购买的商品总数。
    • 基于事件的滑动窗口

      • 场景: 每10个 “相同”元素计算一次最近100个元素的总和.
    • 代码:

      package com.ronnie.flink.stream.window;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.KeyedStream;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.datastream.WindowedStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat;
      import java.util.Random; /**
      * 滑动窗口:窗口可重叠
      * 1、基于时间驱动
      * 2、基于事件驱动
      */
      public class SlidingWindow { public static void main(String[] args) {
      // 设置执行环境, 类似spark中初始化SparkContext
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(String value) throws Exception {
      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      long timeMillis = System.currentTimeMillis(); int random = new Random().nextInt(10);
      System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<String, Integer>(value, random);
      }
      });
      KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0); //基于时间驱动,每隔5s计算一下最近10s的数据
      // WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
      //基于事件驱动,每隔2个事件,触发一次计算,本次窗口的大小为3,代表窗口里的每种事件最多为3个
      WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow = keyedStream.countWindow(3, 2); // timeWindow.sum(1).print(); countWindow.sum(1).print(); // timeWindow.apply(new MyTimeWindowFunction()).print(); try {
      env.execute();
      } catch (Exception e) {
      e.printStackTrace();
      }
      }
      }
  4. Session Window(会话窗口)

    • 会话窗口不重叠,没有固定的开始和结束时间

    • 与翻滚窗口和滑动窗口相反, 当会话窗口在一段时间内没有接收到元素时会关闭会话窗口。

    • 后续的元素将会被分配给新的会话窗口

    • 例图:

    • 举例:

      • 计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开。
    • 代码:

      package com.ronnie.flink.stream.window;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.KeyedStream;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.datastream.WindowedStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import java.text.SimpleDateFormat;
      import java.util.Random; public class SessionWindow { public static void main(String[] args) { // 设置执行环境, 类似spark中初始化sparkContext StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> dataStreamSource = env.socketTextStream("ronnie01", 9999); SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> map(String value) throws Exception {
      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
      long timeMillis = System.currentTimeMillis(); int random = new Random().nextInt(10); System.err.println("value : " + value + " random : " + random + " timestamp : " + timeMillis + "|" + format.format(timeMillis)); return new Tuple2<String, Integer>(value, random);
      }
      });
      KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0); //如果连续10s内,没有数据进来,则会话窗口断开。
      WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window = keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))); // window.sum(1).print(); window.apply(new MyTimeWindowFunction()).print(); try {
      env.execute();
      } catch (Exception e) {
      e.printStackTrace();
      }
      }
      }

最新文章

  1. js正则获取url所带参数值
  2. SharePoint Server 2013 让上传文件更精彩
  3. linux升级openssh
  4. [Android] 转-LayoutInflater丢失View的LayoutParams
  5. VS2013 有效密钥
  6. 网络编程——基于TCP协议的Socket编程,基于UDP协议的Socket编程
  7. SPFA(负环) LightOJ 1074 Extended Traffic
  8. foreach属性-动态-mybatis中使用map类型参数,其中key为列名,value为列值
  9. 在Openfire中使用自己的数据表之修改配置文件
  10. ubuntu.sh: 113: ubuntu.sh: Syntax error: &quot;(&quot; unexpected
  11. 第3章 K近邻法
  12. Working with Data &#187; Getting started with ASP.NET Core and Entity Framework Core using Visual Studio &#187; 排序、筛选、分页以及分组
  13. jquery 评论等级(很差,差,一般,好,很好)代码
  14. UNITY 打包安卓APK
  15. 獲取 Textarea 的光標位置(摘自網絡)
  16. Jquery css函数用法(判断标签是否拥有某属性)
  17. 利用python进行数据分析之pandas库的应用(二)
  18. 一步一步重写 CodeIgniter 框架 (9) —— 使用 CodeIgniter 类库
  19. PHP防XSS 防SQL注入的代码
  20. 安装vue错误详情解决办法

热门文章

  1. BZOJ 4166: 月宫的符卡序列
  2. Linux系统监控 zabbix-agent 主机添加的操作页面
  3. c-指针的理解
  4. 浅谈Windows入侵检查
  5. 【SqlServer】利用sql语句附加,分离数据库-----转载
  6. Jquery - ajax url路径问题
  7. tomcat点击startup.bat出现闪退,启动不成功的解决办法
  8. Shiro登录身份认证(从SecurityUtils.getSubject().login(token))到Realm的doGetAuthenticationInfo
  9. SpringBoot 静态资源的配置
  10. Vue项目中v-for无法渲染数据