Spark Functions: aggregateByKey  2015-08-20 21:02

aggregateByKey

aggregateByKey is similar to aggregate. The differences are that aggregateByKey reduces elements that share the same key, and that the initial (zero) value is applied only in the reduce step within each partition, not when merging partition results.
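
To see the difference concretely, here is a minimal sketch (assuming only a SparkContext named sc, as in spark-shell): aggregate applies the zero value inside each partition and once more when merging the partition results, while aggregateByKey applies it only inside each partition, once per key that appears there.

val nums = sc.parallelize(List(1, 2, 3, 4), 2)    // Partition 0: 1, 2    Partition 1: 3, 4
nums.aggregate(10)(_ + _, _ + _)                  // (10+1+2) + (10+3+4) + 10 = 40, the zero value is also used in the final merge
val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
pairs.aggregateByKey(10)(_ + _, _ + _).collect    // Array((a,30)) = (10+1+2) + (10+3+4), the zero value is used only within partitions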

Example:

There are 2 partitions:

Partition 0: List((cat,2), (cat,5), (mouse,4))
Partition 1: List((cat,12), (dog,12), (mouse,2))
The initial (zero) value passed to aggregateByKey is 10.
The first reduce function (within each partition) is multiplication.
The second reduce function (across partitions) is addition.

The computation then proceeds as follows:

Within each partition:
    Partition 0:
        cat: 10 (initial value) * 2 = 20
        cat: 20 (previous result) * 5 = 100
        mouse: 10 (initial value) * 4 = 40
    Partition 1:
        cat: 10 (initial value) * 12 = 120
        dog: 10 (initial value) * 12 = 120
        mouse: 10 (initial value) * 2 = 20
Second reduce (across partitions):
    cat: 100 (Partition 0 result) + 120 (Partition 1 result) = 220
    dog: 120 (Partition 1 result only) = 120
    mouse: 40 (Partition 0 result) + 20 (Partition 1 result) = 60

Code:

var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

// Print the contents of each partition
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[(String, Int)] = {
  val elems = iter.toList   // materialize first: iter.toList exhausts the iterator, so returning iter afterwards would yield nothing
  println("[partID:" +  index + ", val: " + elems + "]")
  elems.iterator
}
z.mapPartitionsWithIndex(myfunc).collect

// First reduce function (seqOp): combines values for a key within a partition, starting from the zero value
def myfunc1(a: Int, b: Int): Int = {
    println("[myfunc1: " + a + "\t" + b + "]")
    a * b
}

// Second reduce function (combOp): merges the per-partition results for a key; the zero value is not applied here
def myfunc2(a: Int, b: Int): Int = {
    println("[myfunc2: " + a + "\t" + b + "]")
    a + b
    //math.max(a, b)
}

z.aggregateByKey(10)(myfunc1, myfunc2).collect

Output:

[myfunc1: 20     5]
[myfunc1: 10     2]
[myfunc1: 10     4]
[myfunc1: 10     12]
[myfunc1: 10     12]
[myfunc1: 10     2]
res3: Array[(String, Int)] = Array((dog,120), (cat,220), (mouse,60))

The usual, more concise way to write it:

var z = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
z.aggregateByKey(10)(_ * _, _ + _)
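
Collecting the result of this shorthand should give the same values as the verbose version above:

z.aggregateByKey(10)(_ * _, _ + _).collect
// expected: Array((dog,120), (cat,220), (mouse,60)), the same values as above (element order may vary)
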
Tags: #Spark    Post on Spark-API