1.说明

  虽然DStream可以转换成RDD,但是如果比较复杂,可以考虑使用SparkSQL。

2.集成方式

  Streaming和Core整合:
    transform或者foreachRDD方法
  Core和SQL整合:
    RDD <==> DataFrame 互换

3.程序

 package com.sql.it
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingSQL {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StreamingWindowOfKafka22")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
val ssc = new StreamingContext(sc, Seconds(5))
// 当调用updateStateByKey函数API的时候,必须给定checkpoint dir
// 路径对应的文件夹不能存在
ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/9421151351") val kafkaParams = Map(
"group.id" -> "streaming-kafka-78912151",
"zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
"auto.offset.reset" -> "smallest"
)
val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1
val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
ssc, // 给定SparkStreaming上下文
kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接
topics, // 给定读取对应topic的名称以及读取数据的线程数量
StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别
).map(_._2) /**
* transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
*/
dstream.transform(rdd => {
// 使用sql统计wordcoount
val sqlContext = SQLContextSingelton.getSQLContext(rdd.sparkContext)
import sqlContext.implicits._
val procedRDD = rdd.filter(_.nonEmpty).flatMap(_.split(" ").map((_, 1)))
procedRDD.toDF("word", "c").registerTempTable("tb_word")
val resultRDD = sqlContext.sql("select word, count(c) as vc from tb_word group by word").map(row => {
val word = row.getAs[String]("word")
val count = row.getAs[Long]("vc")
(word, count)
}) resultRDD
}).print() // 启动开始处理
ssc.start()
ssc.awaitTermination() // 等等结束,监控一个线程的中断操作
}
} object SQLContextSingelton {
@transient private var instance: SQLContext = _ def getSQLContext(sc: SparkContext): SQLContext = {
if (instance == null) {
synchronized[SQLContext] {
if (instance == null) {
instance = new SQLContext(sc)
}
instance
}
}
instance
}
}

4.效果

  

最新文章

  1. UML入门
  2. PHP三元运算符 isset($_GET[&#39;id&#39;]) ? $_GET[&#39;id&#39;] : ”
  3. [Asp.net 开发系列之SignalR篇]专题五:SignalR支持的平台
  4. Git远程和分支管理
  5. Mac 安装 eclipse
  6. redis集群配置
  7. 马哥教育视频笔记:01(Linux常用命令)
  8. Mecanim的Retargeting和BodyMask
  9. VS2010中手动重命名项目
  10. 一篇memcache基础教程
  11. Win32汇编开始 Hello Asm
  12. jquery ajax验证用户名是否存在(后台spring mvc)
  13. SQL Server中如何备份存储过程(SP)和函数(Fun)
  14. jquery ajax file upload NET MVC 无刷新文件上传
  15. 中国居民18位身份证号验证方法,Java算法实现
  16. maya模板lock工具
  17. 数据仓库分层ODS DW DM 主题 标签
  18. 让DOM从页面中消失的方法
  19. 黄聪:初识Pjax:pjax是什么
  20. EF学习笔记-CODE FIRST-约定

热门文章

  1. java学习——异常处理
  2. Web框架之Bootstrap
  3. Confluence 6 基本性能问题诊断步骤
  4. 图书管理系统(无中间件,用装饰器的)-----未基于FORM组件
  5. 【python】获取http响应
  6. Nginx详解十四:Nginx场景实践篇之代理服务
  7. Django将默认的SQLite更换为MySQL
  8. ActiveSync 学习记录
  9. MongoDB 入门
  10. 浏览器LocalStroage使用