The UDF code:

class MySparkJob {
  def entry(spark: SparkSession): Unit = {
    def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double = {
      val innerRsrp: Double = outer_rsrp - wear_loss - (XX) * path_loss
      innerRsrp
    }

    spark.udf.register("getInnerRsrp", getInnerRsrp _)

    import spark.sql
    sql(
      s"""|select getInnerRsrp(t10.outer_rsrp, t10.wear_loss, t10.path_loss) as rsrp, xx
          |from yy""".stripMargin)
  }
}

When the job is submitted with spark-submit, it throws the following exception:

User class threw exception: org.apache.spark.SparkException: Task not serializable 

org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:)
at org.apache.spark.SparkContext.clean(SparkContext.scala:)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$.apply(RDD.scala:)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$.apply(RDD.scala:)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:)
at com.dx.fpd_withscenetype.MySparkJob.entry(MySparkJob.scala:)
at com.dx.App$.main(App.scala:)
at com.dx.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:)
at java.lang.reflect.Method.invoke(Method.java:)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$.run(ApplicationMaster.scala:)
Caused by: java.io.NotSerializableException: com.dx.fpd_withscenetype.MySparkJob
Serialization stack:
- object not serializable (class: com.dx.fpd_withscenetype.MySparkJob, value: com.dx.fpd_withscenetype.MySparkJob@e4d4393)
- field (class: com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$, name: $outer, type: class com.dx.fpd_withscenetype.MySparkJob)
- object (class com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$, <function2>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$, name: func$, type: interface scala.Function2)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:getInnerRsrp(cast(input[, double, true] as int), cast(input[, double, true] as int), cast(input[, double, true] as int)))
- element of array (index: )
- array (class [Ljava.lang.Object;, size )
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$, name: references$, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:)
... more

Solution:

The serialization stack above shows the root cause: getInnerRsrp is a local def inside MySparkJob, so the function registered as a UDF compiles to a closure holding a $outer reference to the enclosing MySparkJob instance, and Spark must serialize that (non-serializable) instance when shipping the UDF to executors. The quickest fix is to make MySparkJob extend Serializable:

class MySparkJob extends Serializable {
xxx
}
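Extending Serializable works, but it means the whole job object gets serialized along with every task that uses the UDF. An alternative sketch (not from the original post; the object name RsrpUdfs and the XX placeholder value are assumptions) is to move the UDF body into a standalone top-level object, so the registered function carries no $outer reference to MySparkJob at all:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object: a method on a top-level object is accessed
// through the static MODULE$ reference, so the eta-expanded function passed
// to spark.udf.register captures no reference to MySparkJob.
object RsrpUdfs extends Serializable {
  // Placeholder: the actual coefficient was elided as "XX" in the original post.
  val XX: Double = 0.0

  def getInnerRsrp(outerRsrp: Double, wearLoss: Double, pathLoss: Double): Double =
    outerRsrp - wearLoss - XX * pathLoss
}

class MySparkJob {
  def entry(spark: SparkSession): Unit = {
    // The registered closure now references only RsrpUdfs, not this job instance.
    spark.udf.register("getInnerRsrp", RsrpUdfs.getInnerRsrp _)
    // ... run the SQL query as before
  }
}
```

Marking the object `extends Serializable` costs nothing and keeps the UDF safe even if a future refactor makes the closure capture the module instance.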
