Streaming Window:

上图意思:每隔2秒统计前3秒的数据

slideDuration: 2

windowDuration: 3

例子:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent object WindowStreaming { def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaMapParams = Map[String, Object](
"bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g1",
"auto.offset.reset" -> "latest", //earliest|latest
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topicsSet = Set("ScalaTopic")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaMapParams)
) val finalResultRDD: DStream[(Int, String)] = kafkaStream.flatMap(row => row.value().split(" "))
.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(3), Seconds(2))
.transform(rdd => rdd.map(tuple => (tuple._2, tuple._1))
.sortByKey(false).map(tuple => (tuple._1, tuple._2))
) finalResultRDD.print() ssc.start()
ssc.awaitTermination()
} }

运行结果:

最新文章

  1. SOAPUI使用教程-MockService脚本概述
  2. django authenticate
  3. redis集群讨论
  4. 使用zabbix邮件发送报表
  5. 初次使用 git 的“核弹级选项”:filter-branch 从仓库中删除文件
  6. JSP的那些事儿(2)---- DWR2.0 的配置和使用
  7. 使用命令参数方式指定log4j配置文件
  8. usb wifi driver run in ubuntu support 360/xiaodu and with 3.13.0-32-generic
  9. Cookie、Session
  10. jquery 部分效果
  11. Android开发随手记
  12. Mysql-单表查询的操作和注意事项
  13. 如何通过Chrome远程调试android设备上的Web网站
  14. IDEA+Maven:cannot download sources
  15. SpringMvc常见问题汇总
  16. adb常用命令教程
  17. JS 字符串 作为变量名
  18. Hadoop使用场景
  19. 【learning】vim爆改记 (如何让vim用起来像devc++)
  20. Autovacuum 的运行限制

热门文章

  1. gym/102059 E
  2. hdu6312 2018杭电多校第二场 1004 D Game 博弈
  3. hdu 6016 Count the Sheep(思维)
  4. Java集合:LinkedList (JDK1.8 源码解读)
  5. Python---网络爬虫初识
  6. Intellij IDEA在maven项目中添加外部Jar包运行
  7. Nginx使用GeoIP模块来限制地区访问
  8. 【LeetCode】34-在排序数组中查找元素的第一个和最后一个位置
  9. Java高性能编程之CAS与ABA及解决方法
  10. .Net基础篇_学习笔记_第八天_复杂数据类型(常量/枚举/结构)