Spark RDD API Examples 2015-03-12 19:00

map

Applies a transformation function on each item of the RDD and returns the result as a new RDD.

// 3 specifies that the RDD has 3 partitions
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
// build a new RDD from the length of each element of a
val b = a.map(_.length)
// zip the two RDDs into a new RDD of (String, Int) pairs
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), 
(elephant,8))

zip

Joins two RDDs by combining the i-th elements of each, partition by partition. The resulting RDD consists of two-component tuples, which are interpreted as key-value pairs by the methods provided by the PairRDDFunctions extension. Note that both RDDs must have the same number of partitions and the same number of elements in each partition.

val a1 = sc.parallelize(1 to 10, 3)
val b1 = sc.parallelize(11 to 20, 3)
a1.zip(b1).collect
res1: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), \
(5,15), (6,16), (7,17), (8,18), (9,19), (10,20))

val a2 = sc.parallelize(1 to 10, 3)
val b2 = sc.parallelize(11 to 20, 3)
val c2 = sc.parallelize(21 to 30, 3)
a2.zip(b2).zip(c2).collect
res3: Array[((Int, Int), Int)] = Array(((1,11),21), ((2,12),22),
((3,13),23), ((4,14),24), ((5,15),25), ((6,16),26), ((7,17),27),
((8,18),28), ((9,19),29), ((10,20),30))
a2.zip(b2).zip(c2).map(x => (x._1._1, x._1._2, x._2)).collect
res2: Array[(Int, Int, Int)] = Array((1,11,21), (2,12,22), (3,13,23),
(4,14,24), (5,15,25), (6,16,26), (7,17,27), (8,18,28), (9,19,29), (10,20,30))

filter

Evaluates a boolean function for each data item of the RDD and puts the items for which the function returned true into the resulting RDD.

val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res4: Array[Int] = Array(2, 4, 6, 8, 10)

When an RDD holds elements of mixed data types, comparison and arithmetic operators such as greater-than, less-than, or modulo cannot be applied directly in a filter. In that case, pass in a function that first determines each element's data type, as sketched below. See: homepage.cs.latrobe.edu….
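
A minimal sketch of that approach (the data and predicate here are illustrative, not taken from the linked page): pattern-match on each element's runtime type inside the filter predicate, treating non-matching elements as false.

val mixed = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog"))
// keep only the Double elements greater than 3; all other types are filtered out
mixed.filter {
  case d: Double => d > 3
  case _         => false
}.collect
// expected: Array(4.0, 3.5)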

flatMap

Similar to map, but allows emitting more than one item in the map function. Where map turns one element into exactly one element, flatMap turns one element into zero or more elements.

val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res8: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4,
5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8,
1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res9: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

val x = sc.parallelize(1 to 5, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(5))(_)).collect
res10: Array[Int] = Array(1, 1, 2, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5)

mapPartitions

This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. Please note that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose. In other words, the elements of each partition are processed as a group by the supplied function, producing a new RDD.

val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res ::= (pre, cur) // prepend each pair of neighbouring elements
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

mapPartitionsWithIndex

Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
  iter.toList.map(x => index + "," + x).iterator
}
x.mapPartitionsWithIndex(myfunc).collect()
res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

sample

Randomly selects a fraction of the items of an RDD and returns them in a new RDD. The three arguments are: whether to sample with replacement, the expected fraction of the data, and the random seed.

val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960

a.sample(true, 0.3, 0).count
res25: Long = 2888

a.sample(true, 0.3, 13).count
res26: Long = 2985

union, ++

Performs the standard set operation A union B; duplicates are not removed. With union (++), the elements of both RDDs directly become elements of the new RDD, whereas with zip, pairs of elements from the two RDDs are combined into tuples and the tuples become the elements of the new RDD.

val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

intersection

Returns the elements in the two RDDs which are the same.

val x = sc.parallelize(1 to 20)
val y = sc.parallelize(10 to 30)
val z = x.intersection(y)
z.collect
res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)

distinct

Returns a new RDD that contains each unique value only once.

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res16: Int = 2

a.distinct(3).partitions.length
res17: Int = 3

groupBy
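
Groups the elements of the RDD according to the key computed by the supplied function; an optional second argument sets the number of partitions of the resulting RDD, as in the last example below.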

val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), 
(odd,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
a.groupBy(myfunc).collect
res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), 
(1,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
{
  a % 2
}
a.groupBy(x => myfunc(x), 3).collect
a.groupBy(myfunc(_), 1).collect
res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), 
(1,ArrayBuffer(1, 3, 5, 7, 9)))

keyBy

Constructs two-component tuples (key-value pairs) by applying a function on each data item. The result of the function becomes the key and the original data item becomes the value of the newly created tuples.

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), 
(8,elephant))

groupByKey

Very similar to groupBy, but instead of supplying a function, the key-component of each pair will automatically be presented to the partitioner.

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
b.groupByKey.collect
res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), 
(6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), 
(5,ArrayBuffer(tiger, eagle)))

reduceByKey

This function provides the well-known reduce functionality in Spark. Please note that any function f you provide should be commutative and associative in order to generate reproducible results.

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

aggregate
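
aggregate lets you apply two different reduce functions to the RDD: the first (seqOp) folds the elements within each partition, starting from the supplied zero value, and the second (combOp) then merges the per-partition results, with the zero value applied once more at the combine step. Note in the "x" example below how the zero value therefore appears three times: once for each of the two partitions and once in the final combine.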

val z = sc.parallelize(List(1,2,3,4,5,6), 2)
// partition 0 holds (1,2,3) and partition 1 holds (4,5,6);
// seqOp takes the max within each partition (3 and 6), then combOp sums: 0 + 3 + 6 = 9
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9

val z = sc.parallelize(List("a","b","c","d","e","f"),2)
z.aggregate("")(_ + _, _+_)
res115: String = abcdef

z.aggregate("x")(_ + _, _+_)
res116: String = xxdefxabc

val z = sc.parallelize(List("12","23","345","4567"),2)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res141: String = 42

z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res142: String = 11

val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res143: String = 10

aggregateByKey
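
Works like aggregate, but the aggregation is performed separately for each key, so the zero value is applied once per key per partition rather than to the RDD as a whole.

A minimal sketch (the sample data is illustrative): take the per-partition maximum for each key with seqOp, then sum those maxima across partitions with combOp.

val pairs = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
// seqOp: within each partition, keep the largest value seen for each key
// combOp: sum the per-partition maxima for each key
pairs.aggregateByKey(0)(math.max(_, _), _ + _).collect
// expected: Array((dog,12), (cat,17), (mouse,6)) -- element order may vary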

sortByKey

This function sorts the input RDD's data and stores it in a new RDD. The output RDD is a shuffled RDD because it stores data that is output by a reducer which has been shuffled. The implementation of this function is actually very clever. First, it uses a range partitioner to partition the data in ranges within the shuffled RDD. Then it sorts these ranges individually with mapPartitions using standard sort mechanisms.

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
d.collect
res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))

cogroup

cogroup groups the data of two (or more) RDDs by key; for each key, the values from each RDD are collected into their own separate group.

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)

val d = a.map((_, "d"))
b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)

val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
x.cogroup(y).collect
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))),
(2,(ArrayBuffer(banana),ArrayBuffer())),
(3,(ArrayBuffer(orange),ArrayBuffer())),
(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
(5,(ArrayBuffer(),ArrayBuffer(computer))))

pipe

Applies the given shell command to the data of each partition: the partition's elements are written to the command's standard input, one per line, and the command's standard output becomes the new RDD. In the example below, head -n 1 keeps only the first element of each of the three partitions:

val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res2: Array[String] = Array(1, 4, 7)

coalesce, repartition

Adjusts the number of partitions of an RDD and returns a new RDD. repartition always performs a shuffle, whereas coalesce lets you specify whether to shuffle; passing false avoids the shuffle but can then only decrease the partition count.

val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)
z.partitions.length
res9: Int = 2
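
For comparison, a small sketch of repartition, which always shuffles and, unlike coalesce(n, false), can also increase the partition count (continuing with z from above):

val w = z.repartition(6)
w.partitions.length
// expected: Int = 6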

References

Apache Spark RDD API Examples.

Tags: #Spark