sparkStreaming(2.1.0)示范代码
2024-09-04 21:28:30
package cn.piesat import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words=lines.flatMap(_.split(" "))
words.foreachRDD(rdd=>{
val spark=SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val worldDataFrame=rdd.map(w=>{
Record(w)
}).toDF()
worldDataFrame.createOrReplaceTempView("words")
val wordCountsDataFram=spark.sql("select word,count(*) as total from words group by word")
wordCountsDataFram.show()
})
ssc.start()
ssc.awaitTermination()
}
}
case class Record(val word:String) { }
最新文章
- ahk之路:利用ahk在window7下实现窗口置顶
- 搭建TestNG环境( 一)
- js获取及判断按键的方法
- WebService如何根据对方提供的xml生成对象
- 【POJ 1981 】Circle and Points
- Git: 生成ssh公钥
- H5案例分享:html5重力感应事件
- PhpStorm 配置Xdebug
- poj-3255-Roadblocks-路径可重复次短路
- 怎样让老浏览器兼容html5新标签
- [置顶] Android系统移植与调试之------->;如何修改Android设备添加3G上网功能
- Java初始阶段
- java web jsp原理图 ,静态包含,动态包含,out与response.getWrite()
- mac中的myeclipse的控制台中文乱码问题解决办法
- 适配iOS11
- Wasserstein CNN: Learning Invariant Features for NIR-VIS Face Recognition
- Python --代码风格检查 pep8
- 让HTMLrunner 报告的子列表都 默认展示出来的 方法(方便发送邮件时可以方便查看)
- [JSOI2018]列队(主席树)
- 【Python】如何取到input中的value值?