Checkpoint机制

通过前期对Spark Streaming的理解,我们知道,Spark Streaming应用程序如果不手动停止,则将一直运行下去,在实际中应用程序一般是24小时*7天不间断运行的,因此Streaming必须对诸如系统错误、JVM出错等与程序逻辑无关的错误(failures )具体很强的弹性,具备一定的非应用程序出错的容错性。Spark Streaming的Checkpoint机制便是为此设计的,它将足够多的信息checkpoint到某些具备容错性的存储系统如HDFS上,以便出错时能够迅速恢复。有两种数据可以chekpoint:

(1)Metadata checkpointing 
将流式计算的信息保存到具备容错性的存储上如HDFS,Metadata Checkpointing适用于当streaming应用程序Driver所在的节点出错时能够恢复,元数据包括: 
Configuration(配置信息) - 创建streaming应用程序的配置信息 
DStream operations - 在streaming应用程序中定义的DStreaming操作 
Incomplete batches - 在列队中没有处理完的作业

(2)Data checkpointing 
将生成的RDD保存到外部可靠的存储当中,对于一些数据跨度为多个bactch的有状态tranformation操作来说,checkpoint非常有必要,因为在这些transformation操作生成的RDD对前一RDD有依赖,随着时间的增加,依赖链可能会非常长,checkpoint机制能够切断依赖链,将中间的RDD周期性地checkpoint到可靠存储当中,从而在出错时可以直接从checkpoint点恢复。

具体来说,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing

http://blog.csdn.net/wisgood/article/details/55667612

http://www.cnblogs.com/dt-zhw/p/5664663.html

import java.io.File
import java.nio.charset.Charset import com.google.common.io.Files import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam /**
* Counts words in text encoded with UTF8 received from the network every second.
*
* Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
* data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
* <output-file> file to which the word counts will be appended
*
* <checkpoint-directory> and <output-file> must be absolute paths
*
* To run this on your local machine, you need to first run a Netcat server
*
* `$ nc -lk 9999`
*
* and run the example as
*
* `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
* localhost 9999 ~/checkpoint/ ~/out`
*
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
* a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
* checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
* the checkpoint data.
*
* Refer to the online documentation for more details.
*/
object RecoverableNetworkWordCount { def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
: StreamingContext = { //程序第一运行时会创建该条语句,如果应用程序失败,则会从checkpoint中恢复,该条语句不会执行
println("Creating new context")
val outputFile = new File(outputPath)
if (outputFile.exists()) outputFile.delete()
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[4]")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds())
ssc.checkpoint(checkpointDirectory) //将socket作为数据源
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, )).reduceByKey(_ + _)
wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
println(counts)
println("Appending to " + outputFile.getAbsolutePath)
Files.append(counts + "\n", outputFile, Charset.defaultCharset())
})
ssc
}
//将String转换成Int
private object IntParam {
def unapply(str: String): Option[Int] = {
try {
Some(str.toInt)
} catch {
case e: NumberFormatException => None
}
}
}
def main(args: Array[String]) {
if (args.length != ) {
System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
|Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
| <output-file>. <hostname> and <port> describe the TCP server that Spark
| Streaming would connect to receive data. <checkpoint-directory> directory to
| HDFS-compatible file system which checkpoint data <output-file> file to which the
| word counts will be appended
|
|In local mode, <master> should be 'local[n]' with n >
|Both <checkpoint-directory> and <output-file> must be absolute paths
""".stripMargin
)
System.exit()
}
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
//getOrCreate方法,从checkpoint中重新创建StreamingContext对象或新创建一个StreamingContext对象
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(ip, port, outputPath, checkpointDirectory)
})
ssc.start()
ssc.awaitTermination()
}
}

最新文章

  1. map集合键值对存储,键值不重复,值可以重复
  2. SPFA+寻路(行路难,洛谷2832)
  3. phpStydy配置memcache扩展
  4. 兼容IE的写法收集||bug修复
  5. Nhibernate基础
  6. JavaScript BOM 遗漏知识再整理;弹窗和记时事件;
  7. 关于JS的一些代码效果图
  8. java比较相等符
  9. redis基础的字符串类型
  10. MySQL的char和varchar
  11. C++ 安全单例模式总结
  12. 【LeetCode】73. Set Matrix Zeroes
  13. Java9相关资料(JShell简易教程等)
  14. Dubbo入门—搭建一个最简单的Demo框架
  15. 转载:[Java面经]干货整理, Java面试题(覆盖Java基础,Java高级,JavaEE,数据库,设计模式等)
  16. 学习MySQL过程中的随笔一
  17. Web.xml中Filter过滤器标签几个说明
  18. Metasploit one test
  19. MyBatis #{} 取值注意事项
  20. Codewars笔记

热门文章

  1. NET MVC全局异常处理(一) 【转载】网站遭遇DDoS攻击怎么办 使用 HttpRequester 更方便的发起 HTTP 请求 C#文件流。 Url的Base64编码以及解码 C#计算字符串长度,汉字算两个字符 2019周笔记(2.18-2.23) Mysql语句中当前时间不能直接使用C#中的Date.Now传输 Mysql中Count函数的正确使用
  2. 中小研发团队架构实践之生产环境诊断工具WinDbg 三分钟学会.NET微服务之Polly 使用.Net Core+IView+Vue集成上传图片功能 Fiddler原理~知多少? ABP框架(asp.net core 2.X+Vue)模板项目学习之路(一) C#程序中设置全局代理(Global Proxy) WCF 4.0 使用说明 如何在IIS上发布,并能正常访问
  3. appium简明教程(7)——Desired Capabilities详解
  4. apache提示没有设置 max-age or expires解决办法
  5. 在ToolStrip中加入具有更好体验性的DateTimePicker
  6. 从github下载某个git库的4种方法[zz]
  7. linux(centos6)搭建ftp服务器 -摘自网络
  8. supervisor 完整安装步骤
  9. Android Studio 1.1.0 切换主题和绑定 代码提示 快捷键
  10. mysql获得60天前unix时间示例