/*
* ProcessWinFunOnWindow
*/ final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple3<String, String, Long>> input = streamExecutionEnvironment.fromElements(ENGLISH_TRANSCRIPT); DataStream<Double> avgEnglishScore = input.keyBy(0).countWindow(2).process(new MyProcessWindowFunction()); avgEnglishScore.print(); streamExecutionEnvironment.execute(); public static final Tuple3[] ENGLISH_TRANSCRIPT = new Tuple3[] { Tuple3.of("class1","张三",100L), Tuple3.of("class1","李四",78L), Tuple3.of("class1","王五",99L), Tuple3.of("class2","赵六",81L), Tuple3.of("class2","钱七",59L), Tuple3.of("class2","马二",97L) }; private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, String, Long>, Double, Tuple, GlobalWindow> { @Override
public void process(Tuple tuple,
ProcessWindowFunction<Tuple3<String, String, Long>, Double, Tuple, GlobalWindow>.Context context,
Iterable<Tuple3<String, String, Long>> elements, Collector<Double> out) throws Exception {
Long sum = 0L;
Long count = 0L;
for (Tuple3<String, String, Long> element : elements) {
sum += element.f2;
count++;
}
out.collect(sum.doubleValue() / count.doubleValue());
}
} // 运行结果
2> 89.0
1> 70.0 // 如果是input.keyBy(0).countWindow(3)
1> 79.0
2> 92.33333333333333

/**
*AggFunctionOnWindow
*/
final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple3<String, String, Long>> input = streamExecutionEnvironment.fromElements(ENGLISH_TRANSCRIPT); DataStream<Double> avgEnglishScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggregate()); avgEnglishScore.print(); streamExecutionEnvironment.execute(); private static class AverageAggregate implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> { /**
* 创建累加器来保存中间状态
*/
@Override
public Tuple2<Long, Long> createAccumulator() {
// TODO Auto-generated method stub
return new Tuple2<>(0L, 0L);
} /**
* 来一个元素计算一下sum和count并保存中间结果到累加器
*/
@Override
public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> accmulator) {
// TODO Auto-generated method stub
return new Tuple2<>(accmulator.f0 + value.f2, accmulator.f1 + 1);
} /**
* 从累加器提取结果
*/
@Override
public Double getResult(Tuple2<Long, Long> accmulator) {
// TODO Auto-generated method stub
return accmulator.f0.doubleValue() / accmulator.f1.doubleValue();
} /**
*
*/
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
// TODO Auto-generated method stub
return new Tuple2<>(value1.f0 + value2.f0, value1.f1 + value2.f1);
} } // 运行结果
1> 79.0
2> 92.33333333333333
/**
*ReduceFunctionOnWindowAll
*/ final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple3<String, String, Long>> input = streamExecutionEnvironment.fromElements(ENGLISH_TRANSCRIPT); DataStream<Tuple3<String, String, Long>> totalEnglishScore = input.keyBy(0).countWindow(3).reduce(new ReduceFunction<Tuple3<String, String, Long>>(){ @Override
public Tuple3<String, String, Long> reduce(Tuple3<String, String, Long> value1,
Tuple3<String, String, Long> value2) throws Exception {
// TODO Auto-generated method stub
return new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2);
}
}); totalEnglishScore.map(new MapFunction<Tuple3<String, String, Long>, Tuple2<String, Long>>() { @Override
public Tuple2<String, Long> map(Tuple3<String, String, Long> value) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<>(value.f0, value.f2);
}
}).print(); streamExecutionEnvironment.execute(); // 运行结果
2> (class1,277)
1> (class2,237)

最新文章

  1. JavaScript中的日期处理注意事项
  2. 安装运行Hadoop
  3. oracle 表类型变量的使用
  4. 【翻译】Windows 10 中为不同设备加载不同页面的3种方法
  5. 【POJ 3294】Life Forms 不小于k个字符串中的最长子串
  6. github删除带有文件的文件夹
  7. 在 Area 中使用RouteAttribute 定义路由, 并支持多语言
  8. python学习笔记22(group)
  9. Kill 正在执行的存储过程
  10. C++基础学习教程(八)
  11. CCF-CSP 最大的矩形
  12. python的eval函数
  13. 2018-2019-2 20165313 《网络对抗技术》 Exp5:MSF基础应用
  14. canvas简易画板。
  15. Html标签及各种属性(持续更新)
  16. 明明白白你的Linux服务器——日志篇
  17. jqgrid 宽度自适应
  18. eclipse override报错
  19. PHP网站(Drupal7)响应过慢之“Wating(TTFB)时间过长”
  20. Differential Geometry之第八章常Gauss曲率曲面

热门文章

  1. avformat_open_input阻塞操作中断的支持
  2. 如何将业务代码写得像诗一样(使用注解+单例+工厂去掉一大波if和else判断)
  3. SpringCloud基础
  4. jquery checkbox全选和取消
  5. Android studio配置国内镜像源
  6. (CSDN 迁移) JAVA循环删除List的某个元素
  7. 五、Spring中的@Import注解
  8. Java操作fastDFS
  9. Echarts的提示(Tooltip)显示额外内容
  10. python 职责链模式