Spark 学习笔记之 Streaming Window
2024-09-02 00:34:18
Streaming Window:
上图意思:每隔2秒统计前3秒的数据
slideDuration: 2
windowDuration: 3
例子:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent object WindowStreaming { def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaMapParams = Map[String, Object](
"bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g1",
"auto.offset.reset" -> "latest", //earliest|latest
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topicsSet = Set("ScalaTopic")
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaMapParams)
) val finalResultRDD: DStream[(Int, String)] = kafkaStream.flatMap(row => row.value().split(" "))
.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(3), Seconds(2))
.transform(rdd => rdd.map(tuple => (tuple._2, tuple._1))
.sortByKey(false).map(tuple => (tuple._1, tuple._2))
) finalResultRDD.print() ssc.start()
ssc.awaitTermination()
} }
运行结果:
最新文章
- SOAPUI使用教程-MockService脚本概述
- django authenticate
- redis集群讨论
- 使用zabbix邮件发送报表
- 初次使用 git 的“核弹级选项”:filter-branch 从仓库中删除文件
- JSP的那些事儿(2)---- DWR2.0 的配置和使用
- 使用命令参数方式指定log4j配置文件
- usb wifi driver run in ubuntu support 360/xiaodu and with 3.13.0-32-generic
- Cookie、Session
- jquery 部分效果
- Android开发随手记
- Mysql-单表查询的操作和注意事项
- 如何通过Chrome远程调试android设备上的Web网站
- IDEA+Maven:cannot download sources
- SpringMvc常见问题汇总
- adb常用命令教程
- JS 字符串 作为变量名
- Hadoop使用场景
- 【learning】vim爆改记 (如何让vim用起来像devc++)
- Autovacuum 的运行限制
热门文章
- gym/102059 E
- hdu6312 2018杭电多校第二场 1004 D Game 博弈
- hdu 6016 Count the Sheep(思维)
- Java集合:LinkedList (JDK1.8 源码解读)
- Python---网络爬虫初识
- Intellij IDEA在maven项目中添加外部Jar包运行
- Nginx使用GeoIP模块来限制地区访问
- 【LeetCode】34-在排序数组中查找元素的第一个和最后一个位置
- Java高性能编程之CAS与ABA及解决方法
- .Net基础篇_学习笔记_第八天_复杂数据类型(常量/枚举/结构)