package cn.piesat

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words=lines.flatMap(_.split(" "))
words.foreachRDD(rdd=>{
val spark=SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val worldDataFrame=rdd.map(w=>{
Record(w)
}).toDF()
worldDataFrame.createOrReplaceTempView("words")
val wordCountsDataFram=spark.sql("select word,count(*) as total from words group by word")
wordCountsDataFram.show()
})
ssc.start()
ssc.awaitTermination()
}
}
case class Record(val word:String) {

}

最新文章

  1. ahk之路:利用ahk在window7下实现窗口置顶
  2. 搭建TestNG环境( 一)
  3. js获取及判断按键的方法
  4. WebService如何根据对方提供的xml生成对象
  5. 【POJ 1981 】Circle and Points
  6. Git: 生成ssh公钥
  7. H5案例分享:html5重力感应事件
  8. PhpStorm 配置Xdebug
  9. poj-3255-Roadblocks-路径可重复次短路
  10. 怎样让老浏览器兼容html5新标签
  11. [置顶] Android系统移植与调试之------->如何修改Android设备添加3G上网功能
  12. Java初始阶段
  13. java web jsp原理图 ,静态包含,动态包含,out与response.getWrite()
  14. mac中的myeclipse的控制台中文乱码问题解决办法
  15. 适配iOS11
  16. Wasserstein CNN: Learning Invariant Features for NIR-VIS Face Recognition
  17. Python --代码风格检查 pep8
  18. 让HTMLrunner 报告的子列表都 默认展示出来的 方法(方便发送邮件时可以方便查看)
  19. [JSOI2018]列队(主席树)
  20. 【Python】如何取到input中的value值?

热门文章

  1. 【VS开发】MFC运行时库与debug、release版本之间的配置关系
  2. 笔记本通过命令配置wifi win7系统
  3. Java小知识---Java请求一个URL。获取网站返回的数据
  4. MySQL教程详解之存储引擎介绍及默认引擎设置
  5. Maven设置阿里仓库镜像增加访问速度
  6. 设计模式:门面模式(Facade)
  7. Win32汇编-创建窗体代码
  8. 通过PlayBook部署Zabbix
  9. Tomcat的安全性
  10. 常用的TCP/UDP端口