Spark Streaming 的一些问题,做选型前关注这些问题可以有效的降低使用风险。

checkpoint

checkpoint 是个很好的恢复机制。但是方案比较粗暴,直接通过序列化的机制写入到文件系统,导致代码变更和配置变更无法生效。实际场景是升级往往比系统崩溃的频率高太多。但是升级需要能够无缝的衔接上一次的偏移量。所以spark streaming在无法容忍数据有丢失的情况下,你需要自己记录偏移量,然后从上一次进行恢复。

我们目前是重写了相关的代码,每次记录偏移量,不过只有在升级的时候才会读取自己记录的偏移量,其他情况都是依然采用checkpoint机制。

Kafka

这个和Spark Streaming相关,也不太相关。说相关是因为Spark 对很多异常处理比较简单。很多是和Kafka配置相关的。我举个例子:

如果消息体太大了,超过 fetch.message.max.bytes=1m,那么Spark Streaming会直接抛出OffsetOutOfRangeException异常,然后停止服务。
对应的错误会从这行代码抛出:
if (!iter.hasNext) {
 assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
 finished = true
 null.asInstanceOf[R]
}

其实就是消费的完成后 实际的消费数据量和预先估计的量不一致。
你在日志中看到的信息其实是这个代码答应出来的:
private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
 s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
 s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +    " This should not happen, and indicates that messages may have been lost"

解决办法自然是把 fetch.message.max.bytes 设置大些。

如果你使用Spark Streaming去追数据,从头开始消费kafka,而Kafka因为某种原因,老数据快速的被清理掉,也会引发OffsetOutOfRangeException错误。并且使得Spark Streaming程序异常的终止。

解决办法是事先记录kafka偏移量和时间的关系(可以隔几秒记录一次),然后根据时间找到一个较大的偏移量开始消费。

或者你根据目前Kafka新增数据的消费速度,给smallest获取到的偏移量再加一个较大的值,避免出现Spark Streaming 在fetch的时候数据不存在的情况。

textFileStream

其实使用textFileStream 的人应该也不少。因为可以很方便的监控HDFS上某个文件夹下的文件,并且进行计算。这里我们遇到的一个问题是,如果底层比如是压缩文件,遇到有顺坏的文件,你是跳不过去的,直接会让Spark Streaming 异常退出。 官方并没有提供合适的方式让你跳过损坏的文件。我们目前是通过重写FileInputDStream 等相关类来修正该问题。

内存

Shuffle (尤其是每个周期数据量很大的情况)是Spark Streaming 不可避免的疼痛。譬如,与Kafka的集成, Kafka的分区数决定了你的并行度(我们假设你使用Direct Approach的模式集成)。你为了获得更大的并行度,则需要进行一次repatition。 为了能够避免Shuffle,并且提高Spark Streaming处理的并行度,我们重写了DirectKafkaInputDStream,KafkaRDD,KafkaUtils等类,实现可以按Kafka 分区按倍数扩大并行度。

我们期望官方能够实现将一个Kafka的partition 映射为多个Spark 的partition,避免数据的多次移动。

再次,如果单个Executor 并行度过大,可能也会导致对内存压力增大。在使用Spark Streaming的过程中,我们多次遇到Executor Lost 相关的问题(譬如 shuffle fetch 失败,Task失败重试等),目前比较有效的方式是:

提高Executor 数目
减少单个Executor的 CPU 核数

为了保证处理的效率,请保证CPU总核数保持不变。

监控

Spark Streaming 的UI 上的Executors Tab缺少一个最大的监控,就是Worker内存GC详情。虽然我们可以将这些信息导入到 第三方监控中,然而终究是不如在 Spark UI上展现更加方便。 为此我们也将该功能列入研发计划。

最新文章

  1. MATLAB 绘图时的相关心得
  2. 数据库知识整理<四>
  3. Android应用安全之Content Provider安全
  4. css权重是什么
  5. [分享]运维分享一一阿里云linux系统mysql密码修改脚本
  6. 【leetcode❤python】235. Lowest Common Ancestor of a Binary Search Tree
  7. 查看SQL Server 备份信息
  8. myeclipse 写java代码提示 dead code 原因
  9. Jsp中获得集合List或Set的长度
  10. PHP中的数组方法及访问方法总结
  11. JFreeChart与AJAX+JSON+ECharts两种处理方式生成热词统计可视化图表
  12. 后台方庄List razor 循环
  13. bzoj1127[POI2008]KUP 悬线法
  14. OpenCV RGB2LAB执行效率测试
  15. 传统C/S软件的"断骨增高"
  16. python小知识点随笔
  17. Angular7.1.4+Typescript3.1框架学习(二)
  18. js array 数组添加与删除数据
  19. SQLServer 学习笔记之超详细基础SQL语句 Part 5
  20. October 23rd, 2017 Week 43rd Monday

热门文章

  1. jmeter使用复习
  2. Docker Remote API v1.24
  3. phantomjs抛出IOException
  4. jdk1.8新特性-Lambda表达式使用要点
  5. Appium基础环境搭建(windows)---基于python
  6. Thunder团队第六周 - Scrum会议3
  7. Java中I/O流之Object流
  8. TCP系列29—窗口管理&流控—3、Nagle算法
  9. 3ds Max学习日记(四)
  10. bsxfun函数