Spark-Task not serializable错误解析

2018年05月17日 15:33:03 沙拉控 阅读数:1509
 

在学习SparkStreaming的时候偶然出现的一个问题,先看下面一段代码:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by Administrator on 2017/11/6.
*/
object ForEachTest {
val checkpointDirectory="hdfs://hadoop1:9000/streamingchekpoint4"
def functionToCreateContext(): StreamingContext = {
//程序入口
val conf = new SparkConf().setMaster("local[2]").setAppName(s"${this.getClass.getSimpleName}")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val ssc = new StreamingContext(sc,Seconds(1))
//数据的输入
val dStream = ssc.socketTextStream("192.168.32.10",9999)
//数据的处理
val resultDStream = dStream.flatMap(_.split(","))
.map((_, 1))
.updateStateByKey((values: Seq[Int], valuesState: Option[Int]) => {
val currentCount = values.sum
val lastCount = valuesState.getOrElse(0)
Some(currentCount + lastCount)
})
//程序的输出
resultDStream.foreachRDD( rdd =>{
//Driver
val jdbcCoon = MysqlPool.getJdbcCoon()
val statement = jdbcCoon.createStatement()
rdd.foreachPartition( partition =>{
//Executor
partition.foreach( recored =>{
//Executor
val word = recored._1
val count = recored._2
val sql=s"insert into aura.1706wordcount values(now(),'${word}',${count})"
statement.execute(sql)
})
MysqlPool.releaseConn(jdbcCoon)
})
})
//设置检查点
ssc.checkpoint(checkpointDirectory)
ssc
} def main(args: Array[String]): Unit = { val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
//启动程序
ssc.start()
ssc.awaitTermination()
}
}

这段代码是一个SparkStraming与mysql交互的Demo,用到了foreachRDD算子,mysql连接池的代码这里先省略,因为不是重点,会在另一片专门写SparkStreaming的博客中给出。这段代码看似没有问题,但是运行报错:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: java.lang.Object

表示任务没有被序列化,那么这个序列化到底是指哪里呢?通过查阅官网,发现在介绍foreachRDD的时候有过这么一个介绍:

dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}

这个说明foreachRDD是在driver端执行的,而foreach是在worker端执行的。我们知道我们在提交代码的时候,提交这个动作是在driver端执行的,提交的这台服务器就是driver,那么哪些代码是在drvier端执行的呢?

    val conf = new SparkConf()
conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(1))

以上的这些初始化的代码和:textfile、foreachRDD都是在driver端执行的;

而map、flatmap、reduceByKey、foreach、foreachPartition...这类算子都是在worker端执行的。

从driver到worker是要先序列化再可以传输的,所以你如果要在foreachRDD里面写代码,如果没有经过序列化,就会报错。那么怎么解决呢?

1、让它序列化啊

2、如果这个对象不支持序列化,那就不要写在foreachRDD里面啊

所以,原文的这段代码应该修改为:

    resultDStream.foreachRDD( rdd  =>{
//Driver
rdd.foreachPartition( partition =>{
//Executor
val jdbcCoon = MysqlPool.getJdbcCoon()
val statement = jdbcCoon.createStatement()
partition.foreach( recored =>{
//Executor
val word = recored._1
val count = recored._2
val sql=s"insert into aura.1706wordcount values(now(),'${word}',${count})"
statement.execute(sql)
})
MysqlPool.releaseConn(jdbcCoon)
})
})

最新文章

  1. MySQL配置、使用规范
  2. mongoose 的 model,query:增删改查
  3. Volley框架的使用
  4. cURL的几个经典实例
  5. ServiceBroker创建流程
  6. WCF基礎
  7. MySQL之重设密码(忘记密码)讲解
  8. MVC 中 使用TagBuilder扩展HtmlHelper
  9. 团队项目·冰球模拟器——cmake 自动化构建系统的配置文件的编写
  10. 关于Python网络爬虫实战笔记③
  11. 1.如何安装maven
  12. JavaScript实现登录窗口的拖拽
  13. SQLServer之索引简介
  14. 请问在EXECUTE IMMEDIATE中如何使用带有引号
  15. Excel函数详解:[127]ROWS函数用法
  16. React Native 学习资料
  17. UI设计工资有多高?怎么快速拿高薪?
  18. Spring 3.1新特性之二:@Enable*注解的源码,spring源码分析之定时任务Scheduled注解
  19. WinForm 之 VS2010发布、打包安装程序
  20. 导出 java.io.IOException: 权限不够

热门文章

  1. 华三F100系列、华为USG6300系列防火墙 策略路由配置实例
  2. 《ThinkPHP 5.0快速入门》 请求和响应
  3. pynput模块—键盘鼠标操作和监听
  4. poj1905 Expanding Rods(二分)
  5. 22.把hive表中数据导入到mysql中
  6. GridControl gridView显示筛选行,设置条件为包含
  7. SqlServer判断表中某列是否包含中文,英文,纯数字
  8. 关于centOS安装配置mysql5.6那点事
  9. Q_OBJECT提供了信号槽机制、国际化机、RTTI 的反射能力(cpp中使用Q_OBJECT导致无法处理moc,就需要#include “moc_xxx.h”)
  10. 【weixin】微信企业号和公众号区别和关系是什么?