No output operations registered, so nothing to execute
2024-08-27 05:38:26
Spark Streaming threw this error when used together with Kafka. The code before the fix was as follows:
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  // Merge each key's counts from the current batch with its previously saved total.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("c://ck2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    println(wordCounts)  // bug: this only prints the DStream reference; it is NOT a Spark output operation
    ssc.start()
    ssc.awaitTermination()
  }
}
Note: line 14 of the listing, println(wordCounts), should be wordCounts.print(). Cause of the error: Spark Streaming requires at least one output operation such as print() to be registered on a DStream before ssc.start() is called; otherwise the following error is thrown:
17/07/28 17:11:59 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.bianqi.spark.day5.KafkaWordCount$.main(KafkaWordCount.scala:24)
at org.bianqi.spark.day5.KafkaWordCount.main(KafkaWordCount.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
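print() is not the only way to satisfy this requirement; registering any DStream output operation avoids the error. A sketch of the alternatives (not from the original post; the saveAsTextFiles prefix below is a placeholder, and each line assumes the wordCounts DStream from the listing above):

```scala
// Any ONE of these registers an output operation on the DStream graph:
wordCounts.print()                          // print the first elements of every batch
wordCounts.saveAsTextFiles("out/wordcount") // placeholder path prefix, one directory per batch
wordCounts.foreachRDD { rdd =>
  rdd.foreach(println)                      // arbitrary per-batch action on the underlying RDD
}
```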
The corrected code is as follows:
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  // Merge each key's counts from the current batch with its previously saved total.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("c://ck2")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()  // output operation: without this, the context refuses to start
    ssc.start()
    ssc.awaitTermination()
  }
}
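The merge that updateFunc performs can be illustrated outside Spark. This is a plain-Scala sketch of the same logic (the sample data is invented for illustration): each element pairs a word with the new per-batch counts and the previously checkpointed running total, and the function emits the updated total.

```scala
// Each element: (key, counts seen in this batch, previously saved total).
// The Option is None the first time a key appears.
val batch: Iterator[(String, Seq[Int], Option[Int])] = Iterator(
  ("spark", Seq(1, 1, 1), Some(2)), // seen before: 2 + 3 = 5
  ("kafka", Seq(1), None)           // first appearance: 0 + 1 = 1
)

// Same body as updateFunc in the listing above.
val merged = batch.flatMap { case (word, newCounts, prevTotal) =>
  Some(newCounts.sum + prevTotal.getOrElse(0)).map(total => (word, total))
}.toList

// merged == List(("spark", 5), ("kafka", 1))
```

Returning an Option and flatMap-ing also lets the function drop a key entirely (by yielding None), which is why updateStateByKey takes this shape.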
However, according to an answer seen on Stack Overflow, this error can also have a different cause; the link is below: