Spark Functions: map, mapPartitions, mapPartitionsWithContext and mapPartitionsWithIndex    2015-08-24 21:02

map

The most commonly used transformation: it applies the given function to every element of the RDD and returns a new RDD of the resulting values.

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[133] at parallelize at <console>:21

scala> val b = a.map(_ + "=")
b: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[134] at map at <console>:23

scala> b.collect
res86: Array[String] = Array(dog=, tiger=, lion=, cat=, panther=, eagle=)
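
map is not limited to producing the same element type as its input. A small sketch (reusing the RDD a defined above) that pairs each word with its length, yielding an RDD[(String, Int)]:

val lengths = a.map(w => (w, w.length))   // element type changes from String to (String, Int)
lengths.collect
// e.g. Array((dog,3), (tiger,5), (lion,4), (cat,3), (panther,7), (eagle,5))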

mapPartitions

Similar to map, but instead of calling the function once per element, it calls the function once per partition, handing it an iterator over that partition's elements and expecting an iterator of results back.

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// Adds 20 to every element of the partition; results are accumulated in a
// list (in reverse order) and returned as an iterator.
def myfunc(iter: Iterator[Int]): Iterator[Int] = {
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    val new_value = cur + 20
    res = new_value :: res
  }
  res.iterator
}
x.mapPartitions(myfunc).sortBy(e => (e,true)).collect
res91: Array[Int] = Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
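
The usual reason to prefer mapPartitions over map is per-partition setup cost: an expensive object (a parser, a database connection, a random generator) can be created once per partition instead of once per element. A minimal sketch of that pattern; createParser is a hypothetical helper standing in for whatever costly resource you need, not a Spark API:

// Hypothetical expensive-to-create resource, built once per partition.
def createParser(): String => Int = s => s.trim.toInt

val lines = sc.parallelize(List(" 1", "2 ", " 3 "), 2)
val parsed = lines.mapPartitions { iter =>
  val parser = createParser()   // one instance per partition
  iter.map(parser)              // applied lazily to each element
}
parsed.collect                  // e.g. Array(1, 2, 3)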

mapPartitionsWithContext

Similar to mapPartitions, but the function also receives a TaskContext argument carrying information about the running task (partition ID, attempt ID, completion callbacks, and so on).

import org.apache.spark.TaskContext

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// Registers a callback that prints the partition and attempt IDs when the
// task completes, then keeps only the even elements of the partition.
def myfunc(tc: TaskContext, iter: Iterator[Int]): Iterator[Int] = {
    tc.addOnCompleteCallback(() => println(
        "Partition: "     + tc.partitionId +
        ", AttemptID: "   + tc.attemptId ))

    iter.toList.filter(_ % 2 == 0).iterator
}
x.mapPartitionsWithContext(myfunc).sortBy(e => (e,true)).collect

Partition: 0, AttemptID: 0
Partition: 1, AttemptID: 1
Partition: 1, AttemptID: 3
Partition: 0, AttemptID: 2
res1: Array[Int] = Array(2, 4, 6, 8, 10)
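
Note that mapPartitionsWithContext was a developer API and was deprecated in later Spark releases; the same information can be obtained by calling TaskContext.get() inside an ordinary mapPartitions. A sketch, assuming a Spark version that provides TaskContext.get:

import org.apache.spark.TaskContext

val evens = x.mapPartitions { iter =>
  val tc = TaskContext.get()                 // context of the task running this partition
  println("Partition: " + tc.partitionId)
  iter.filter(_ % 2 == 0)
}
evens.collect                                // e.g. Array(2, 4, 6, 8, 10)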

mapPartitionsWithIndex

Similar to mapPartitions, but the function also receives the index of the partition being processed.

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// Prefixes each value with the index of the partition it came from.
def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[" + index + "]" + (x + 20)).iterator
}
x.mapPartitionsWithIndex(myfunc).sortBy(e => (e,true)).collect

res3: Array[String] = Array([0]21, [0]22, [0]23, [0]24, [0]25, 
[1]26, [1]27, [1]28, [1]29, [1]30)
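
A common use of mapPartitionsWithIndex is checking how the data is spread across partitions, for example by counting the elements in each one. A small sketch, reusing x from above:

val sizes = x.mapPartitionsWithIndex { (index, iter) =>
  Iterator((index, iter.size))   // one (partitionIndex, elementCount) pair per partition
}
sizes.collect
// e.g. Array((0,5), (1,5)) when x has two evenly filled partitions
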
Tags: #Spark    Post on Spark-API