Spark: From WordCount to the Job Scheduling Process

Taking WordCount as an example, I traced the whole job scheduling process of Spark (version 2.1.0). My notes are organized below.

WordCount

First, a simple WordCount program:

val rawFile = sc.textFile("README.md")
val words = rawFile.flatMap(w => w.split(" "))
val wordNum = words.map(w => (w, 1))
val wordCount = wordNum.reduceByKey(_ + _)
wordCount.collect

I debug the Spark source with IDEA plus spark-shell and breakpoints, which lets me follow the execution line by line; the debugging setup is described in my other post, Spark-断点调试 (Spark breakpoint debugging).

Line 1

val rawFile = sc.textFile("README.md")

This calls SparkContext's textFile method. The source:

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

Here hadoopFile first reads the README.md file from HDFS and produces a HadoopRDD; map is then called on it, producing a MapPartitionsRDD.

Execution result:

scala> val rawFile = sc.textFile("README.md")
rawFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[3] at textFile at <console>:24

Line 2

val words = rawFile.flatMap(w => w.split(" "))

This calls the flatMap method that MapPartitionsRDD inherits from the RDD class. Source:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

flatMap performs a one-to-many transformation on every element of the RDD, so here it splits the text read in on spaces, turning each word into an element; the result is still a MapPartitionsRDD.
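
To see the one-to-many semantics in isolation, here is the same flatMap contract on a plain Scala collection (local code, not Spark):

// flatMap maps each element to zero or more elements and flattens the results.
val lines = Seq("hello spark", "hello world")
val words = lines.flatMap(line => line.split(" "))
// words == Seq("hello", "spark", "hello", "world")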

Line 3

val wordNum = words.map(w => (w, 1))

This calls the map method that MapPartitionsRDD inherits from the RDD class. Source:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

map maps each element of type T one-to-one to an element of type U, so here each word we want to count is turned into a key-value pair of the form (w, 1); after this step the result is still a MapPartitionsRDD.
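
And again on a plain collection, the one-to-one pairing looks like this:

// Every word becomes a (word, 1) pair.
val pairs = Seq("hello", "spark", "hello").map(w => (w, 1))
// pairs == Seq(("hello", 1), ("spark", 1), ("hello", 1))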

Line 4

val wordCount = wordNum.reduceByKey(_ + _)

This time the reduceByKey method being called is not in the RDD class but in PairRDDFunctions; an implicit conversion turns the MapPartitionsRDD into a PairRDDFunctions. Method source:

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

reduceByKey adds together the values of identical keys (the same word), yielding each word's frequency.
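
The conversion itself is ordinary Scala implicit machinery; in Spark 2.x the implicit (rddToPairRDDFunctions, defined in the RDD companion object) is picked up automatically, so no extra import is needed. A minimal sketch of the pattern on plain collections, with illustrative names that are not Spark's:

import scala.language.implicitConversions

// A wrapper that adds key-value methods a plain Seq[(K, V)] does not itself have,
// in the same spirit as PairRDDFunctions adding reduceByKey to an RDD[(K, V)].
class PairOps[K, V](self: Seq[(K, V)]) {
  def reduceByKey(func: (V, V) => V): Map[K, V] =
    self.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(func) }
}

// The implicit conversion that lets reduceByKey be called on any Seq[(K, V)].
implicit def seqToPairOps[K, V](s: Seq[(K, V)]): PairOps[K, V] = new PairOps(s)

val counts = Seq(("a", 1), ("b", 1), ("a", 1)).reduceByKey(_ + _)
// counts == Map("a" -> 2, "b" -> 1)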

Line 5

wordCount.collect

Up to the fourth line nothing has actually been executed. Only at this fifth step, when the RDD's collect method is called, is sc.runJob invoked. Source:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

From here a Job is generated and submitted to the Spark cluster to run, which finally brings us to the focus of this post: the whole job scheduling process.
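
Before diving in, it is worth confirming the laziness: the first four lines only built up an RDD lineage, and toDebugString shows the DAG that collect is about to execute (the output below is illustrative; RDD ids, line numbers, and partition counts will vary):

scala> wordCount.toDebugString
res0: String =
(2) ShuffledRDD[6] at reduceByKey at <console>:30 []
 +-(2) MapPartitionsRDD[5] at map at <console>:28 []
    |  MapPartitionsRDD[4] at flatMap at <console>:26 []
    |  README.md MapPartitionsRDD[3] at textFile at <console>:24 []
    |  README.md HadoopRDD[2] at textFile at <console>:24 []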

What is called here is SparkContext's runJob method. SparkContext has many overloads of runJob; through a chain of runJob calls the RDD, the function, the number of partitions, the wrapping of the function into the expected shape, and so on are all filled in, until execution finally reaches the most important call, DAGScheduler.runJob.

Job Scheduling Process

1. DAGScheduler submits the Job

One of DAGScheduler's most important tasks is to analyze dependencies and split the job into stages. There are two entry points for kicking off job scheduling; one is submitJob:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

It returns a JobWaiter object, which can be used for asynchronous calls.
The other entry point is runJob:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
  // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
  // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
  // safe to pass in null here. For more detail, see SPARK-13747.
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

runJob calls submitJob internally and blocks until the Job finishes or fails.
As you can see in submitJob, a JobSubmitted message is posted to eventProcessLoop.
So what is eventProcessLoop?

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

This is a message queue maintained by the DAGScheduler itself; it handles events of various types. When it receives a JobSubmitted message it calls handleJobSubmitted, and in that method the important second step begins: analyzing the dependency graph and splitting the job into stages.
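
DAGSchedulerEventProcessLoop is built on a small generic EventLoop: a blocking queue plus a daemon thread that takes events off the queue and dispatches them by pattern matching. A stripped-down sketch of that pattern (illustrative only, not the real classes):

import java.util.concurrent.LinkedBlockingDeque

// Minimal event-loop sketch: post() enqueues, a daemon thread dequeues and dispatches.
// DAGSchedulerEventProcessLoop works on the same principle, matching JobSubmitted
// to dagScheduler.handleJobSubmitted(...).
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped) {
          onReceive(eventQueue.take()) // take() blocks until an event is posted
        }
      } catch {
        case _: InterruptedException => // loop was stopped; exit quietly
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}

// Events and a loop that dispatches them, in the spirit of JobSubmitted.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent

class SchedulerEventLoop extends SimpleEventLoop[SchedulerEvent]("dag-scheduler-event-loop") {
  override protected def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(jobId) => println(s"handleJobSubmitted for job $jobId")
  }
}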

2. Splitting and submitting Stages

In handleJobSubmitted, all of the Job's stages are first computed from the final RDD's information.

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
}

Note the createResultStage method, which produces a ResultStage. Code:

/**
 * Create a ResultStage associated with the provided jobId.
 */
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

In it, getOrCreateParentStages splits the stages according to their dependencies and returns a List[Stage] that is passed into the ResultStage; I won't paste the splitting code itself here (a simplified sketch of the idea follows below), but it is worth reading if you are interested.
After handleJobSubmitted obtains finalStage, it performs a series of steps: building an ActiveJob, starting the job, and finally submitting the stage, getting ready to generate the actual tasks that will be sent out for execution.
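
The idea behind the splitting is simple even though the real code carries more bookkeeping: walk the dependency chain backwards from the final RDD; a narrow dependency (map, flatMap) stays in the current stage, while a ShuffleDependency (reduceByKey) ends the stage and creates a parent ShuffleMapStage for the RDD on the far side of the shuffle. A toy sketch of that walk (illustrative, not the actual getOrCreateParentStages code):

// Toy model of an RDD lineage: each node lists (parent, isShuffleDependency) pairs,
// standing in for Spark's NarrowDependency / ShuffleDependency.
case class Node(name: String, deps: Seq[(Node, Boolean)] = Nil)

// Walk backwards from the final RDD. Narrow dependencies stay inside the current
// stage; every shuffle dependency reached marks the root of a parent ShuffleMapStage.
def shuffleParents(finalNode: Node): List[Node] = {
  val visited = scala.collection.mutable.HashSet[Node]()
  val parents = scala.collection.mutable.ListBuffer[Node]()
  def visit(n: Node): Unit = if (visited.add(n)) {
    n.deps.foreach {
      case (parent, true)  => parents += parent // shuffle: stage boundary
      case (parent, false) => visit(parent)     // narrow: same stage, keep walking
    }
  }
  visit(finalNode)
  parents.toList
}

// WordCount's lineage: HadoopRDD -> textFile -> flatMap -> map -(shuffle)-> reduceByKey.
val hadoop   = Node("HadoopRDD")
val textFile = Node("MapPartitionsRDD(textFile)", Seq((hadoop, false)))
val flat     = Node("MapPartitionsRDD(flatMap)", Seq((textFile, false)))
val mapped   = Node("MapPartitionsRDD(map)", Seq((flat, false)))
val reduced  = Node("ShuffledRDD(reduceByKey)", Seq((mapped, true)))

// The ResultStage built on `reduced` therefore gets exactly one parent
// ShuffleMapStage, whose final RDD is `mapped`:
// shuffleParents(reduced) == List(mapped)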

Now look at the submitStage method:

/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

Stages are submitted recursively: a stage with parents submits its parents first, and only a stage with no missing parents proceeds to generate tasks.

3. Creating and submitting Tasks

Tasks are created and submitted by the submitMissingTasks method called from submitStage; the method is fairly long, so I won't paste all of it.
Stages come in two kinds, ShuffleMapStage and ResultStage, and tasks likewise come in two kinds, ShuffleMapTask and ResultTask; throughout the method the two stage types are handled separately via pattern matching. The key task-creation code is:

val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }

    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}

Once the tasks are generated they are submitted through the TaskScheduler.

taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

submitTasks is implemented in TaskSchedulerImpl.scala. It first creates a TaskSetManager to assist with scheduling, then calls the SchedulerBackend's reviveOffers method to request resources.

4. Allocating executors

The implementation of reviveOffers lives in CoarseGrainedSchedulerBackend.scala:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

A ReviveOffers message is sent here; the backend receives it itself and continues processing:

override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    makeOffers()
}

makeOffers is important: it calls resourceOffers to gather the currently available resource information; based on it, the TaskSets currently being executed work out which tasks can run now and on which executor each should run, wrapping that into TaskDescriptions that are returned. launchTasks is then called to actually push the tasks out to the executor side for execution.

private def makeOffers(executorId: String) {
  // Filter out executors under killing
  if (executorIsAlive(executorId)) {
    val executorData = executorDataMap(executorId)
    val workOffers = IndexedSeq(
      new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    launchTasks(scheduler.resourceOffers(workOffers))
  }
}
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

Only in launchTasks are executor resources really assigned to the tasks, with the assigned cores deducted; each task is then serialized and sent to the executor side.

5. The executor runs the Task

The program now reaches the receive method of CoarseGrainedExecutorBackend.scala, which receives the LaunchTask message sent by the driver and triggers execution. Key code:

override def receive: PartialFunction[Any, Unit] = {
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }
}

The TaskDescription is first deserialized and then handed to the launchTask method of Executor.scala:

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

A TaskRunner is created here, and its run method is what actually executes the task. run is quite long so I won't paste it; a rough sketch of its result handling is below, and the full details are in the source.
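
Roughly, the tail end of run serializes the task's return value and then decides whether to ship the bytes back directly or to park them in block storage and ship only a reference. The sketch below captures that decision with simplified, hypothetical names and threshold (the real code keys off configured maximum result sizes):

import java.nio.ByteBuffer

// Sketch of the executor-side result handling: either the bytes ride in the RPC
// directly, or they are stored as a block and only a pointer is sent back.
sealed trait SerializedResult
case class DirectResult(bytes: ByteBuffer) extends SerializedResult            // result travels inline
case class IndirectResult(blockId: String, size: Int) extends SerializedResult // driver fetches it later

def finishTask(
    taskId: Long,
    resultBytes: Array[Byte],                       // serialized value produced by the task
    maxDirectResultBytes: Int,                      // hypothetical size limit
    storeBlock: (String, Array[Byte]) => Unit,      // stand-in for the block manager write
    statusUpdate: (Long, SerializedResult) => Unit  // stand-in for execBackend.statusUpdate
  ): Unit = {
  val msg =
    if (resultBytes.length > maxDirectResultBytes) {
      val blockId = s"taskresult_$taskId"
      storeBlock(blockId, resultBytes)              // too big: write it out, return only its location
      IndirectResult(blockId, resultBytes.length)
    } else {
      DirectResult(ByteBuffer.wrap(resultBytes))    // small enough to send back directly
    }
  statusUpdate(taskId, msg)                         // report completion with (a pointer to) the result
}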

6. Returning the result

When run finishes, the result data is serialized and sent back; if the data is too large it is written out and only its location is returned. The result travels back via the statusUpdate method to CoarseGrainedExecutorBackend.scala, and the executor backend then sends a StatusUpdate message carrying the result to CoarseGrainedSchedulerBackend.scala:

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
}

After the driver receives the message, it first hands the result to the TaskScheduler and then frees the executor's resources.
What happens next inside the TaskScheduler is a bit convoluted: it first cleans up the task's state and then has the TaskResultGetter process the result:

if (TaskState.isFinished(state)) {
  cleanupTaskState(tid)
  taskSet.removeRunningTask(tid)
  if (state == TaskState.FINISHED) {
    taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
  } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
    taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
  }
}

TaskResultGetter checks where the result data lives: if it was sent inline it takes the result directly; if it was written out, it fetches the data from the corresponding machine using the block id and pulls it into the driver's memory. Finally it calls handleSuccessfulTask to hand the result back to the TaskScheduler (a simplified sketch of this direct-versus-indirect handling follows the call below).

scheduler.handleSuccessfulTask(taskSetManager, tid, result)
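
The driver-side mirror of the executor's direct-versus-indirect decision, again as a simplified sketch rather than the real TaskResultGetter:

// Check whether the result bytes arrived inline or must be fetched from a remote block.
sealed trait ResultLocation
case class Inline(bytes: Array[Byte]) extends ResultLocation   // DirectTaskResult case
case class InBlock(blockId: String) extends ResultLocation     // IndirectTaskResult case

def fetchAndDeserialize(
    loc: ResultLocation,
    fetchRemoteBlock: String => Array[Byte],  // stand-in for the block manager fetch
    deserialize: Array[Byte] => Any): Any = loc match {
  case Inline(bytes)    => deserialize(bytes)                     // value came inline
  case InBlock(blockId) => deserialize(fetchRemoteBlock(blockId)) // pull into driver memory first
}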

Next, the TaskSetManager created earlier to assist with scheduling comes into play:

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

The TaskScheduler calls the TaskSetManager, which in turn calls the DAGScheduler, passing the result data along to it.

sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)

The taskEnded method posts a CompletionEvent message to the queue maintained by the DAGScheduler, which triggers the DAGScheduler's handleTaskCompletion method to process the data.
handleTaskCompletion checks whether the task was a ShuffleMapTask or a ResultTask: for a ShuffleMapTask it goes on to submit the next stage; for a ResultTask it hands the result to the JobWaiter through the following code.

job.listener.taskSucceeded(rt.outputId, event.result)

The JobWaiter does a last bit of bookkeeping and then passes the results all the way back to the place that called SparkContext.runJob; with that, the whole job scheduling cycle is complete. A condensed sketch of what the JobWaiter does is below.
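
Condensed to its essentials, the JobWaiter counts finished partitions, feeds each partition's result to the resultHandler that collect registered (the one filling results(index)), and completes a future once every partition has reported; that future is what the blocking wait in DAGScheduler.runJob sits on. A simplified sketch, not the real class:

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Simplified JobWaiter-like sketch: one call to taskSucceeded per finished
// partition; the promise completes when the last one arrives.
class SimpleJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  private val jobPromise: Promise[Unit] =
    if (totalTasks == 0) Promise.successful(()) else Promise()

  def completionFuture: Future[Unit] = jobPromise.future

  def taskSucceeded(index: Int, result: T): Unit = {
    resultHandler(index, result)                       // e.g. results(index) = partitionArray
    if (finishedTasks.incrementAndGet() == totalTasks) {
      jobPromise.success(())                           // last partition done: unblock runJob
    }
  }

  def jobFailed(exception: Exception): Unit = {
    jobPromise.tryFailure(exception)                   // failure propagates to the waiting runJob
  }
}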

Summary

A few diagrams to wrap things up:

[Figure: the scheduling and execution flow of a Job]

[Figure: the overall execution flow of a Job]

[Figure: function calls during job submission and execution]

P.S.
I recently studied Spark's job scheduling process and found that after one pass through the source I forgot it almost immediately, so I wrote it down. The Spark code base is simply too large, and I haven't studied every implementation detail of job execution; I've only laid out the overall flow. Corrections are welcome if anything is wrong.
