
// Excerpt: the scheduler and generator fields, followed by the recurring timer whose callback
// posts a GenerateJobs event to eventActor.
private[streaming] val scheduler = new JobScheduler(this) // see JobScheduler.scala
private val jobGenerator = new JobGenerator(this) // see JobGenerator.scala; the single most central line of Spark Streaming: stream processing is emulated by discretizing it into batches
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime))) // fires jobs repeatedly at batchDuration intervals; this is the engine of all of Spark Streaming // next: how JobGenerator.eventActor handles the GenerateJobs event
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Try captures any exception thrown while the graph builds its jobs, so a failure is
// reported through jobScheduler.reportError instead of propagating out of this method.
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
// NOTE(review): the excerpt omits the brace that closes the match, so the next line reads as
// part of the Failure case; presumably it runs after the match for every batch. Confirm
// against the full source.
eventActor ! DoCheckpoint(time)
} // Two steps here: first call DStreamGraph.generateJobs to build the jobs, then use JobScheduler.runJobs to execute them
// DStreamGraph.generateJobs (excerpt): asks every registered output stream for its job at `time`.
// The excerpt stops before `jobs` is returned.
def generateJobs(time: Time): Seq[Job] = {
// The collection is walked while holding the graph's own lock.
val jobs = this.synchronized {
// generateJob returns an Option, so flatMap keeps only the streams that produced a job.
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
} // next: DStream.generateJob
// The implementation in DStream: stream processing ultimately still turns into a call to
// context.sparkContext.runJob(rdd, emptyFunc).
// Returns Some(job) when getOrCompute yields an RDD for `time`, otherwise None.
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
// emptyFunc ignores the iterator it is handed; presumably the job exists only to force the
// RDD to be computed (confirm against the full source).
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
// The excerpt omits the closing brace of jobFunc here.
Some(new Job(time, jobFunc))
case None => None
} // next: JobScheduler.runJobs
// JobScheduler.runJobs (excerpt): records the jobs of one batch as a JobSet and submits each
// of them to the executor. An empty job list is only logged.
def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
} else {
val jobSet = new JobSet(time, jobs) // create the JobSet
// The set is stored under its batch time before any of its jobs is submitted.
jobSets.put(time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job))) // hand each job to an executor thread, wrapped in a JobHandler
logInfo("Added jobs for time " + time)
} // next: JobScheduler.JobHandler
// Runnable wrapper around one Job: notifies eventActor before and after running the job.
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run() // job.run here merely submits a job to Spark; Spark does the actual work
eventActor ! JobCompleted(job)
} // next: Job.run
// Job (excerpt): pairs a batch time with the function to execute for that batch.
class Job(val time: Time, func: () => _) {
var id: String = _
// result stays null until run() has been called.
var result: Try[_] = null def run() {
// Try records either the value func returned or the exception it threw.
result = Try(func())



* Save each RDD in this DStream as a text file, using string representation
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
// saveAsTextFiles (excerpt): only the start of the per-batch save function is shown; the
// lines that write the RDD and use saveFunc are omitted from this excerpt.
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
// Output name for this batch, derived from prefix, suffix and the batch time.
val file = rddToFileName(prefix, suffix, time)
} /**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
// Wrap the output function in a ForEachDStream and use DStream.register to register that
// output stream with the DStreamGraph.
// foreachFunc is passed through sparkContext.clean before being stored.
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
} // next: org.apache.spark.streaming.dstream
// ForEachDStream (excerpt): a DStream[Unit] whose job for a batch applies foreachFunc to the
// RDD its parent produces for that batch.
// compute always returns None; the work is done by the generateJob override.
// The excerpt collapses several members onto one line and omits closing braces.
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) { override def dependencies = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = {
// A job is created only when the parent yields an RDD for this time.
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
Some(new Job(time, jobFunc))
case None => None


// compute (excerpt): builds the BlockRDD for one batch from the block ids reported by the
// network input tracker.
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime) // fetch from networkInputTracker the block ids this InputDStream has already produced
Some(new BlockRDD[T](ssc.sc, blockIds)) // wrap them in a BlockRDD
} else {
// Before the graph's start time: a BlockRDD over an empty array of block ids.
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
} // next: NetworkInputTracker.getBlockIds
/** Return all the blocks received from a receiver. */
def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized { // although time is a parameter, this actually returns all of this InputDStream's block ids
val queue = receivedBlockIds.synchronized {
// A receiver with no entry gets a fresh empty queue; getOrElse does not store it in the map.
receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
val result = queue.synchronized {
queue.dequeueAll(x => true) // dequeue everything, independent of time: the relationship between the RDD's time and the blocks is not checked
logInfo("Stream " + receiverId + " received " + result.size + " blocks")


* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
* the jobs and runs them using a thread pool.
// JobScheduler (excerpt): many method bodies and closing braces are omitted from this excerpt,
// so several definitions below start on the same line as the previous one's closing brace.
class JobScheduler(val ssc: StreamingContext) extends Logging { private val jobSets = new ConcurrentHashMap[Time, JobSet] // caches job sets that have not been run yet, i.e. pending ones; an entry is removed from jobSets once its jobs have run successfully
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) // determines how many streaming jobs may run concurrently
private val executor = Executors.newFixedThreadPool(numConcurrentJobs) // create the executor thread pool
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus() // These two are created only when scheduler starts.
// eventActor not being null means the scheduler has been started and not stopped
var networkInputTracker: NetworkInputTracker = null
private var eventActor: ActorRef = null def start() = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { // create eventActor to process JobSchedulerEvents in the background
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}), "JobScheduler")
networkInputTracker = new NetworkInputTracker(ssc) // initialize and start NetworkInputTracker, which begins reading data from the inputs
jobGenerator.start() // start JobGenerator, which begins submitting jobs
logInfo("JobScheduler started")
} def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
} else {
// Register the batch's JobSet under its time, then submit every job to the thread pool.
val jobSet = new JobSet(time, jobs)
jobSets.put(time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + time)
} private def processEvent(event: JobSchedulerEvent) {
// Dispatches each scheduler event to its handler; the body of the catch is omitted here.
try {
event match {
case JobStarted(job) => handleJobStart(job)
case JobCompleted(job) => handleJobCompletion(job)
case ErrorReported(m, e) => handleError(m, e)
} catch {
} private def handleJobStart(job: Job) {
// The job's JobSet is looked up by the job's batch time; the rest of the body is omitted.
val jobSet = jobSets.get(job.time)
if (!jobSet.hasStarted) {
} private def handleJobCompletion(job: Job) {
// Branches on the Try stored in job.result; both branch bodies are truncated in this excerpt.
job.result match {
case Success(_) =>
val jobSet = jobSets.get(job.time)
if (jobSet.hasCompleted) {
case Failure(e) =>
} private class JobHandler(job: Job) extends Runnable {
def run() {
// NOTE(review): unlike the JobHandler excerpt earlier in this file, no job.run() call appears
// between these two messages; presumably it was elided. Confirm against the full source.
eventActor ! JobStarted(job)
eventActor ! JobCompleted(job)


* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
// JobGenerator (excerpt): many method bodies and closing braces are omitted from this excerpt,
// so several definitions below start on the same line as the previous one's closing brace.
class JobGenerator(jobScheduler: JobScheduler) extends Logging { private val ssc = jobScheduler.ssc
private val graph = ssc.graph
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, // keeps posting GenerateJobs events; the heart of Spark Streaming
longTime => eventActor ! GenerateJobs(new Time(longTime)))
// A writer is built only when both a checkpoint duration and a checkpoint directory are set;
// the else branch is omitted from this excerpt.
private lazy val checkpointWriter =
if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
} else {
} // eventActor is created when generator starts.
// This not being null means the scheduler has been started and not stopped
private var eventActor: ActorRef = null /** Start generation of jobs */
def start() = synchronized {
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { // create the actor that processes JobGeneratorEvents in the background
def receive = {
case event: JobGeneratorEvent =>
}), "JobGenerator")
// Both branch bodies are omitted; presumably they call restart() and startFirstTime(). Confirm.
if (ssc.isCheckpointPresent) {
} else {
} /** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
} /** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
// The graph is started one batch duration before the timer's start time.
graph.start(startTime - graph.batchDuration)
logInfo("JobGenerator started at " + startTime)
} /** Restarts the generator based on the information in checkpoint */
private def restart() {
// If manual clock is being used for testing, then
// either set the manual clock to the last checkpointed time,
// or if the property is defined set it to that time
if (clock.isInstanceOf[ManualClock]) {
val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
} val batchDuration = ssc.graph.batchDuration // Batches when the master was down, that is,
// between the checkpoint and current restart time
val checkpointTime = ssc.initialCheckpoint.checkpointTime
val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
val downTimes = checkpointTime.until(restartTime, batchDuration) // Batches that were unprocessed before failure
val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
// Reschedule jobs for these times
val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) // Restart the timer
} /** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) { // generateJobs
// Try captures any exception from the graph so it is reported instead of propagating.
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
// NOTE(review): the brace closing the match is omitted, so the next line reads as part of the
// Failure case; presumably it runs for every batch. Confirm against the full source.
eventActor ! DoCheckpoint(time)
} /** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
eventActor ! DoCheckpoint(time)
} /** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
} /** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time) = synchronized {
// Checkpoint only when a writer exists and the time elapsed since the graph's zero time is a
// whole multiple of the checkpoint duration.
if (checkpointWriter != null && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
checkpointWriter.write(new Checkpoint(ssc, time))

DStreamGraph 用于 track job 中的 inputStreams 和 outputStreams,并作为 DStream workflow 对外的接口

// DStreamGraph (excerpt): holds the registered input and output streams and the graph's timing
// fields. Many method bodies and closing braces are omitted from this excerpt.
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]() var rememberDuration: Duration = null
var checkpointInProgress = false var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null def start(time: Time) {
this.synchronized {
// A non-null zeroTime means start was already called, so a second call is rejected.
if (zeroTime != null) {
throw new Exception("DStream graph computation already started")
// Both the zero time and the start time are set to the given time.
zeroTime = time
startTime = time
} def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStreams += inputStream
} def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStreams += outputStream
} def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time)) // generateJob is invoked only on the output streams
} def clearMetadata(time: Time) {
// In the methods below, the work done inside the synchronized block is omitted from this
// excerpt; only the surrounding log calls are shown.
logDebug("Clearing metadata for time " + time)
this.synchronized {
logDebug("Cleared old metadata for time " + time)
} def updateCheckpointData(time: Time) {
logInfo("Updating checkpoint data for time " + time)
this.synchronized {
logInfo("Updated checkpoint data for time " + time)
} def clearCheckpointData(time: Time) {
logInfo("Clearing checkpoint data for time " + time)
this.synchronized {
logInfo("Cleared checkpoint data for time " + time)
} def restoreCheckpointData() {
logInfo("Restoring checkpoint data")
this.synchronized {
logInfo("Restored checkpoint data")


