Spark函数之combineByKey 2015-08-21 21:01

说明

combineByKey用于将一个PairRDD按key进行聚合操作。

combineByKey的参数共有三个函数。

第一个函数:同一个Partition内第一次碰到一个key的处理函数。用于在遍历RDD的数据集合过程中,对于遍历到的(k,v)。如果combineByKey第一次遇到值为k的Key(类型K),那么将对这个(k,v)调用此函数。它的作用是将v转换为C(类型是C,聚合对象的类型,c作为局和对象的初始值)。

第二个函数:同一个Partition内不是第一次碰到一个key的处理函数。在遍历RDD的数据集合过程中,对于遍历到的(k,v),如果combineByKey不是第一次(或者第二次,第三次…)遇到值为k的Key(类型K),那么将对这个(k,v)调用此函数,它的作用是将v累加到聚合对象(类型C)中。此函数的类型是(C,V)=>C。

第三个函数:不同Partition相同key的聚合函数。因为combineByKey是在分布式环境下执行,RDD的每个分区单独进行combineByKey操作,最后需要对各个分区的结果进行最后的聚合,它的函数类型是(C,C)=>C,每个参数是分区聚合得到的聚合对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//  准备数据
var a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
var b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
var c = b.zip(a)

//打印出各partition的数据
def myfunc(index: Int, iter: Iterator[(Int,String)]) : Iterator[(Int,String)] = {
  println("[partID:" +  index + ", val: " + iter.toList + "]")
  iter
}
c.mapPartitionsWithIndex(myfunc).collect

// 同一个Partition内第一次碰到一个key的处理函数
def myfunc1(v: String): List[String] = {
    println("myfunc1 value: " + v)
    List(v)
}

// 同一个Partition内不是第一次碰到一个key的处理函数
def myfunc2(l: List[String], v: String): List[String] = {
    println("myfunc2 list: " + l + " value: " + v)
    v :: l
}

// 不同Partition相同key的聚合函数
def myfunc3(l1: List[String], l2: List[String]): List[String] = {
    println("myfunc3 list1: " + l1 + " list2: " + l2)
    l1 ::: l2
}

var d = c.combineByKey(myfunc1, myfunc2, myfunc3)
d.collect

输出:

[partID:1, val: List((2,salmon), (2,rabbit), (1,turkey))]
[partID:0, val: List((1,dog), (1,cat), (2,gnu))]
[partID:2, val: List((2,wolf), (2,bear), (2,bee))]

myfunc1 value: salmon
myfunc2 list: List(salmon) value: rabbit
myfunc1 value: turkey
myfunc1 value: dog
myfunc2 list: List(dog) value: cat
myfunc1 value: gnu
myfunc1 value: wolf
myfunc2 list: List(wolf) value: bear
myfunc2 list: List(bear, wolf) value: bee
myfunc3 list1: List(cat, dog) list2: List(turkey)
myfunc3 list1: List(gnu) list2: List(rabbit, salmon)
myfunc3 list1: List(gnu, rabbit, salmon) list2: List(bee, bear, wolf)
res5: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))

常规写法:

1
2
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
Tags: #Spark    Post on Spark-API