Spark Functions: zip, zipPartitions, zipWithIndex, and zipWithUniqueId 2015-08-21 21:00

zip

Joins two RDDs element by element, in order. Each element of the new RDD is a tuple made up of the corresponding elements from the two RDDs.

Note: the two RDDs must have the same number of partitions and the same number of elements in each partition; otherwise the zip operation fails.

var a = sc.parallelize(1 to 10, 2)
var b = sc.parallelize(List("a","b","c","d","e","f","g","h","i","j"), 2)
a.zip(b).collect

Output:

res2: Array[(Int, String)] = Array((1,a), (2,b), (3,c), (4,d), (5,e), (6,f), (7,g), (8,h), (9,i), (10,j))
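
As a quick illustration of the restriction above (a sketch, not from the original post): zipping two RDDs whose partitions hold different numbers of elements compiles fine but fails when the result is materialized.

val x = sc.parallelize(1 to 3, 2)
val y = sc.parallelize(1 to 4, 2)
// x.zip(y).collect
// fails with a SparkException along the lines of:
// "Can only zip RDDs with same number of elements in each partition"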

zipPartitions

Similar to zip, but more flexible: it zips the RDDs partition by partition and applies a user-supplied function to the partition iterators, so the per-partition element counts do not have to match (the RDDs still need the same number of partitions).

val a = sc.parallelize(0 to 9, 3)
val b = sc.parallelize(10 to 19, 3)
val c = sc.parallelize(100 to 109, 3)
def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =
{
  var res = List[String]()
  // Walk the three partition iterators in lock-step.
  while (aiter.hasNext && biter.hasNext && citer.hasNext)
  {
    val x = aiter.next + " " + biter.next + " " + citer.next
    res ::= x   // prepend, so each partition's results come out in reverse order
  }
  res.iterator
}
a.zipPartitions(b, c)(myfunc).collect
res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 
3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)
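
For comparison, here is a sketch (not from the original post) of a zip function that keeps the original element order by zipping the three partition iterators directly instead of prepending results to a list:

def myfunc2(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] =
  // Iterator.zip pairs up elements lazily and preserves their order
  // within each partition.
  aiter.zip(biter).zip(citer).map { case ((x, y), z) => x + " " + y + " " + z }

a.zipPartitions(b, c)(myfunc2).collect
// expected: Array(0 10 100, 1 11 101, 2 12 102, 3 13 103, 4 14 104,
//                  5 15 105, 6 16 106, 7 17 107, 8 18 108, 9 19 109)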

zipWithIndex

Zips each element of the RDD with its index (a Long starting at 0), ordered first by partition index and then by position within each partition.

val z = sc.parallelize(Array("A", "B", "C", "D"))
val r = z.zipWithIndex
res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3))
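
A small sketch (not from the original post) with an explicitly partitioned RDD, showing that the indices still run sequentially across partitions:

val z2 = sc.parallelize(Array("A", "B", "C", "D", "E", "F"), 3)
z2.zipWithIndex.collect
// expected: Array((A,0), (B,1), (C,2), (D,3), (E,4), (F,5))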

zipWithUniqueId

Similar to zipWithIndex, but the generated unique IDs do not follow the order of the elements in the RDD (and, unlike zipWithIndex, no Spark job is triggered to compute them).

val z = sc.parallelize(Array("A", "B", "C", "D"))
val r = z.zipWithUniqueId
r.collect
res25: Array[(String, Long)] = Array((A,0), (B,2), (C,1), (D,3))
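
The IDs follow a simple rule: the k-th element of partition p receives the ID k * numPartitions + p. A sketch (not from the original post) with the partition count made explicit so the assignment is easy to trace:

// 2 partitions: p0 = (A, B), p1 = (C, D), numPartitions = 2
// A -> 0*2+0 = 0, B -> 1*2+0 = 2, C -> 0*2+1 = 1, D -> 1*2+1 = 3
sc.parallelize(Array("A", "B", "C", "D"), 2).zipWithUniqueId.collect
// expected: Array((A,0), (B,2), (C,1), (D,3))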
Tags: #Spark    Post on Spark-API