Spark API使用实例(1) 2015-08-25 21:00

需求

数据结构

cbd`4`57
nby`5`55
cbd`2`32
jjf`4`53
nby`7`47
cbd`6`37
jjf`1`63
cbd`9`56
nby`3`99
hur`2`45
jjf`2`42
cbd`1`59
hur`6`54
jjf`8`66
jjf`8`33
hur`9`41
gfe`7`19
abd`5`46
gfe`8`09
gfe`6`33
jjf`3`35
abd`9`64
gfe`3`34
abd`1`50
hur`3`62
abd`8`43
hur`5`58
hur`4`65
jjf`7`39
gfe`2`48
jjf`4`38
hur`6`52
jjf`5`51
cbd`1`60

即:x`x`x

要求:第一个x相同的归为一类,然后按照第二个x从小到大排序.然后检测第三个x是否为递减。最终输出满足条件的第一列x。

方案

我这里想到一个方法。请参考是否合适。

思路如下:

  1. 排序(各列的宽度是固定的)
  2. 映射成类似(nby,(7,47))的数据结构
  3. 以第一列为key,进行groupByKey操作。形成类似(nby,CompactBuffer((3,99), (5,55), (7,47))))这样的数据结构。
  4. 再进行过滤操作。在自定义的函数中,判断第三列是否递减。过滤出递减的元素。
  5. 提取出满足条件的记录,对应的key,输出key

代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.util.collection.CompactBuffer

def myfunc(ele: ((String, Iterable[(String, String)]))) : Boolean = {
    var iter = ele._2.iterator
    var pre = iter.next
    var isSorted = true
    while (iter.hasNext && isSorted) {
        val cur = iter.next;
        if (cur._2.toInt >= pre._2.toInt) {
            isSorted = false
        }
    }
    isSorted
}

var fileRDD = sc.textFile("file:///tmp/test.data")
fileRDD.sortBy(line => line, true).
    map(line => line.toString().split("`")).
    map(arr => (arr(0),(arr(1),arr(2)))).
    groupByKey.
    filter(myfunc).
    keys.
    foreach(println)
Tags: #Spark    Post on Spark