Spark SQL Example (1.2.1) 2015-03-16 17:00

Notes

All of the following code is executed in the spark-shell.

Data Preparation

house.txt contains one record per line, with five comma-separated fields corresponding to the case class defined below (sn, id, name, floor, size1):

1,1272792,3301,33,88.92
2,1272823,3302,33,88.92
3,1273080,3303,33,88.91
4,1272884,3304,33,76.10
5,1272920,3305,33,129.70
6,1272791,3201,32,88.92
7,1272822,3202,32,88.92
8,1272853,3203,32,88.91
9,1272883,3204,32,76.10
10,1272919,3205,32,129.70
...

Upload the data to HDFS:

hdfs dfs -put house.txt /spark/sql/
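
As an optional sanity check, the file can be read back from HDFS to confirm the upload:

hdfs dfs -cat /spark/sql/house.txt | head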

Running the Spark SQL Program

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

// Define the schema using a case class.
// Note: case classes in Scala 2.10 can support only up to 22 fields. To work
// around this limit, you can use custom classes that implement the Product interface.
case class House(sn: Int, id: Int, name: String, floor: String, size1: Float)

// Create an RDD of House objects and register it as a table.
val house = sc.textFile("hdfs://ctrl:9000/spark/sql/house.txt").map(_.split(","))
    .map(p => House(
          p(0).trim.toInt, 
          p(1).trim.toInt, 
          p(2).trim, 
          p(3).trim, 
          p(4).trim.toFloat))

house.registerTempTable("house")

// SQL statements can be run by using the sql methods provided by sqlContext.
val resultRDD = sqlContext.sql("SELECT id,size1 FROM house WHERE size1 >= 250")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
resultRDD.saveAsTextFile("hdfs://ctrl:9000/spark/sql/output")
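
The result can also be inspected directly in the shell before (or instead of) writing it out. A minimal sketch, assuming the result set is small enough to collect to the driver; columns are addressed by ordinal in the order of the SELECT clause (id, size1):

// Print each matching row; r(0) is id, r(1) is size1.
resultRDD.map(r => s"id=${r(0)}, size1=${r(1)}").collect().foreach(println)

Note that saveAsTextFile writes one part file per partition under /spark/sql/output, which can be viewed with hdfs dfs -cat /spark/sql/output/part-*.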
Tags: #Spark