Spark性能优化(2)——广播变量、本地缓存目录、RDD操作、数据倾斜 2015-06-24 21:00

广播变量

背景

一般Task大小超过10K时(Spark官方建议是20K),需要考虑使用广播变量进行优化。
大表小表Join,小表使用广播的方式,减少Join操作。

参考:Spark广播变量与累加器

Local Dir

背景

shuffle过程中,临时数据需要写入本地磁盘。本地磁盘的临时目录通过参数spark.local.dir配置。

性能优化点

spark.local.dir支持配置多个目录。配置spark.local.dir有多个目录,每个目录对应不同的磁盘,这样可以提升IO效率。另外,可以采用IO性能较高的磁盘作为local dir的磁盘。

注意:

  • 如果使用YARN、Mesos等资源框架,此参数应该通过相应资源框架的参数来设置。
  • 如果只有一个磁盘,配置了多个目录,性能提升不大。

RDD操作:使用MapPartitions替代Map

性能优化点

map方法是对RDD的每一条记录逐一操作。mapPartitons是对整个RDD,以迭代器的方式逐一操作。比如对条记录的开销较大,比如需要连接、断开数据库。使用map方法需要对每一条记录都连接、断开数据库,效率差。此时,可以改用mapPartitons操作,只需要整个Partition连接、断开一次数据库即可。

1
rdd.map{x => conn=getDBConn;conn.write(x.toString);conn.close}

改为:

1
rdd.mapPartitions(records => conn.getDBConn;for(item <- records) write(item.toString); conn.close)

RDD操作:使用coalesce减小空运行的任务数量

性能优化点

  • 场景一

当对RDD进行多次过滤时,可能会形成很多空的、无数据的Partition。通过调用coalesce方法,可以减小Task个数。让有的Task可以同时管理多个Partition。

  • 场景二

当任务数过多的时候,Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时,可以通过调用coalesce方法,可以减小Task个数,让程序得以继续运行。

coalesce()方法接受一个参数,为减小后的目标Partition个数。

RDD操作:collect

Collect操作会将Executor的数据发送的Driver端。需要确保Driver有足够的内存。Driver的内存通过参数spark.driver.memory参数进行配置。

RDD操作:使用reduceByKey替代groupByKey

reduceByKey会在Map端做本地聚合,而groupByKey等Shuffle操作不会再Map端做聚合。 能使用reduceByKey的地方尽量使用该方式,避免出现.groupByKey().map(x=>(x.1,x.2.size))

  • 举例

对于数据

2015-05-01 13:00:00,B101,MEILIN
2015-05-01 10:04:20,B101,GUANLAN
2015-05-01 09:18:00,F301,MEILIN
2015-05-01 12:00:00,B107,WUHE
2015-05-01 18:20:00,F301,WUHE
2015-05-02 12:00:02,T442,GUANLAN
2015-05-01 07:00:00,B101,GUANLAN
2015-05-01 21:31:00,M721,WUHE
2015-05-01 09:00:00,Z007,MEILIN

现在要统计各个车牌(第二列)出现的次数,则应使用:

1
2
3
4
5
var dataRDD = sc.textFile("file:///tmp/data.txt")
var data2RDD = dataRDD.map(s => s.split(","))
var data3RDD = data2RDD.map( a => (a(1),1) )
var data4RDD = data3RDD.reduceByKey(_ + _)
data4RDD.collect

而不是:

1
2
3
4
5
6
var dataRDD = sc.textFile("file:///tmp/data.txt")
var data2RDD = dataRDD.map(s => s.split(","))
var data3RDD = data2RDD.map( a => (a(1),Array(a(0),a(2))) )
var data4RDD = data3RDD.groupByKey()
var data5RDD = data4RDD.map(x => (x._1,x._2.size))
data5RDD.collect

避免数据倾斜

如何检测数据倾斜?

现象:没有GC,各Task执行时间严重不一致。

性能优化点

  • 重新设计key,以更小粒度的key使得Task大小合理化。
  • 有时提升并行度,有助于解决数据倾斜
Tags: #Spark    Post on Spark