一、缓存与持久化机制

与RDD类似,Spark Streaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让Spark Streaming自动
将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享
使用内存中的一份缓存数据。 对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制。即Spark Streaming
默认就会将上述操作产生的Dstream中的数据,缓存到内存中,不需要开发人员手动调用persist()方法。 对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是,用的是类似MEMORY_ONLY_SER_2。 与RDD不同的是,默认的持久化级别,统一都是要序列化的。

二、Checkpoint机制

1、Checkpoint机制概述

每一个Spark Streaming应用,正常来说,都是要7 * 24小时运转的,这就是实时计算程序的特点。因为要持续不断的对数据进行计算。因此,对实时计算应用的要求,
应该是必须要能够对与应用程序逻辑无关的失败,进行容错。 如果要实现这个目标,Spark Streaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复。有两种数据需要被进行checkpoint: 1、元数据checkpoint——将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS。当运行Spark Streaming应用程序的Driver进程所在节点失败时,该信息
可以用于进行恢复。元数据信息包括了:
1.1 配置信息——创建Spark Streaming应用程序的配置信息,比如SparkConf中的信息。
1.2 DStream的操作信息——定义了Spark Stream应用程序的计算逻辑的DStream操作信息。
1.3 未处理的batch信息——那些job正在排队,还没处理的batch信息。 2、数据checkpoint——将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。 对于一些将多个batch的数据进行聚合的,有状态的transformation操作,这是非常有用的。在这种transformation操作中,生成的RDD是依赖于之前的batch的RDD的,
这会导致随着时间的推移,RDD的依赖链条变得越来越长。 要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中间产生的RDD,会定期地被checkpoint到可靠的
存储系统上,比如HDFS。从而削减RDD的依赖链条,进而缩短失败恢复时,RDD的恢复时间。 一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;
而RDD checkpoint主要是为了,使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复。

2、何时启用Checkpoint机制?

1、使用了有状态的transformation操作——比如updateStateByKey,或者reduceByKeyAndWindow操作,被使用了,那么checkpoint目录要求是必须提供的,
也就是必须开启checkpoint机制,从而进行周期性的RDD checkpoint。 2、要保证可以从Driver失败中进行恢复——元数据checkpoint需要启用,来进行这种情况的恢复。 要注意的是,并不是说,所有的Spark Streaming应用程序,都要启用checkpoint机制,如果即不强制要求从Driver失败中自动进行恢复,又没使用有状态
的transformation操作,那么就不需要启用checkpoint。事实上,这么做反而是有助于提升性能的。 如何启用Checkpoint机制? 1、对于有状态的transformation操作,启用checkpoint机制,定期将其生产的RDD数据checkpoint,是比较简单的。 可以通过配置一个容错的、可靠的文件系统(比如HDFS)的目录,来启用checkpoint机制,checkpoint数据就会写入该目录。使用StreamingContext的checkpoint()方法即可。
然后,你就可以放心使用有状态的transformation操作了。 2、如果为了要从Driver失败中进行恢复,那么启用checkpoint机制,是比较复杂的。需要改写Spark Streaming应用程序。 当应用程序第一次启动的时候,需要创建一个新的StreamingContext,并且调用其start()方法,进行启动。当Driver从失败中恢复过来时,需要从checkpoint目录中记录的
元数据中,恢复出来一个StreamingContext。

3、为Driver失败的恢复机制重写程序

###Java###
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...);
JavaDStream<String> lines = jssc.socketTextStream(...);
jssc.checkpoint(checkpointDirectory);
return jssc;
}
}; JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
context.start();
context.awaitTermination();

###scala####
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)
val lines = ssc.socketTextStream(...)
ssc.checkpoint(checkpointDirectory)
ssc
} val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()

4、配置spark-submit提交参数

按照上述方法,进行Spark Streaming应用程序的重写后,当第一次运行程序时,如果发现checkpoint目录不存在,那么就使用定义的函数来第一次创建一个StreamingContext,
并将其元数据写入checkpoint目录;当从Driver失败中恢复过来时,发现checkpoint目录已经存在了,那么会使用该目录中的元数据创建一个StreamingContext。 但是上面的重写应用程序的过程,只是实现Driver失败自动恢复的第一步。第二步是,必须确保Driver可以在失败时,自动被重启。 要能够自动从Driver失败中恢复过来,运行Spark Streaming应用程序的集群,就必须监控Driver运行的过程,并且在它失败时将它重启。对于Spark自身的standalone模式,
需要进行一些配置去supervise driver,在它失败时将其重启。 首先,要在spark-submit中,添加--deploy-mode参数,默认其值为client,即在提交应用的机器上启动Driver;但是,要能够自动重启Driver,就必须将其值设置为cluster;
此外,需要添加--supervise参数。 使用上述第二步骤提交应用之后,就可以让driver在失败时自动被重启,并且通过checkpoint目录的元数据恢复StreamingContext。

5、Checkpoint的说明

将RDD checkpoint到可靠的存储系统上,会耗费很多性能。当RDD被checkpoint时,会导致这些batch的处理时间增加。因此,checkpoint的间隔,需要谨慎的设置。
对于那些间隔很多的batch,比如1秒,如果还要执行checkpoint操作,则会大幅度削减吞吐量。而另外一方面,如果checkpoint操作执行的太不频繁,那就会导致
RDD的lineage变长,又会有失败恢复时间过长的风险。 对于那些要求checkpoint的有状态的transformation操作,默认的checkpoint间隔通常是batch间隔的数倍,至少是10秒。使用DStream的checkpoint()方法,可以
设置这个DStream的checkpoint的间隔时长。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5~10倍,是个不错的选择。

最新文章

  1. Fiddler学习笔记
  2. Mersenne twister 随机数算法实现 in Scheme
  3. cellmap for iphone
  4. Python 文件处理
  5. CENTOS 6.4 安装oracle 10g,手工建库及升级到10.2.0.5
  6. Java中方法与数组
  7. BZOJ 1657 奶牛的歌声
  8. Codeforces 682 D. Alyona and Strings (dp)
  9. Flume连接Kafka的broker出错
  10. 被Oracle全局暂时表坑了
  11. c# datagridviewcomboboxcell值无效的解决办法
  12. 统计函数:MAX,MIN,SUM,AVG,COUNT
  13. POJ 3154 Graveyard【多解,数论,贪心】
  14. 关于hibernate中hql语句 case when的写法
  15. Struts2深入
  16. Java的基础知识一
  17. 浅探网络1---tcp协议详解(三次握手和四次挥手)
  18. [CSL 的魔法][求排序最少交换次数]
  19. SpringBoot整合redis哨兵主从服务
  20. 处女座和小姐姐(三)-数位dp1.0

热门文章

  1. Scala 系列(三)—— 流程控制语句
  2. springboot 通过@WebFilter(urlPatterns )配置Filter过滤路径
  3. IQueryable,IEnumerable,IList区别
  4. ASP.NET Nlog上手练习小例子
  5. ServiceStack JWT 准备
  6. 单IP、网络、别名管道限速的设置
  7. Java基础之枚举类型Enum的使用
  8. IntelliJ idea鼠标移动到类上显示文档document(javadoc)内容
  9. workermanPHP聊天框架项目windows环境部署实践
  10. Hybris产品主数据的价格折扣维护