Spark / SPARK-4550

In sort-based shuffle, store map outputs in serialized form

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.4.0
    • Component/s: Shuffle, Spark Core
    • Labels:
      None

      Description

      One drawback of sort-based shuffle compared to hash-based shuffle is that it ends up storing many more Java objects in memory. If Spark could store map outputs in serialized form, it could

      • spill less often because the serialized form is more compact
      • reduce GC pressure

      This will only work when the serialized representations of objects are independent of each other and occupy contiguous segments of memory. For example, when Kryo reference tracking is left on, objects may contain pointers to objects farther back in the stream, which means the sort can't relocate objects without corrupting them.
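
      As an illustration (not part of the issue itself): per-object independence is exactly what Kryo reference tracking trades away, and it can be turned off either on a raw Kryo instance or through Spark's existing spark.kryo.referenceTracking setting. A minimal sketch:

          import com.esotericsoftware.kryo.Kryo
          import org.apache.spark.SparkConf

          // Disable reference tracking directly on a Kryo instance: each object
          // is then written standalone, so its bytes can be relocated safely.
          val kryo = new Kryo()
          kryo.setReferences(false)

          // ...or through Spark's existing configuration knob.
          val conf = new SparkConf().set("spark.kryo.referenceTracking", "false")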

      Attachments

      1. kryo-flush-benchmark.scala
        2 kB
        Sandy Ryza
      2. SPARK-4550-design-v1.pdf
        145 kB
        Sandy Ryza


          Activity

          pwendell Patrick Wendell added a comment -

          Not an expert on the internals of this component, but do we need a way of ordering/comparing serialized objects for this to work?

          sandyr Sandy Ryza added a comment -

          We don't, though it would allow us to be much more efficient in certain situations.

          The way sort-based shuffle works right now, the map side only sorts by the partition, so we can store this number alongside the serialized record and not need to compare keys at all.

          SPARK-2926 proposes sorting by keys on the map side. For that, we'd need to deserialize keys before comparing them. There might be situations where this is slower than not serializing them in the first place. But even in those situations, we'd get more reliability by stressing GC less. It would probably be good to define raw comparators for common raw-comparable key types like ints and strings.
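
          A toy sketch (not the actual patch) of the partition-only sort described above: the partition id is packed into the upper bits of a Long next to an offset into the buffer holding the serialized bytes, so the sort never touches keys at all.

          // Pack a non-negative partition id and a buffer offset into one Long.
          def pack(partitionId: Int, offset: Int): Long =
            (partitionId.toLong << 32) | (offset & 0xffffffffL)

          def partitionOf(packed: Long): Int = (packed >>> 32).toInt
          def offsetOf(packed: Long): Int = packed.toInt

          // Because the partition id occupies the high bits, a plain Long sort
          // orders the pointers by partition without deserializing anything.
          val pointers = Array(pack(2, 0), pack(0, 64), pack(1, 128))
          java.util.Arrays.sort(pointers)
          assert(partitionOf(pointers(0)) == 0 && offsetOf(pointers(0)) == 64)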

          sandyr Sandy Ryza added a comment -

          Just posted a design doc. Would love to get feedback from Aaron Davidson, Matei Zaharia, and Saisai Shao.

          pwendell Patrick Wendell added a comment -

          Yeah, this is a good idea. I don't see why we don't serialize these immediately.

          pwendell Patrick Wendell added a comment -

          The doc alludes to having to (at some point) deal with comparing serialized objects. In the future, one approach would be to restrict this to SchemaRDDs, where we can have more control over the serialized format. This is effectively what Flink and other systems do (they basically only have SchemaRDDs).
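
          A toy illustration (not Spark or Flink code) of why a controlled serialized format enables this: if ints are encoded big-endian with the sign bit flipped, unsigned lexicographic byte comparison matches numeric order, so records can be compared without deserializing.

          import java.nio.ByteBuffer

          // Encode an Int so byte-wise unsigned comparison agrees with numeric order:
          // flip the sign bit, then write big-endian.
          def encode(i: Int): Array[Byte] =
            ByteBuffer.allocate(4).putInt(i ^ Int.MinValue).array()

          // Compare two encoded keys byte by byte, never deserializing them.
          def rawCompare(a: Array[Byte], b: Array[Byte]): Int = {
            var i = 0
            while (i < a.length && i < b.length) {
              val c = (a(i) & 0xff) - (b(i) & 0xff)
              if (c != 0) return c
              i += 1
            }
            a.length - b.length
          }

          assert(rawCompare(encode(-5), encode(3)) < 0)
          assert(rawCompare(encode(42), encode(42)) == 0)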

          sandyr Sandy Ryza added a comment -

          WIP branch: https://github.com/sryza/spark/tree/sandy-spark-4550
          rxin Reynold Xin added a comment -

          Sandy,

          The proposal seems to assume that objects can be individually serialized efficiently. That is not the case with Kryo, and I'm not even sure if it is safe to do that in Java. Do you have any thoughts on this?

          sandyr Sandy Ryza added a comment - edited

          I had heard rumors to that effect, so I ran some experiments and didn't find that to be the case:

          import org.apache.spark.serializer.KryoSerializer
          import org.apache.spark.SparkConf
          import java.io.ByteArrayOutputStream
          import java.nio.ByteBuffer
          
          val ser1 = new KryoSerializer(new SparkConf)
          
          // Serialize a sequence of objects into one byte array through a single stream.
          def serialize(objs: Array[AnyRef], ser: KryoSerializer): Array[Byte] = {
            val instance = ser.newInstance
            val baos = new ByteArrayOutputStream()
            val stream = instance.serializeStream(baos)
            objs.foreach(obj => stream.writeObject(obj))
            stream.close()
            baos.toByteArray
          }
          
          // Write the same array object twice...
          val inner = (0 until 100000).toArray
          val bytes1 = serialize(Array((1, inner), (2, inner)), ser1)
          
          // ...and two distinct-but-equal arrays, to see whether references are tracked.
          val inner1 = (0 until 100000).toArray
          val inner2 = (0 until 100000).toArray
          val bytes2 = serialize(Array((1, inner1), (2, inner2)), ser1)
          
          // Slice out the second record's bytes and deserialize them in isolation.
          val secondHalf = new Array[Byte](bytes1.size / 2)
          System.arraycopy(bytes1, bytes1.size / 2, secondHalf, 0, bytes1.size / 2)
          
          ser1.newInstance.deserialize[AnyRef](ByteBuffer.wrap(secondHalf))
          

          A couple observations:

          • "bytes1" ends up the same size as "bytes2", implying that "inner" is not being reference-tracked between the two writeObject calls
          • The last line is able to successfully reproduce the second object, implying that there's no information written at the beginning of the stream needed to deserialize objects later down.

          Are there cases or Kryo versions I'm not thinking about?

          sandyr Sandy Ryza added a comment -

          I also just tried this out using an object that's not registered with Kryo (java.awt.Color) instead of an Int array and observed things work fine as well.
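
          A sketch of that variant, reusing the serialize helper and ser1 from the earlier comment (the exact construction is illustrative):

          // Same experiment with a class that isn't registered with Kryo.
          val color = new java.awt.Color(255, 0, 0)
          val colorBytes = serialize(Array((1, color), (2, color)), ser1)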

          sandyr Sandy Ryza added a comment - edited

          I got a working prototype and benchmarked the ExternalSorter changes on my laptop.

          Each run inserts a bunch of records, each an (Int, (10-character String, Int)) tuple, into an ExternalSorter and then calls writePartitionedFile. The reported memory size is the sum of the shuffle bytes spilled (mem) metric and the remaining size of the collection after insertion completes. Results are averaged over three runs.

          Keep in mind that the primary goal here is to reduce GC pressure, so any speed improvements are icing.

          Number of Records | Storing as Serialized | Memory Size (bytes) | Number of Spills | Insert Time (ms) | Write Time (ms) | Total Time (ms)
          1 million         | false                 | 194923217           | 0                | 1123             | 3442            | 4566
          1 million         | true                  | 48694072            | 0                | 1315             | 2652            | 3967
          10 million        | false                 | 2050514159          | 3                | 26723            | 17418           | 44141
          10 million        | true                  | 613614392           | 1                | 16501            | 17151           | 33652
          50 million        | false                 | 10166122563         | 17               | 101831           | 89960           | 191791
          50 million        | true                  | 3067937592          | 5                | 76801            | 78361           | 155161
          apachespark Apache Spark added a comment -

          User 'sryza' has created a pull request for this issue:
          https://github.com/apache/spark/pull/4450

          sandyr Sandy Ryza added a comment -

          I spoke briefly with Reynold about this offline, and he pointed out that, with the patch, we now flush the Kryo serialization stream after every object we write. I put together a micro-benchmark to stress this that writes a bunch of small records to a Kryo serialization stream with and without flushing:

          runs without flush: (count: 30, mean: 226.400000, stdev: 3.929377, max: 241.000000, min: 222.000000)
          runs with flush: (count: 30, mean: 226.300000, stdev: 2.084067, max: 234.000000, min: 224.000000)

          There doesn't appear to be a significant difference. The benchmark code is attached.
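
          The attachment has the actual benchmark; the sketch below just shows the shape of the idea (record type and count here are illustrative, not the attachment's):

          import java.io.ByteArrayOutputStream
          import org.apache.spark.SparkConf
          import org.apache.spark.serializer.KryoSerializer

          // Time writing many small records, optionally flushing after each one.
          def timeWrites(flushEach: Boolean, records: Int = 1000000): Long = {
            val instance = new KryoSerializer(new SparkConf).newInstance
            val stream = instance.serializeStream(new ByteArrayOutputStream())
            val start = System.currentTimeMillis()
            var i = 0
            while (i < records) {
              stream.writeObject((i, "aaaaaaaaaa"))
              if (flushEach) stream.flush()
              i += 1
            }
            stream.close()
            System.currentTimeMillis() - start
          }

          println(s"no flush: ${timeWrites(flushEach = false)} ms")
          println(s"flush:    ${timeWrites(flushEach = true)} ms")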

          rxin Reynold Xin added a comment -

          Sandy Ryza, can you investigate what's happening with Java serialization? Would be great to get that working too.

          sandyr Sandy Ryza added a comment -

          Java serialization appears to write out the full class name the first time an object is written and then refer to it by an identifier afterwards:

          scala> import java.io.{ByteArrayOutputStream, ObjectOutputStream}
          
          scala> val baos = new ByteArrayOutputStream()
          scala> val oos = new ObjectOutputStream(baos)
          scala> oos.writeObject(new java.util.Date())
          scala> oos.flush()
          
          scala> baos.toString
          res8: String = ��??sr??java.util.Datehj�?KYt????xpw????LY6: x 
          scala> baos.toByteArray.length
          res9: Int = 46
          
          scala> oos.writeObject(new java.util.Date())
          scala> oos.flush()
          
          scala> baos.toString
          res14: String = ��??sr??java.util.Datehj�?KYt????xpw????LY6: xsq?~??w????LY6�Dx 
          scala> baos.toByteArray.length
          res13: Int = 63
          
          scala> oos.writeObject(new java.util.Date())
          scala> oos.flush()
          
          scala> baos.toString
          res17: String = ��??sr??java.util.Datehj�?KYt????xpw????LY6: xsq?~??w????LY6�Dxsq?~??w????LY8?�x 
          scala> baos.toByteArray.length
          res18: Int = 80
          

          There might be some fancy way to listen for the class name being written out and relocate that segment to the front of the stream. However, this seems fairly involved and bug-prone; my opinion is that it's not worth it, given that Java serialization is already a severely performance-impaired option. Another option, of course, would be to write the class name in front of every record, but this would bloat the serialized representation considerably.
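
          Incidentally, ObjectOutputStream.reset() shows what that second option would cost: it clears the stream's handle table, so the next write re-sends the full class descriptor. Continuing the session above (exact byte counts will vary):

          scala> oos.reset()   // handle table cleared; descriptors get re-sent
          
          scala> oos.writeObject(new java.util.Date())
          scala> oos.flush()
          
          scala> baos.toByteArray.length  // grows by roughly the first write's size, not ~17 bytes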

          apachespark Apache Spark added a comment -

          User 'sryza' has created a pull request for this issue:
          https://github.com/apache/spark/pull/5916


            People

            • Assignee:
              sandyr Sandy Ryza
              Reporter:
              sandyr Sandy Ryza
             • Votes:
               1
             • Watchers:
               19
