org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 29.1 failed 4 times, most recent failure: Lost task 1.3 in stage 29.1 (TID 466, magnesium, executor 4): java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at shade.com.datastax.spark.connector.google.common.base.Throwables.propagate(Throwables.java:160)
    at com.datastax.driver.core.NettyUtil.newEventLoopGroupInstance(NettyUtil.java:136)
    at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:99)
    at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:769)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1410)
    at com.datastax.driver.core.Cluster.init(Cluster.java:159)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:330)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:280)
    at StreamingIntegrationKafkaBac$$anonfun$main$1$$anonfun$apply$1.apply(StreamingIntegrationKafkaBac.scala:155)
    at StreamingIntegrationKafkaBac$$anonfun$main$1$$anonfun$apply$1.apply(StreamingIntegrationKafkaBac.scala:144)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I ran into this error while querying Cassandra from Spark Streaming. The relevant code:

dataFrame.foreachPartition { part =>
  val poolingOptions = new PoolingOptions
  poolingOptions
    .setCoreConnectionsPerHost(HostDistance.LOCAL, 4)
    .setMaxConnectionsPerHost(HostDistance.LOCAL, 10)
  val cluster = Cluster
    .builder
    .addContactPoints("localhost")
    .withCredentials("cassandra", "cassandra")
    .withPoolingOptions(poolingOptions)
    .build
  val session = cluster.connect("keyspace")
  try {
    part.foreach { item =>
      // business logic
    }
  } finally {
    // Close the session before the cluster (the original code had the
    // order reversed), and do it even if the partition body throws.
    session.close()
    cluster.close()
  }
}

In each batch, first check that both the cluster and the session get closed. If either one is left open, connections leak across batches and this error appears.
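One way to make the cleanup unconditional is a small loan-pattern helper. This is only a sketch against the DataStax Java driver 3.x API; the `withSession` name and its parameters are mine, not from the original post:

```scala
import com.datastax.driver.core.{Cluster, Session}

// Hypothetical helper: builds a Cluster and Session per partition and
// guarantees both are closed, session first, even when the body throws.
def withSession[T](contactPoint: String, keyspace: String)(body: Session => T): T = {
  val cluster = Cluster.builder.addContactPoints(contactPoint).build
  try {
    val session = cluster.connect(keyspace)
    try body(session)
    finally session.close()
  } finally cluster.close()
}
```

Called as `withSession("localhost", "keyspace") { session => ... }` inside `foreachPartition`, it removes any chance of a leaked connection accumulating from one batch to the next.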

If that does not resolve it, check the netty version on the classpath. The top of the stack trace (`InvocationTargetException` inside `NettyUtil.newEventLoopGroupInstance`) is the symptom of the driver reflectively instantiating a netty event loop against an incompatible netty version. `mvn dependency:tree -Dincludes=io.netty` shows which netty versions are being pulled in and by whom.

I recommend installing the Maven Helper plugin in IDEA, then removing the conflicting lower-version netty dependencies via exclusions.
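An exclusion looks like the following pom.xml fragment. This is illustrative only: which dependency actually drags in the old netty varies by project (the coordinates and version below are assumptions; `io.netty:netty` is the old netty 3.x artifact), so identify the real offender with Maven Helper or `mvn dependency:tree` first:

```xml
<!-- Sketch: exclude a conflicting old netty from whichever dependency pulls it in. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>${spark.version}</version>
  <exclusions>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

After the exclusion, rebuild and confirm with `mvn dependency:tree -Dincludes=io.netty` that only the version the Cassandra driver expects remains.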
