Spark应用开发实例 2015-03-12 11:00

说明

以下代码都是在spark-shell中执行的。

分析nginx日志

分析nginx的访问日志,统计出访问次数最多的IP地址.

nginx_access.log内容如下:

123.125.71.71 - - [11/Oct/2014:03:23:19 +0800] "GET /robo……
94.23.35.157 - - [11/Oct/2014:04:36:14 +0800] "GET /?p=29……
123.125.71.72 - - [11/Oct/2014:04:52:13 +0800] "GET /robo……
183.136.190.62 - - [11/Oct/2014:04:55:50 +0800] "GET / HT……
94.188.16.78 - - [11/Oct/2014:05:10:04 +0800] "GET /showt……
94.188.16.78 - - [11/Oct/2014:05:10:05 +0800] "GET / HTTP……
207.46.13.129 - - [11/Oct/2014:05:12:26 +0800] "GET /disc……
220.181.108.166 - - [11/Oct/2014:05:17:01 +0800] "GET /4.……
123.125.71.51 - - [11/Oct/2014:05:17:03 +0800] "GET /stat……
123.125.71.13 - - [11/Oct/2014:05:17:03 +0800] "GET /stat……
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
/* 读入HDFS文件 */
val logRDD = sc.textFile("hdfs://ctrl:9000/spark/test/1.log")
/* 从每行第一列提取出IP地址字符串, 并将每个记录形成 (IP,1)的形式 */
val ipRDD = logs.map( s => (s.split(" ",2)(0), 1))
/* 以每一个IP为key,进行规约求总数 */
val countRDD = ipRDD.reduceByKey((a, b) => a + b)
/* 按每个(IP,n)元组的第2列进行倒序排序 */
val sortRDD = countRDD.sortBy( c => c._2, false)
/* 将结果输出到HDFS文件中 */
sortRDD.saveAsTextFile("hdfs://ctrl:9000/spark/test/output1")

WordCount

1
2
3
4
5
6
var textRDD = sc.textFile("hdfs://ctrl:9000/spark/test/word.txt")
var wordRDD = textRDD.flatMap( text => text.toString().split("\\s+"))
var word2RDD = wordRDD.map(word => (word, 1))
/* 这样写也行:var countRDD = word2RDD.reduceByKey (_ + _) */
var countRDD = word2RDD.reduceByKey((x,y) => x + y)
var sortRDD = countRDD.sortBy(x => x._2, false)

TopK

基于WordCount的统计结果,分析出词频最大的5个单词。 将Key和value互换,然后基于key排序。

1
2
3
4
5
6
7
/* 注意,map后面是大括号 */
var sortRDD2 = countRDD.map{ case(k,v) => (v,k) }
/* 倒序,1个任务 */
var resultRDD = sortRDD2.sortByKey(false, 1)
/* 注意:top返回的是列表,不是RDD */
val top5 = resultRDD.top(5)
top5 .foreach(println)

参考

Apache Spark RDD API Examples

Tags: #Spark    Post on Spark