The DAGScheduler's main responsibilities are creating jobs, splitting each job into stages along wide/narrow dependency boundaries, and submitting those stages to the TaskScheduler.
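To make the wide/narrow split concrete, here is a minimal sketch (the app name, data, and local master are arbitrary choices for the example): the only shuffle in this lineage comes from reduceByKey, so when the action runs, the DAGScheduler cuts the job at that wide dependency into a ShuffleMapStage and a final ResultStage.
import org.apache.spark.{SparkConf, SparkContext}

object StageSplitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-split-sketch").setMaster("local[2]"))
    val words  = sc.parallelize(Seq("a b", "b c", "a c")).flatMap(_.split(" "))   // narrow dependencies
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)                  // wide (shuffle) dependency
    // The lineage shows the ShuffledRDD boundary where the DAGScheduler will cut the job
    // into a ShuffleMapStage and a ResultStage.
    println(counts.toDebugString)
    counts.collect()   // the action that actually makes the DAGScheduler build and submit the stages
    sc.stop()
  }
}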
一、The DAGScheduler is created in SparkContext
@volatile private var _dagScheduler: DAGScheduler = _
// getter and setter
private[spark] def dagScheduler: DAGScheduler = _dagScheduler
private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
_dagScheduler = ds
}
// creation
_dagScheduler = new DAGScheduler(this)
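Once created, the DAGScheduler sits idle until an action is run: every action goes through SparkContext.runJob, which delegates to dagScheduler.runJob. One way to watch it work is to register a SparkListener and print the job and stage ids it hands out (allocated from the nextJobId/nextStageId counters listed in 1.1 below). A minimal sketch, assuming Spark 2.x and a local master; the app name and data are arbitrary.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}

object DagSchedulerEventsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dag-events-sketch").setMaster("local[2]"))
    // Events posted to the listener bus while the DAGScheduler runs jobs and stages.
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"job ${jobStart.jobId} started, stages: ${jobStart.stageIds.mkString(",")}")
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
        println(s"stage ${stageCompleted.stageInfo.stageId} (${stageCompleted.stageInfo.name}) completed")
    })
    // collect() is an action: SparkContext.runJob hands the RDD over to dagScheduler.runJob.
    sc.parallelize(1 to 100, 4).map(_ * 2).collect()
    sc.stop()
  }
}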
1.1、DAGScheduler data structures
The DAGScheduler mainly maintains the mapping between jobIds and stageIds, the Stage and ActiveJob objects themselves, and the cached locations of each RDD's partitions. The relevant fields are listed below:
private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
// Monotonically increasing ids handed out to new jobs and stages
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
// jobId -> ids of all stages belonging to that job, and stageId -> the Stage object itself
private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
/**
* Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
* that dependency. Only includes stages that are part of currently running job (when the job(s)
* that require the shuffle stage complete, the mapping will be removed, and the only record of
* the shuffle data will be in the MapOutputTracker).
*/
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
// jobId -> ActiveJob for jobs that have been submitted and not yet finished
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
/**
* Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids
* and its values are arrays indexed by partition numbers. Each array value is the set of
* locations where that RDD partition is cached.
*
* All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
*/
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
// For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
// every task. When we detect a node failing, we note the current epoch number and failed
// executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
//
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]
private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
private val messageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
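The last two lines are the heart of the scheduler: eventProcessLoop is a single-threaded event loop onto which events such as JobSubmitted and task-completion events are posted and then handled one by one, and taskScheduler.setDAGScheduler(this) wires the DAGScheduler back into the TaskScheduler so that task results can be reported to it. The sketch below is not Spark's EventLoop/DAGSchedulerEventProcessLoop, only a minimal analogue of the same single-consumer pattern; every name in it is made up for illustration.
import java.util.concurrent.LinkedBlockingDeque

// Minimal analogue of the single-consumer event-loop pattern used by
// DAGSchedulerEventProcessLoop; all names here are illustrative, not Spark's.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case object Stop extends SchedulerEvent

class MiniEventLoop(name: String)(handle: SchedulerEvent => Unit) {
  private val queue = new LinkedBlockingDeque[SchedulerEvent]()
  private val thread = new Thread(name) {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Stop  => running = false
          case event => handle(event)   // every event is handled on this single thread
        }
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: SchedulerEvent): Unit = queue.put(event)   // callers only enqueue and return immediately
  def stop(): Unit = { queue.put(Stop); thread.join() }
}

object MiniEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new MiniEventLoop("mini-dag-event-loop")({
      case JobSubmitted(id) => println(s"handling JobSubmitted($id)")
      case other            => println(s"unexpected event: $other")
    })
    loop.start()
    (1 to 3).foreach(i => loop.post(JobSubmitted(i)))
    loop.stop()
  }
}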