Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 1.6.0, 2.0.0, 2.1.0
- Fix Version/s: None
- Environment: Windows 10, Ubuntu 16.04.1, Scala 2.11.8, Spark 1.6.0, 2.0.0, 2.1.0
Description
VectorAssembler produces unnecessary metadata that overflows the Java heap in the case of sparse vectors. In the example below, the logical length of the vector is 10^6, but the number of non-zero values is only 2.
The problem arises because VectorAssembler creates metadata (ML attributes) for each of the 10^6 slots, even though this metadata didn't exist upstream (HashingTF, for instance, doesn't produce per-slot metadata). Here is a chunk of the metadata it produces:
{"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"HashedFeat_0"},{"idx":1,"name":"HashedFeat_1"},{"idx":2,"name":"HashedFeat_2"},{"idx":3,"name":"HashedFeat_3"},{"idx":4,"name":"HashedFeat_4"},{"idx":5,"name":"HashedFeat_5"},{"idx":6,"name":"HashedFeat_6"},{"idx":7,"name":"HashedFeat_7"},{"idx":8,"name":"HashedFeat_8"},{"idx":9,"name":"HashedFeat_9"},...,{"idx":1000000,"name":"Feat01"}]},"num_attrs":1000001}}
In this lightweight example, the largest feature size that still works when run locally seems to be about 1,000,000, but the limit shrinks quickly for more complicated pipelines. With a larger dataset and a learner (say, LogisticRegression), the heap is exhausted at hash sizes anywhere between 10k and 100k, even on a decent-sized cluster.
I did some digging, and it seems that the only metadata downstream learners actually need is the part that marks categorical columns. With that in mind, I see the following possible solutions:
1. A compact representation of the ML attributes metadata (but this seems to be a bigger change)
2. Removing the non-categorical attributes from the metadata created by VectorAssembler
3. An option on the existing VectorAssembler to skip the unnecessary ML attributes, or a separate transformer altogether (a rough sketch of this idea follows below)
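As a very rough illustration of what option 3 could look like (nothing here is an existing Spark API; the helper name and its behaviour are assumptions made for the sketch), a plain UDF can concatenate the hashed vector and the numeric column without ever materializing per-slot attributes:

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical helper sketching option 3: append one numeric column to a sparse
// vector column without building an AttributeGroup, so no per-slot ML attribute
// metadata ends up in the output column's schema.
def assembleWithoutAttrs(df: DataFrame, vecCol: String, numCol: String, outCol: String): DataFrame = {
  val concat = udf { (v: Vector, x: Double) =>
    val sv = v.toSparse
    // The scalar becomes one extra slot at the end of the sparse vector.
    Vectors.sparse(sv.size + 1, sv.indices :+ sv.size, sv.values :+ x)
  }
  df.withColumn(outCol, concat(col(vecCol), col(numCol)))
}

In the repro below this would be called as assembleWithoutAttrs(hashedData, "HashedFeat", "Feat01", "Features"). The obvious downside is that categorical metadata, if any input carried it, would be dropped as well, which is exactly the part downstream learners still need.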
I would be happy to take a stab at any of these solutions, but I need some direction from the Spark community.
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, VectorAssembler}
import org.apache.spark.sql.SparkSession

object VARepro {

  case class Record(Label: Double, Feat01: Double, Feat02: Array[String])

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Vector assembler bug")
      .setMaster("local[*]")
    val spark = SparkSession.builder.config(conf).getOrCreate()
    import spark.implicits._

    val df = Seq(Record(1.0, 2.0, Array("4daf")), Record(0.0, 3.0, Array("a9ee"))).toDS()

    // Hash Feat02 into a very wide (mostly empty) sparse vector.
    val numFeatures = 10000000
    val hashingScheme = new HashingTF().setInputCol("Feat02").setOutputCol("HashedFeat").setNumFeatures(numFeatures)
    val hashedData = hashingScheme.transform(df)

    // Assembling generates a NumericAttribute per slot, which blows up the heap.
    val vectorAssembler = new VectorAssembler().setInputCols(Array("HashedFeat", "Feat01")).setOutputCol("Features")
    val processedData = vectorAssembler.transform(hashedData).select("Label", "Features")

    processedData.show()
  }
}
Stacktrace from the example above:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.ml.attribute.NumericAttribute.copy(attributes.scala:272)
at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:215)
at org.apache.spark.ml.attribute.NumericAttribute.withIndex(attributes.scala:195)
at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:71)
at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3$$anonfun$apply$1.apply(AttributeGroup.scala:70)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.IterableLike$class.copyToArray(IterableLike.scala:254)
at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
at scala.collection.SeqViewLike$AbstractTransformed.copyToArray(SeqViewLike.scala:37)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
at scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at scala.Option.map(Option.scala:146)
at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
at org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:95)
at VARepro$.main(VARepro.scala:36)
Exception when run in conjunction with a learner on a bigger dataset (~10 GB) on a cluster:
: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
Issue Links
- is related to
  - SPARK-22346 Update VectorAssembler to work with Structured Streaming (Resolved)
  - SPARK-21926 Compatibility between ML Transformers and Structured Streaming (Resolved)
- relates to
  - SPARK-8515 Improve ML attribute API (Resolved)