Spark Job Execution Mechanism 2015-04-15 20:30

Concepts

  • Job: a job triggered over an RDD graph, usually by a Spark action operator. Jobs are submitted to Spark through SparkContext's runJob method.
  • Stage: each Job is split into multiple Stages at the wide (shuffle) dependencies between RDDs. Each Stage contains a group of identical Tasks, which together form a TaskSet.
  • Task: one partition corresponds to one Task. A Task runs the operators of its Stage against its partition of the RDD, and is wrapped up and placed into an Executor's thread pool for execution. (A short sketch tying these three concepts together follows below.)
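
The three concepts map directly onto code. The following is a minimal sketch (not taken from this post's cluster; it assumes a running spark-shell with the usual sc available, and the variable names are made up for illustration). Transformations only build the RDD lineage; the wide dependency introduced by reduceByKey creates a Stage boundary; and the action at the end submits a Job through SparkContext's runJob:

var nums  = sc.parallelize(1 to 100, 4)   // 4 partitions => 4 Tasks per Stage
var pairs = nums.map(n => (n % 10, n))    // narrow dependency: stays in the same Stage
var sums  = pairs.reduceByKey(_ + _)      // wide (shuffle) dependency: starts a new Stage
println(sums.toDebugString)               // print the lineage; indentation changes at each Stage boundary
var result = sums.collect()               // Action: SparkContext.runJob submits a Job with two Stages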

How it works

var logRDD = sc.textFile("hdfs://ctrl:9000/spark/test/1.log")    // read the log file from HDFS
var ipRDD = logRDD.map( s => (s.split(" ",2)(0), 1))             // take the leading IP, emit (ip, 1)
var countRDD = ipRDD.reduceByKey((a, b) => a + b)                // sum the counts per IP (shuffle)
var sortRDD = countRDD.sortBy( c => c._2, false)                 // sort by count, descending
sortRDD.saveAsTextFile("hdfs://ctrl:9000/spark/test/output5")    // Action: write the result to HDFS
sortRDD.collect()                                                // Action: bring the result to the driver

The two actions, saveAsTextFile() and collect(), each trigger a Job: saveAsTextFile() produces Job 1 and collect() produces Job 2. (The log below also shows a Job 0, triggered by sortBy() itself, because sorting samples the data to determine its range-partition boundaries.)
Each Job is then split into Stages along its dependency graph. For Job 1 the final stage is Stage 4, which in turn depends on Stage 3. Because transformations are lazy, Stage 3 has not been computed yet, so it is computed first: it becomes one TaskSet and is split into two Tasks, because its RDD has two partitions.
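
You can check both points in the same spark-shell session. A small sketch (assuming the RDDs defined above): toDebugString prints an RDD's lineage with an extra indentation level at every shuffle dependency, i.e. at every Stage boundary, and partitions.length gives the partition count, which is the number of Tasks in a Stage.

println(logRDD.partitions.length)   // number of HDFS splits read by textFile => Tasks in the first Stage
println(sortRDD.toDebugString)      // lineage of sortRDD; each shuffle marks a Stage boundary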

Diagram:

Log:

scala> var logRDD = sc.textFile("hdfs://ctrl:9000/spark/test/1.log")
logRDD: org.apache.spark.rdd.RDD[String] = hdfs://ctrl:9000/spark/test/1.log MappedRDD[1] at textFile at <console>:12

scala> var ipRDD = logRDD.map( s => (s.split(" ",2)(0), 1))
ipRDD: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[2] at map at <console>:14

scala> var countRDD = ipRDD.reduceByKey((a, b) => a + b)
15/04/15 20:33:48 INFO mapred.FileInputFormat: Total input paths to process : 1
countRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:16

scala> var sortRDD = countRDD.sortBy( c => c._2, false)
15/04/15 20:33:52 INFO spark.SparkContext: Starting job: sortBy at <console>:18
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Registering RDD 2 (map at <console>:14)
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Got job 0 (sortBy at <console>:18) with 2 output partitions (allowLocal=false)
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Final stage: Stage 1(sortBy at <console>:18)
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 0)
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Missing parents: List(Stage 0)
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map at <console>:14), which has no missing parents
15/04/15 20:33:52 INFO storage.MemoryStore: ensureFreeSpace(3368) called with curMem=246670, maxMem=280248975
15/04/15 20:33:52 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.3 KB, free 267.0 MB)
15/04/15 20:33:52 INFO storage.MemoryStore: ensureFreeSpace(2425) called with curMem=250038, maxMem=280248975
15/04/15 20:33:52 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 267.0 MB)
15/04/15 20:33:52 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on ctrl:39700 (size: 2.4 KB, free: 267.2 MB)
15/04/15 20:33:52 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/15 20:33:52 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/04/15 20:33:52 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[2] at map at <console>:14)
15/04/15 20:33:52 INFO cluster.YarnClientClusterScheduler: Adding task set 0.0 with 2 tasks
15/04/15 20:33:52 INFO util.RackResolver: Resolved data03 to /default-rack
15/04/15 20:33:52 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, data02, NODE_LOCAL, 1286 bytes)
15/04/15 20:33:52 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, data01, NODE_LOCAL, 1286 bytes)
15/04/15 20:34:09 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on data02:56915 (size: 2.4 KB, free: 133.6 MB)
15/04/15 20:34:09 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on data01:39830 (size: 2.4 KB, free: 133.6 MB)
15/04/15 20:34:15 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on data01:39830 (size: 30.8 KB, free: 133.6 MB)
15/04/15 20:34:15 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on data02:56915 (size: 30.8 KB, free: 133.6 MB)
15/04/15 20:34:24 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 31900 ms on data01 (1/2)
15/04/15 20:34:24 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 31945 ms on data02 (2/2)
15/04/15 20:34:24 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/04/15 20:34:24 INFO scheduler.DAGScheduler: Stage 0 (map at <console>:14) finished in 31.952 s
15/04/15 20:34:24 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/15 20:34:24 INFO scheduler.DAGScheduler: running: Set()
15/04/15 20:34:24 INFO scheduler.DAGScheduler: waiting: Set(Stage 1)
15/04/15 20:34:24 INFO scheduler.DAGScheduler: failed: Set()
15/04/15 20:34:24 INFO scheduler.DAGScheduler: Missing parents for Stage 1: List()
15/04/15 20:34:24 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[6] at sortBy at <console>:18), which is now runnable
15/04/15 20:34:24 INFO storage.MemoryStore: ensureFreeSpace(3152) called with curMem=252463, maxMem=280248975
15/04/15 20:34:24 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.1 KB, free 267.0 MB)
15/04/15 20:34:27 INFO storage.MemoryStore: ensureFreeSpace(2181) called with curMem=255615, maxMem=280248975
15/04/15 20:34:27 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.1 KB, free 267.0 MB)
15/04/15 20:34:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on ctrl:39700 (size: 2.1 KB, free: 267.2 MB)
15/04/15 20:34:28 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/04/15 20:34:28 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/04/15 20:34:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[6] at sortBy at <console>:18)
15/04/15 20:34:28 INFO cluster.YarnClientClusterScheduler: Adding task set 1.0 with 2 tasks
15/04/15 20:34:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, data02, PROCESS_LOCAL, 1056 bytes)
15/04/15 20:34:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, data01, PROCESS_LOCAL, 1056 bytes)
15/04/15 20:34:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on data02:56915 (size: 2.1 KB, free: 133.6 MB)
15/04/15 20:34:28 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on data01:39830 (size: 2.1 KB, free: 133.6 MB)
15/04/15 20:34:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@data01:42744
15/04/15 20:34:28 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 155 bytes
15/04/15 20:34:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to sparkExecutor@data02:51925
15/04/15 20:34:28 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 254 ms on data01 (1/2)
15/04/15 20:34:29 INFO scheduler.DAGScheduler: Stage 1 (sortBy at <console>:18) finished in 0.721 s
15/04/15 20:34:29 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 723 ms on data02 (2/2)
15/04/15 20:34:29 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/04/15 20:34:29 INFO scheduler.DAGScheduler: Job 0 finished: sortBy at <console>:18, took 37.390423 s
sortRDD: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[8] at sortBy at <console>:18

scala>

scala> sortRDD.saveAsTextFile("hdfs://ctrl:9000/spark/test/output6")
15/04/15 20:35:06 INFO storage.BlockManager: Removing broadcast 2
15/04/15 20:35:06 INFO storage.BlockManager: Removing block broadcast_2
15/04/15 20:35:06 INFO storage.MemoryStore: Block broadcast_2 of size 3152 dropped from memory (free 279994331)
15/04/15 20:35:06 INFO storage.BlockManager: Removing block broadcast_2_piece0
15/04/15 20:35:06 INFO storage.MemoryStore: Block broadcast_2_piece0 of size 2181 dropped from memory (free 279996512)
15/04/15 20:35:06 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on ctrl:39700 in memory (size: 2.1 KB, free: 267.2 MB)
15/04/15 20:35:06 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/04/15 20:35:06 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on data01:39830 in memory (size: 2.1 KB, free: 133.6 MB)
15/04/15 20:35:06 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on data02:56915 in memory (size: 2.1 KB, free: 133.6 MB)
15/04/15 20:35:06 INFO spark.ContextCleaner: Cleaned broadcast 2
15/04/15 20:35:06 INFO storage.BlockManager: Removing broadcast 1
15/04/15 20:35:06 INFO storage.BlockManager: Removing block broadcast_1_piece0
15/04/15 20:35:06 INFO storage.MemoryStore: Block broadcast_1_piece0 of size 2425 dropped from memory (free 279998937)
15/04/15 20:35:06 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on ctrl:39700 in memory (size: 2.4 KB, free: 267.2 MB)
15/04/15 20:35:06 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/04/15 20:35:06 INFO storage.BlockManager: Removing block broadcast_1
15/04/15 20:35:06 INFO storage.MemoryStore: Block broadcast_1 of size 3368 dropped from memory (free 280002305)
15/04/15 20:35:06 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on data02:56915 in memory (size: 2.4 KB, free: 133.6 MB)
15/04/15 20:35:06 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on data01:39830 in memory (size: 2.4 KB, free: 133.6 MB)
15/04/15 20:35:06 INFO spark.ContextCleaner: Cleaned broadcast 1
15/04/15 20:35:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/15 20:35:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/15 20:35:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/15 20:35:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/15 20:35:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/15 20:35:07 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:21
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Registering RDD 4 (sortBy at <console>:18)
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Got job 1 (saveAsTextFile at <console>:21) with 2 output partitions (allowLocal=false)
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Final stage: Stage 4(saveAsTextFile at <console>:21)
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 3)
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 3)
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Submitting Stage 3 (MappedRDD[4] at sortBy at <console>:18), which has no missing parents
15/04/15 20:35:07 INFO storage.MemoryStore: ensureFreeSpace(3072) called with curMem=246670, maxMem=280248975
15/04/15 20:35:07 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.0 KB, free 267.0 MB)
15/04/15 20:35:07 INFO storage.MemoryStore: ensureFreeSpace(2193) called with curMem=249742, maxMem=280248975
15/04/15 20:35:07 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.1 KB, free 267.0 MB)
15/04/15 20:35:07 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ctrl:39700 (size: 2.1 KB, free: 267.2 MB)
15/04/15 20:35:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
15/04/15 20:35:07 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:838
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 3 (MappedRDD[4] at sortBy at <console>:18)
15/04/15 20:35:07 INFO cluster.YarnClientClusterScheduler: Adding task set 3.0 with 2 tasks
15/04/15 20:35:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 4, data01, PROCESS_LOCAL, 1045 bytes)
15/04/15 20:35:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 3.0 (TID 5, data02, PROCESS_LOCAL, 1045 bytes)
15/04/15 20:35:07 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on data02:56915 (size: 2.1 KB, free: 133.6 MB)
15/04/15 20:35:07 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on data01:39830 (size: 2.1 KB, free: 133.6 MB)
15/04/15 20:35:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 4) in 112 ms on data01 (1/2)
15/04/15 20:35:07 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 3.0 (TID 5) in 116 ms on data02 (2/2)
15/04/15 20:35:07 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Stage 3 (sortBy at <console>:18) finished in 0.115 s
15/04/15 20:35:07 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/15 20:35:07 INFO scheduler.DAGScheduler: running: Set()
15/04/15 20:35:07 INFO scheduler.DAGScheduler: waiting: Set(Stage 4)
15/04/15 20:35:07 INFO scheduler.DAGScheduler: failed: Set()
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Missing parents for Stage 4: List()
15/04/15 20:35:07 INFO scheduler.DAGScheduler: Submitting Stage 4 (MappedRDD[9] at saveAsTextFile at <console>:21), which is now runnable
15/04/15 20:35:08 INFO storage.MemoryStore: ensureFreeSpace(112760) called with curMem=251935, maxMem=280248975
15/04/15 20:35:08 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 110.1 KB, free 266.9 MB)
15/04/15 20:35:08 INFO storage.MemoryStore: ensureFreeSpace(67506) called with curMem=364695, maxMem=280248975
15/04/15 20:35:08 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 65.9 KB, free 266.9 MB)
15/04/15 20:35:08 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on ctrl:39700 (size: 65.9 KB, free: 267.2 MB)
15/04/15 20:35:08 INFO storage.BlockManagerMaster: Updated info of block broadcast_4_piece0
15/04/15 20:35:08 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:838
15/04/15 20:35:08 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 4 (MappedRDD[9] at saveAsTextFile at <console>:21)
15/04/15 20:35:08 INFO cluster.YarnClientClusterScheduler: Adding task set 4.0 with 2 tasks
15/04/15 20:35:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 6, data02, PROCESS_LOCAL, 1056 bytes)
15/04/15 20:35:08 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 7, data01, PROCESS_LOCAL, 1056 bytes)
15/04/15 20:35:08 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on data01:39830 (size: 65.9 KB, free: 133.6 MB)
15/04/15 20:35:08 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on data02:56915 (size: 65.9 KB, free: 133.6 MB)
15/04/15 20:35:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to sparkExecutor@data01:42744
15/04/15 20:35:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 156 bytes
15/04/15 20:35:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to sparkExecutor@data02:51925
15/04/15 20:35:41 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 7) in 33257 ms on data01 (1/2)
15/04/15 20:35:41 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 6) in 33264 ms on data02 (2/2)
15/04/15 20:35:41 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
15/04/15 20:35:41 INFO scheduler.DAGScheduler: Stage 4 (saveAsTextFile at <console>:21) finished in 33.262 s
15/04/15 20:35:41 INFO scheduler.DAGScheduler: Job 1 finished: saveAsTextFile at <console>:21, took 34.568625 s

scala> sortRDD.collect()
15/04/15 20:35:54 INFO spark.SparkContext: Starting job: collect at <console>:21
15/04/15 20:35:54 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 155 bytes
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Got job 2 (collect at <console>:21) with 2 output partitions (allowLocal=false)
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Final stage: Stage 7(collect at <console>:21)
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 6)
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Submitting Stage 7 (MappedRDD[8] at sortBy at <console>:18), which has no missing parents
15/04/15 20:35:54 INFO storage.MemoryStore: ensureFreeSpace(2784) called with curMem=432201, maxMem=280248975
15/04/15 20:35:54 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.7 KB, free 266.9 MB)
15/04/15 20:35:54 INFO storage.MemoryStore: ensureFreeSpace(2031) called with curMem=434985, maxMem=280248975
15/04/15 20:35:54 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2031.0 B, free 266.8 MB)
15/04/15 20:35:54 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on ctrl:39700 (size: 2031.0 B, free: 267.2 MB)
15/04/15 20:35:54 INFO storage.BlockManagerMaster: Updated info of block broadcast_5_piece0
15/04/15 20:35:54 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:838
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 7 (MappedRDD[8] at sortBy at <console>:18)
15/04/15 20:35:54 INFO cluster.YarnClientClusterScheduler: Adding task set 7.0 with 2 tasks
15/04/15 20:35:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 7.0 (TID 8, data02, PROCESS_LOCAL, 1056 bytes)
15/04/15 20:35:54 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 7.0 (TID 9, data01, PROCESS_LOCAL, 1056 bytes)
15/04/15 20:35:54 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on data01:39830 (size: 2031.0 B, free: 133.6 MB)
15/04/15 20:35:54 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on data02:56915 (size: 2031.0 B, free: 133.6 MB)
15/04/15 20:35:54 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 7.0 (TID 9) in 88 ms on data01 (1/2)
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Stage 7 (collect at <console>:21) finished in 0.095 s
15/04/15 20:35:54 INFO scheduler.DAGScheduler: Job 2 finished: collect at <console>:21, took 0.133171 s
15/04/15 20:35:54 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 7.0 (TID 8) in 98 ms on data02 (2/2)
15/04/15 20:35:54 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 7.0, whose tasks have all completed, from pool
res1: Array[(String, Int)] = Array((94.188.16.78,2), (220.181.108.166,1), (123.125.71.13,1), (123.125.71.51,1), (183.136.190.62,1), (123.125.71.71,1), (94.23.35.157,1), (123.125.71.72,1), (207.46.13.129,1))

scala>
Tags: #Spark