Spark序列化与压缩 2015-04-25 17:00

序列化

当使用SparkContext的saveAsObjectFile方法将对象序列化到文件,以及通过objectFile方法将对象从文件反序列出来的时候. Spark默认使用Java的序列化以及反序列化机制,通常情况下,这种序列化机制是很低效的,Spark支持使用Kyro作为对象的序列化和反序列化机制,序列化的速度比java更快。

  • Spark默认是使用Java的ObjectOutputStream框架,它支持所有的继承于java.io.Serializable序列化,如果想要进行调优的话,可以通过继承java.io.Externalizable。这种格式比较大,而且速度慢。

  • Spark还支持这种方式Kryo serialization,它的速度快,而且压缩比高于Java的序列化,但是它不支持所有的Serializable格式,并且需要在程序里面注册。它需要在实例化SparkContext之前进行注册。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package net.cheyo

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoRegistrator

//同时必须实现java.io.Serializable接口
class House(val sn: Int, val id: Int, val name: String, val size1: Float, val branch_id: Int)
  extends java.io.Serializable {
  //留空即可,有默认构造函数
}

//同时必须实现java.io.Serializable接口
class Branch(val id: Int, val name: String, val md5: String, val building_name: String)
  extends java.io.Serializable {
  //留空即可,有默认构造函数
}

//注册使用Kryo序列化的类,要求MyClass1和MyClass2必须实现java.io.Serializable
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[House]);
    kryo.register(classOf[Branch]);
  }
}

object SparkKryo {
  def main(args: Array[String]) {
    //设置序列化器为KryoSerializer,也可以在配置文件中进行配置
    //需要在new SparkContext之前设置
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "net.cheyo.MyKryoRegistrator")

    val conf = new SparkConf()
    conf.setAppName("SparkKryo")
    //conf.setMaster("local[3]")

    val sc = new SparkContext(conf)

    val accum = sc.accumulator(0, "HouseCounter")

    val house_rdd = sc.textFile("hdfs://ctrl:9000/user/root/house_house/part-m-00000")
    //val house_rdd = sc.textFile("file:///E:/house.txt")

    val house_rdd2 = house_rdd.map(line => line.split(","))
    val house_rdd3 = house_rdd2.map(arr => new House(arr(0).toInt, arr(0).toInt, 
           arr(2), arr(4).toFloat, arr(10).toInt))
    //将rdd中的对象通过kyro进行序列化
    val dir = "hdfs://ctrl:9000/user/root/house_obj" + System.currentTimeMillis()
    house_rdd3.saveAsObjectFile(dir)

    //读取part-00000文件中的数据,对它进行反序列化,得到对象集合rdd_house
    val rdd_house = sc.objectFile[House](dir + "/part-00000")

    var i = 0
    rdd_house.foreachPartition(iter => {
      while (iter.hasNext) {
        val house = iter.next();
        accum += 1

        //不知为何放开如下几行代码的话,后面的House Counter就打印不出来,house.name也打印不出来
        //i += 1
        //if (i < 5) {
        //  println(house.name)
        //}
      }
    })

    println("House Counter: " + accum.value)

    sc.stop
  }
}

相关配置

  • spark.serializer

默认值:org.apache.spark.serializer.JavaSerializer。哪一种序列化器。另一种是org.apache.spark.serializer.KryoSerializer。

  • spark.kryo.classesToRegister

如果你用Kryo序列化,给定的用逗号分隔的自定义类名列表表示要注册的类。

  • spark.kryo.registrator

如果用Kryo序列化,设置这个类去注册自定义类。如果需要用自定义的方式注册类,需要用这个属性。否则spark.kryo.classesToRegister会更简单。spark.kryo.registrator应该设置一个继承自KryoRegistrator的类。

  • spark.kryo.referenceTracking

默认值:true。当用Kryo序列化时,跟踪是否引用同一对象。如果你的对象图有环,这是必须的设置。如果他们包含相同对象的多个副本,这个设置对效率是有用的。如果你知道不在这两个场景,那么可以禁用它以提高效率。

  • spark.kryo.registrationRequired

默认值:false。是否需要注册为Kyro可用。如果设置为true,然后如果一个没有注册的类序列化,Kyro会抛出异常。如果设置为false,Kryo将会同时写每个对象和其非注册类名。写类名可能造成显著地性能瓶颈。

  • spark.closure.serializer

默认值:org.apache.spark.serializer.JavaSerializer。闭包序列化器,目前只支持Java序列化器。

  • spark.kryoserializer.buffer.max.mb

默认值:64. Kryo序列化缓存允许的最大值。这个值必须大于你尝试序列化的对象.

  • spark.kryoserializer.buffer.mb

默认值:0.064. Kyro序列化缓存的大小。这样worker上的每个核都有一个缓存。如果有需要,缓存会涨到spark.kryoserializer.buffer.max.mb设置的值那么大。

  • spark.serializer.objectStreamReset

默认值:100. 当用org.apache.spark.serializer.JavaSerializer序列化时,序列化器通过缓存对象防止写多余的数据,然而这会造成这些对象的垃圾回收停止。通过请求'reset',你从序列化器中flush这些信息并允许收集老的数据。为了关闭这个周期性的reset,你可以将值设为-1。默认情况下,每一百个对象reset一次。

压缩

相关配置

  • spark.broadcast.compress

默认值:true。 在发送广播变量之前是否压缩它。

  • spark.io.compression.codec

默认值:snappy。压缩诸如RDD分区、广播变量、shuffle输出等内部数据的编码解码器。默认情况下,Spark提供了三种选择:lz4, lzf和snappy。你也可以用完整的类名来制定。org.apache.spark.io.LZ4CompressionCodec,org.apache.spark.io.LZFCompressionCodec,org.apache.spark.io.SnappyCompressionCodec

  • spark.io.compression.lz4.block.size

默认值:32768。LZ4压缩中用到的块大小。降低这个块的大小也会降低shuffle内存使用率。

  • spark.io.compression.snappy.block.size

默认值:32768。Snappy压缩中用到的块大小。降低这个块的大小也会降低shuffle内存使用率。

  • spark.rdd.compress

默认值:false。是否压缩序列化的RDD分区。在花费一些额外的CPU时间的同时节省大量的空间。

Tags: #Spark    Post on Spark