Spark 2.1: registering an inner function with spark.udf.register("xx", xxx _) throws "Task not serializable" at runtime
2024-10-15 01:15:59
The function code:
class MySparkJob {
  def entry(spark: SparkSession): Unit = {
    def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double = {
      val innerRsrp: Double = outer_rsrp - wear_loss - (XX) * path_loss
      innerRsrp
    }

    spark.udf.register("getInnerRsrp", getInnerRsrp _)

    import spark.sql
    sql(
      s"""|select getInnerRsrp(t10.outer_rsrp, t10.wear_loss, t10.path_loss) as rsrp, xx
          |from yy""".stripMargin)
  }
}
When the job is submitted with spark-submit, the following exception is thrown:
User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:)
at org.apache.spark.SparkContext.clean(SparkContext.scala:)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$.apply(RDD.scala:)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$.apply(RDD.scala:)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:)
at com.dx.fpd_withscenetype.MySparkJob.entry(MySparkJob.scala:)
at com.dx.App$.main(App.scala:)
at com.dx.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:)
at java.lang.reflect.Method.invoke(Method.java:)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$.run(ApplicationMaster.scala:)
Caused by: java.io.NotSerializableException: com.dx.fpd_withscenetype.MySparkJob
Serialization stack:
- object not serializable (class: com.dx.fpd_withscenetype.MySparkJob, value: com.dx.fpd_withscenetype.MySparkJob@e4d4393)
- field (class: com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$, name: $outer, type: class com.dx.fpd_withscenetype.MySparkJob)
- object (class com.dx.fpd_withscenetype.MySparkJob$$anonfun$entry$, <function2>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$, name: func$, type: interface scala.Function2)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:getInnerRsrp(cast(input[, double, true] as int), cast(input[, double, true] as int), cast(input[, double, true] as int)))
- element of array (index: )
- array (class [Ljava.lang.Object;, size )
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$, name: references$, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:)
... more
Solution:
Make MySparkJob extend Serializable:
class MySparkJob extends Serializable {
  xxx
}
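The serialization stack explains why this works: the nested method getInnerRsrp eta-expands (getInnerRsrp _) into an anonymous function whose $outer field points at the enclosing MySparkJob instance, so Spark must serialize the whole object when shipping the UDF to executors. An alternative to extending Serializable is to keep the UDF out of the instance entirely. Below is a minimal sketch under that idea; the RsrpUdfs helper object and the simplified formula are illustrative, not from the original post:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object: a method on a top-level object references no
// MySparkJob instance, so the eta-expanded function captures no $outer field
// and only a small, serializable lambda is shipped to executors.
object RsrpUdfs {
  // Simplified body for the sketch; the original subtracts a scaled path_loss.
  def getInnerRsrp(outer_rsrp: Double, wear_loss: Double, path_loss: Double): Double =
    outer_rsrp - wear_loss - path_loss
}

class MySparkJob {
  def entry(spark: SparkSession): Unit = {
    // Registering the object method works even though MySparkJob itself
    // is not Serializable.
    spark.udf.register("getInnerRsrp", RsrpUdfs.getInnerRsrp _)
  }
}
```

This avoids marking the whole job class Serializable, which can otherwise hide other non-serializable fields until they fail at runtime.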