Spark Functions: name, setName, and More (a Big Batch) 2015-08-25 23:00

name, setName

Gives the RDD a name.

val y = sc.parallelize(1 to 10, 10)
y.name
res13: String = null
y.setName("Fancy RDD Name")
y.name
res15: String = Fancy RDD Name

pipe

Pipes each partition of the RDD through the specified shell command, producing a new RDD.

val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res2: Array[String] = Array(1, 4, 7)

randomSplit

Splits one large RDD into several smaller RDDs according to the given weights.

val y = sc.parallelize(1 to 10)
val splits = y.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)
training.collect
res85: Array[Int] = Array(1, 4, 5, 6, 8, 10)
test.collect
res86: Array[Int] = Array(2, 3, 7, 9)

reduce

The well-known reduce. Performs a reduce operation over all elements.

val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050

reduceByKey

Similar to reduce, but the reduce operation is applied to values sharing the same key.

val a = sc.parallelize(List("dog", "cat", "hello", "world", "hi"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res2: Array[(Int, String)] = Array((2,hi), (3,dogcat), (5,helloworld))

repartitionAndSortWithinPartitions

Repartitions the RDD and guarantees that elements are sorted within each partition.

// first we will do range partitioning which is not sorted
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), 
(4, "tv"), (1, "screen"), (5, "heater")), 3)
val rPartitioner = new org.apache.spark.RangePartitioner(3, randRDD)
val partitioned = randRDD.partitionBy(rPartitioner)
def myfunc(index: Int, iter: Iterator[(Int, String)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
partitioned.mapPartitionsWithIndex(myfunc).collect

res0: Array[String] = Array([partID:0, val: (2,cat)], [partID:0, val: (3,book)], 
[partID:0, val: (1,screen)], [partID:1, val: (4,tv)],
[partID:1, val: (5,heater)], [partID:2, val: (6,mouse)], [partID:2, val: (7,cup)])

// now let's repartition, but this time with each partition sorted
val partitioned = randRDD.repartitionAndSortWithinPartitions(rPartitioner)
def myfunc(index: Int, iter: Iterator[(Int, String)]) : Iterator[String] = {
  iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
}
partitioned.mapPartitionsWithIndex(myfunc).collect

res1: Array[String] = Array([partID:0, val: (1,screen)], [partID:0, val: (2,cat)], 
[partID:0, val: (3,book)], [partID:1, val: (4,tv)],
[partID:1, val: (5,heater)], [partID:2, val: (6,mouse)], [partID:2, val: (7,cup)])

sample

Samples the data by a given fraction; a random seed can be supplied. The first argument selects sampling with or without replacement, the second is the sampling fraction, and the third is the seed.

val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960

a.sample(true, 0.3, 0).count
res25: Long = 2888

a.sample(true, 0.3, 13).count
res26: Long = 2985

sampleByKey

Similar to sample, but a separate sampling fraction is specified for each key.
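
A minimal sketch, assuming made-up keys and fractions: each key is sampled with its own probability.

val pairs = sc.parallelize(List((1, "a"), (1, "b"), (1, "c"), (2, "d"), (2, "e"), (3, "f")), 2)
// keep ~100% of key 1, ~50% of key 2 and none of key 3 (fractions are illustrative)
val fractions = Map(1 -> 1.0, 2 -> 0.5, 3 -> 0.0)
pairs.sampleByKey(false, fractions, 7L).collect
// e.g. Array((1,a), (1,b), (1,c), (2,e)) -- the exact result depends on the seed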

takeSample

Similar to sample, also a form of sampling, but this function takes a fixed number of elements rather than a fraction, and it returns an Array instead of an RDD.

val x = sc.parallelize(1 to 1000, 3)
x.takeSample(true, 10, 1)
res16: Array[Int] = Array(301, 418, 948, 706, 931, 844, 248, 895, 65, 172)

saveAsHadoopFile

Saves the RDD to HDFS as a Hadoop file.
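
A minimal sketch, assuming a pair RDD and a placeholder HDFS path; the key class, value class and old-API (mapred) output format are passed explicitly.

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

val pairs = sc.parallelize(List(("dog", 3), ("cat", 3), ("hello", 5)))
// hdfs://namenode:9000/out is a placeholder path
pairs.saveAsHadoopFile("hdfs://namenode:9000/out",
  classOf[Text], classOf[IntWritable],
  classOf[TextOutputFormat[Text, IntWritable]])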

saveAsHadoopDataset

saveAsHadoopDataset saves an RDD to storage systems other than HDFS, such as HBase.

Reference: RDD Action operations (6) – saveAsHadoopFile, saveAsHadoopDataset
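
A hedged sketch of writing to HBase through saveAsHadoopDataset; the table name "user", the column family "info" and the HBase 1.x client call Put.addColumn are assumptions, not taken from the original post.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf(HBaseConfiguration.create())
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")   // hypothetical table name

val rows = sc.parallelize(List(("row1", "alice"), ("row2", "bob")))
rows.map { case (rowKey, name) =>
  val put = new Put(Bytes.toBytes(rowKey))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
  (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
}.saveAsHadoopDataset(jobConf)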

saveAsTextFile, saveAsSequenceFile, saveAsObjectFile

Save the RDD as a plain text file, a SequenceFile, or a file of serialized objects, respectively.
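
A brief sketch; the /tmp/... output paths are placeholders.

val a = sc.parallelize(1 to 9, 3)
a.saveAsTextFile("/tmp/nums-text")                          // one line of text per element
a.map(i => (i, i * i)).saveAsSequenceFile("/tmp/nums-seq")  // key-value SequenceFile (needs a pair RDD)
a.saveAsObjectFile("/tmp/nums-obj")                         // SequenceFile of Java-serialized objects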

sortBy

Sorts the RDD; ascending or descending order can be specified.

val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
res101: Array[Int] = Array(1, 1, 2, 3, 5, 7)

y.sortBy(c => c, false).collect
res102: Array[Int] = Array(7, 5, 3, 2, 1, 1)

val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
res109: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))

z.sortBy(c => c._2, true).collect
res108: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))

sortByKey

For a key-value RDD, sorts the elements by key.

val a = sc.parallelize(List("dog", "cat", "hello", "hi", ""))
val b = a.map(v => (v, v.length))
b.collect
b.sortByKey(true).collect
res4: Array[(String, Int)] = Array((cat,3), (cheyo,5), (dog,3), (hello,5), (hi,2))

union, ++

Unions two RDDs together.

val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

subtract

Produces a new RDD from the elements present in the first RDD but absent from the second; essentially a set subtraction.

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b).sortBy(v => v, true)
c.collect
res5: Array[Int] = Array(4, 5, 6, 7, 8, 9)

subtractByKey

Similar to subtract, but operates on key-value RDDs: only keys present in the first RDD and absent from the second are kept.

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
val d = c.keyBy(_.length)
b.subtractByKey(d).collect
res15: Array[(Int, String)] = Array((4,lion))

sum

Sums the elements of the RDD.

val a = sc.parallelize(1 to 9, 3)
a.sum
res11: Double = 45.0

take

Returns the first n elements of the RDD.

val a = sc.parallelize(1 to 9, 3)
a.take(2)
res13: Array[Int] = Array(1, 2)

takeOrdered

Returns the first n elements of the RDD according to its default (natural) ordering.

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(2)
res14: Array[String] = Array(ape, cat)

toString

Returns a description of the RDD.

c.toString
res24: String = MapPartitionsRDD[56] at subtract at <console>:25

toDebugString

Prints the RDD's debug information, i.e. its lineage of dependencies.

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toDebugString
res18: String =
(3) MapPartitionsRDD[47] at subtract at <console>:25 []
 |  SubtractedRDD[46] at subtract at <console>:25 []
 +-(3) MapPartitionsRDD[44] at subtract at <console>:25 []
 |  |  ParallelCollectionRDD[42] at parallelize at <console>:21 []
 +-(3) MapPartitionsRDD[45] at subtract at <console>:25 []
    |  ParallelCollectionRDD[43] at parallelize at <console>:21 []

toJavaRDD

Converts the RDD to a JavaRDD so it can be used with Spark's Java API.
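
For example (a minimal sketch):

import org.apache.spark.api.java.JavaRDD

val z = sc.parallelize(1 to 5)
val jz: JavaRDD[Int] = z.toJavaRDD()
jz.count
// res: Long = 5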

toLocalIterator

Converts the RDD into a Scala iterator on the driver node.

val z = sc.parallelize(List(1,2,3,4,5,6), 2)
val iter = z.toLocalIterator

iter.next
res51: Int = 1

iter.next
res52: Int = 2

top

Returns the top n elements according to the default ordering.

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)

treeAggregate

Much like aggregate, but combines partial results in a multi-level tree pattern instead of sending every partition's result directly to the driver, which reduces driver load when there are many partitions.
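
A short sketch computing (sum, count) with a tree depth of 2:

val a = sc.parallelize(1 to 100, 10)
// zero value, then per-partition seqOp and cross-partition combOp, merged in a tree of the given depth
val (sum, count) = a.treeAggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (x, y) => (x._1 + y._1, x._2 + y._2),
  depth = 2)
// sum = 5050, count = 100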

treeReduce

Much like reduce, but merges partial results in a multi-level tree pattern rather than pulling every partition's result straight to the driver.
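
A short sketch:

val a = sc.parallelize(1 to 100, 10)
a.treeReduce(_ + _, depth = 2)
// res: Int = 5050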

Tags: #Spark    Post on Spark-API