Spark函数之aggregate 2015-08-20 21:01

aggregate

aggregate通过两个reduce函数对RDD进行聚合。一个用于每一个partition内的数据进行reduce,生成单一的一个结果。另一个用于将不同partition的reduce结果进行最终的reduce。

aggregate还有一个参数是初始值,这个函数会被应用于两次的reduce运算之中。

举例:

有3个Partition。

Parition 0: 1、2
Parition 1: 3、4
Parition 2: 5、6
aggregate的初始值参数为:10。
第一个reduce函数为:乘积。
第二个reduce函数为:求和。

则计算过程如下:

第一个reduce:
    Parition 0: 10 * 1 * 2 = 20
    Parition 1: 10 * 3 * 4 = 120
    Parition 2: 10 * 5 * 6 = 300
第二个reduce:
    10 + 20 + 120 + 300 = 450

代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var z = sc.parallelize(List(1,2,3,4,5,6), 3)

//打印出各partition的数据
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[Int] = {
  println("---------------[partID:" +  index + ", val: " + iter.toList + "]")
  iter
}
z.mapPartitionsWithIndex(myfunc).collect


def myfunc1(a:Int, b:Int) : Int = {
    println("[myfunc1: " + a + "\t" + b + "]");
    a * b
}

def myfunc2(a:Int, b:Int) : Int = {
    println("[myfunc2: " + a + "\t" + b + "]");
    a + b
    //math.max(a, b)
}


z.aggregate(10)(myfunc1, myfunc2)

输出:

[myfunc1: 10     1]
[myfunc1: 10     2]
[myfunc1: 10     3]
[myfunc1: 30     4]
[myfunc2: 10     120]
[myfunc2: 130    20]
[myfunc1: 10     5]
[myfunc1: 50     6]
[myfunc2: 150    300]
res3: Int = 450

通常写法:

1
2
3
var z = sc.parallelize(List(1,2,3,4,5,6), 3)
z.aggregate(10)(_ * _, _ + _)
res40: Int = 9
Tags: #Spark    Post on Spark-API