Spark Streaming + Kafka integration throws an error. The code before the fix is as follows:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  // Merge each word's counts from the current batch (y) with its accumulated state (z)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // updateStateByKey needs a checkpoint directory
    ssc.checkpoint("c://ck2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    println(wordCounts) // prints only the DStream reference; this is NOT a DStream output operation
    ssc.start()
    ssc.awaitTermination()
  }
}

Note: the line println(wordCounts) should be wordCounts.print(). Cause of the error: a Spark Streaming job must register at least one DStream output operation (print, foreachRDD, saveAsTextFiles, ...) before ssc.start() is called. println only prints the DStream object's toString and registers nothing, so startup fails with the following error:

 17/07/28 17:11:59 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.bianqi.spark.day5.KafkaWordCount$.main(KafkaWordCount.scala:24)
at org.bianqi.spark.day5.KafkaWordCount.main(KafkaWordCount.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)

The corrected code is as follows:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  // Merge each word's counts from the current batch (y) with its accumulated state (z)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // updateStateByKey needs a checkpoint directory
    ssc.checkpoint("c://ck2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print() // print() is a DStream output operation, so the validation in ssc.start() passes
    ssc.start()
    ssc.awaitTermination()
  }
}
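print() is only the simplest way to satisfy that requirement; any DStream output operation will do. A minimal sketch of two alternatives on the same wordCounts stream (the output path prefix below is a made-up placeholder, not from the original code):

// Either of these also registers an output operation and avoids the
// "No output operations registered" error (wordCounts as defined above).
wordCounts.foreachRDD { rdd =>
  // take() keeps the driver-side output small; suitable only for local testing
  rdd.take(10).foreach(println)
}
// Or write each batch out as text files with the given prefix/suffix (placeholder path)
wordCounts.saveAsTextFiles("c://wordcounts/out", "txt")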

However, a thread on Stack Overflow suggests that this error can also have a different cause; the link is:

https://stackoverflow.com/questions/34188274/spark-no-output-operations-registered-so-nothing-to-execute
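As an aside, the updateFunc used above is the iterator-based variant of updateStateByKey: for each word it adds the counts from the current batch to the previously accumulated total. A self-contained sketch of what it computes (plain Scala, no Spark needed; the sample data is made up):

object UpdateFuncDemo {
  // Same function as in KafkaWordCount above
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    // "hello" appears twice in this batch and had a running total of 3; "spark" is new
    val batch = Iterator(
      ("hello", Seq(1, 1), Option(3)),
      ("spark", Seq(1), None)
    )
    updateFunc(batch).foreach(println) // prints (hello,5) then (spark,1)
  }
}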
