Using Spark DataFrames (1.3.1 & Python) 2015-05-21 21:00

Creating a DataFrame from files

Creating a DataFrame from a JSON file

  • Via the SQLContext.jsonFile() interface
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
path = "file:///tmp/test/jsonfile"
# peopleDF is a DataFrame
peopleDF = sqlContext.jsonFile(path)
  • Via the SQLContext.load() interface
peopleDF = sqlContext.load("people.json", "json")
  • Saving as a JSON file
peopleDF.save("file:///tmp/test/outjson", "json")

Creating a DataFrame from a text file

  • Via the SQLContext.inferSchema() method
from pyspark.sql import Row

path3 = "file:///tmp/test/textfile"
linesRDD = sc.textFile(path3)
partsRDD = linesRDD.map(lambda l: l.split(","))
# Turn each CSV line into a Row; inferSchema() derives the schema from the Row fields
peopleRDD = partsRDD.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeopleDF = sqlContext.inferSchema(peopleRDD)
schemaPeopleDF.show()
  • By specifying the schema explicitly
from pyspark.sql import SQLContext
from pyspark.sql.types import *

path3 = "file:///tmp/test/textfile"
linesRDD = sc.textFile(path3)
partsRDD = linesRDD.map(lambda l: l.split(","))
peopleRDD = partsRDD.map(lambda p: (p[0], p[1].strip()))
# Build a StructType from the field names; every column is a nullable string here
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
schemaPeopleDF = sqlContext.createDataFrame(peopleRDD, schema)
schemaPeopleDF.show()

Creating a DataFrame from a Parquet file

df = sqlContext.load("file:///tmp/test/parquetfile/part-r-00001.parquet", "parquet")

Or:

df = sqlContext.load("file:///tmp/test/parquetfile/part-r-00001.parquet")

The second argument to SQLContext.load() is the data source format; if it is omitted, Parquet is assumed by default.

Or:

df2 = sqlContext.parquetFile("file:///tmp/test/parquetfile/part-r-00001.parquet")
  • Saving as a Parquet file
df.saveAsParquetFile("file:///tmp/test/outparquet2")
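
Symmetrically, save() also falls back to the default data source when no format is given; a minimal sketch (the output path below is only an illustration):

# With no format argument, save() uses the default data source, which is
# Parquet unless spark.sql.sources.default has been changed
df.save("file:///tmp/test/outparquet_default")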

Creating a DataFrame from a Hive table

# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

# Queries can be expressed in HiveQL.
hiveDataFrame = sqlContext.sql("FROM house SELECT key, value")
hiveDataFrame.show()

Note: by default Spark is not built with Hive support, so you need to rebuild Spark yourself (for Spark 1.3.x this typically means enabling the -Phive and -Phive-thriftserver Maven profiles).

Creating a DataFrame from an RDD

  • Via the methods provided by SQLContext
anotherPeopleRDD = sc.parallelize([
  '{"name":"cheyo","address":{"city":"Columbus","state":"Ohio"}}'])
anotherPeopleDF = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeopleDF.show()
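
Since the JSON above contains a nested address object, nested fields should be reachable with dot notation in select(); a small sketch:

# Pull the nested city field out of the address struct
anotherPeopleDF.select("address.city").show()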

Creating a DataFrame via JDBC

The MySQL JDBC driver first has to be added to SPARK_CLASSPATH:

SPARK_CLASSPATH=/opt/spark/lib/mysql-connector-java-5.1.22-bin.jar pyspark
sqlContext = SQLContext(sc)
df = sqlContext.load(source="jdbc", \
    url="jdbc:mysql://localhost:3306/LemonHouse?user=root&password=123123",\
    dbtable="house_datastat")

DataFrame operations

  • Basic operations
# peopleDF is a DataFrame
peopleDF = sqlContext.jsonFile(path)
# Print the schema
peopleDF.printSchema()
# Print the table contents
peopleDF.show()
peopleDF.select("name").show()
# Select everyone older than 21 and keep only the name column
peopleDF.filter(peopleDF.age > 21).select("name").show()
# Select name, and age incremented by one
peopleDF.select(peopleDF.name, peopleDF.age + 1).show()
# Group by age and count
peopleDF.groupBy("age").count().show()
  • Other operations
path2 = "file:///tmp/test/jsonfile2"
peopleDF2 = sqlContext.jsonFile(path2)
# Left-join the two tables on the name column
peopleDF.join(peopleDF2, peopleDF.name == peopleDF2.name, "left").show()
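
A few more everyday operations, sketched on the assumption that peopleDF has the name and age columns used above:

# Sort by age in descending order and keep only the first three rows
peopleDF.sort(peopleDF.age.desc()).limit(3).show()
# Number of distinct rows
print peopleDF.distinct().count()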

Running SQL queries against a DataFrame

We can also register a DataFrame as a virtual table and then query it with SQL statements:

peopleDF.registerTempTable("people")
# DataFrame[name: string]
# teenagersDF is a DataFrame
teenagersDF = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagersDF.show()
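
Anything written with the DataFrame API can also be expressed as SQL against the registered table; for example, the earlier groupBy count could look roughly like this:

# SQL equivalent of peopleDF.groupBy("age").count()
sqlContext.sql("SELECT age, COUNT(*) AS cnt FROM people GROUP BY age").show()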

Creating an RDD from a DataFrame

# teenNamesRDD is a PythonRDD
teenNamesRDD = teenagersDF.map(lambda p: "Name: " + p.name)
for teenName in teenNamesRDD.collect():
  print teenName

# The following did not pass verification; the cause is unknown
# (possibly the missing Row import, which is added here)
from pyspark.sql import Row

teenagersDF2 = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")
teenagersDF2.show()
teenagersRDD2 = teenagersDF2.map(lambda p: Row(name=p.name, age=p.age))
for teen2 in teenagersRDD2.collect():
  print teen2

Merging schemas

Spark can recognize Hive-style partition directories and merge the data from multiple partitions into a single table:

from pyspark.sql import SQLContext, Row

# Write two DataFrames into different partition directories (key=1 and key=2)
df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df1.save("file:///tmp/test/data/test_table/key=1", "parquet")

df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11))\
                                   .map(lambda i: Row(single=i, double=i * 2)))
df2.save("file:///tmp/test/data/test_table/key=2", "parquet")

# Reading the parent directory picks up both partitions and exposes the
# partition column "key" in the merged schema
df3 = sqlContext.parquetFile("file:///tmp/test/data/test_table")
df3.printSchema()
df3.show()
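
Assuming both writes above succeed, printSchema() should show the two data columns plus the discovered partition column, roughly:

# Expected output of df3.printSchema() (column order and nullability may differ):
# root
#  |-- double: long (nullable = true)
#  |-- single: long (nullable = true)
#  |-- key: integer (nullable = true)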

Tags: #Spark #SparkSQL #Python