SPARK-19141: VectorAssembler metadata causing memory issues


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.6.0, 2.0.0, 2.1.0
    • Fix Version/s: None
    • Component/s: ML, MLlib
    • Environment: Windows 10, Ubuntu 16.04.1, Scala 2.11.8, Spark 1.6.0, 2.0.0, 2.1.0

    Description

      VectorAssembler produces unnecessary metadata that overflows the Java heap in the case of sparse vectors. In the example below, the logical length of the vector is 10^6, but the number of non-zero values is only 2.
      The problem arises because VectorAssembler creates metadata (ML attributes) for each of the 10^6 slots, even when no such metadata exists upstream (HashingTF, for instance, does not produce per-slot metadata). Here is a chunk of the metadata it produces:

      {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"HashedFeat_0"},{"idx":1,"name":"HashedFeat_1"},{"idx":2,"name":"HashedFeat_2"},{"idx":3,"name":"HashedFeat_3"},{"idx":4,"name":"HashedFeat_4"},{"idx":5,"name":"HashedFeat_5"},{"idx":6,"name":"HashedFeat_6"},{"idx":7,"name":"HashedFeat_7"},{"idx":8,"name":"HashedFeat_8"},{"idx":9,"name":"HashedFeat_9"},...,{"idx":1000000,"name":"Feat01"}]},"num_attrs":1000001}}
      

      In this lightweight example, the feature size limit seems to be around 1,000,000 when run locally, but it scales poorly with more complicated routines: with a larger dataset and a learner (say, LogisticRegression), it runs out of memory at hash sizes anywhere between 10k and 100k, even on a decent-sized cluster.
      I did some digging, and it seems that the only metadata downstream learners actually need is the part that marks categorical columns. With that in mind, I thought of the following possible solutions:

      1. A compact representation of the ML attributes metadata (but this seems to be a bigger change)
      2. Removal of the non-categorical attributes from the metadata created by VectorAssembler
      3. An option on the existing VectorAssembler to skip unnecessary ML attributes, or another transformer altogether (a rough sketch of this idea follows below)
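
      As a user-side illustration of option 3, here is a minimal sketch (assuming Spark 2.x with the ml.linalg vector types; assembleSparse is a hypothetical helper, not an existing API) that concatenates the hashed vector and the numeric column with a plain UDF, using hashedData from the attached repro, so that no per-slot ML attributes are created:

      import org.apache.spark.ml.linalg.{Vector, Vectors}
      import org.apache.spark.sql.functions.{col, udf}

      // Hypothetical workaround sketch: append Feat01 to the hashed sparse vector by hand.
      // The resulting column carries no per-slot attribute metadata.
      val assembleSparse = udf { (v: Vector, d: Double) =>
        val sv = v.toSparse
        Vectors.sparse(sv.size + 1, sv.indices :+ sv.size, sv.values :+ d)
      }

      val assembled = hashedData
        .withColumn("Features", assembleSparse(col("HashedFeat"), col("Feat01")))
        .select("Label", "Features")

      This sidesteps the attribute generation entirely, at the cost of dropping whatever metadata downstream stages might rely on (e.g. categorical flags), which is why an option on VectorAssembler itself seems preferable.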

      I would be happy to take a stab at any of these solutions, but I need some direction from the Spark community.

      VABug.scala
      import org.apache.spark.SparkConf
      import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}
      import org.apache.spark.sql.SparkSession

      object VARepro {

        case class Record(Label: Double, Feat01: Double, Feat02: Array[String])

        def main(args: Array[String]): Unit = {

          val conf = new SparkConf()
            .setAppName("Vector assembler bug")
            .setMaster("local[*]")
          val spark = SparkSession.builder.config(conf).getOrCreate()

          import spark.implicits._
          // Two rows: one plain numeric feature and one string feature to be hashed.
          val df = Seq(Record(1.0, 2.0, Array("4daf")), Record(0.0, 3.0, Array("a9ee"))).toDS()

          // Large hash size: the hashed vectors stay sparse (one non-zero per row),
          // but VectorAssembler later materializes ML attributes for every slot.
          val numFeatures = 10000000
          val hashingScheme = new HashingTF().setInputCol("Feat02").setOutputCol("HashedFeat").setNumFeatures(numFeatures)
          val hashedData = hashingScheme.transform(df)

          // transform() throws an OutOfMemoryError while building the per-slot
          // attribute metadata for the assembled column.
          val vectorAssembler = new VectorAssembler().setInputCols(Array("HashedFeat", "Feat01")).setOutputCol("Features")
          val processedData = vectorAssembler.transform(hashedData).select("Label", "Features")
          processedData.show()
        }
      }
      

      Stack trace from the example above:

      Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
      at org.apache.spark.ml.attribute.NumericAttribute.copy(attributes.scala:272)
      at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:215)
      at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:195)
      at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:71)
      at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:70)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
      at scala.collection.IterableLike$class.copyToArray(IterableLike.scala:254)
      at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
      at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
      at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
      at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
      at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
      at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
      at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
      at scala.Option.map(Option.scala:146)
      at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
      at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
      at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:95)
      at VARepro$.main(VARepro.scala:36)

      Exception when run in conjunction with a learner on a bigger dataset (~10 GB) on a cluster:

      : java.lang.OutOfMemoryError: Java heap space
      at java.util.Arrays.copyOf(Arrays.java:3236)
      at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
      at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
      at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
      at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
      at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)

      People

        Assignee: Unassigned
        Reporter: Antonia Oprescu (moprescu)
        Votes: 4
        Watchers: 8
