Description
An InMemoryRelation is created when you cache a table. If the table is large, meaning it has a very large number of columns, a very large number of partitions (in the file split sense, not the "table partition" sense), or both, caching it causes an immense amount of memory to be used in the driver.
The reason is that it uses an accumulator to collect statistics about each partition, and instead of summarizing the data in the driver, it keeps all entries in memory.
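The difference between the two approaches can be sketched in plain Scala, without Spark. This is only an illustration: PartitionStats, collectAll and sumSizes are hypothetical names, not the actual InMemoryRelation or accumulator code.

```scala
object AccumulationSketch {
  // Hypothetical stand-in for the statistics row reported by one partition:
  // one entry per stat per column, plus that partition's size in bytes.
  final case class PartitionStats(values: Array[Long], sizeInBytes: Long)

  // Keeps every partition's row on the driver, so memory grows with
  // (number of partitions) * (number of columns).
  def collectAll(parts: Iterator[PartitionStats]): Vector[PartitionStats] =
    parts.toVector

  // Folds each row into a single running total, so driver memory stays
  // constant no matter how many partitions report in.
  def sumSizes(parts: Iterator[PartitionStats]): Long =
    parts.foldLeft(0L)(_ + _.sizeInBytes)

  def main(args: Array[String]): Unit = {
    val parts = Seq.fill(500)(PartitionStats(Array.fill(1000)(0L), 128L))
    println(s"rows retained: ${collectAll(parts.iterator).size}")
    println(s"total size in bytes: ${sumSizes(parts.iterator)}")
  }
}
```

The first function retains one wide row per partition; the second retains a single Long regardless of how many partitions there are.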
I'm attaching a script I used to create a parquet file with 20,000 columns and a single row, which I then copied 500 times so I'd have 500 partitions.
When doing the following:
sqlContext.read.parquet(...).count()
Everything works fine, both in Spark 1.6 and 2.0. (It's super slow with the settings I used, but it works.)
I ran spark-shell like this:
./bin/spark-shell --master 'local-cluster[4,1,4096]' --driver-memory 2g --conf spark.executor.memory=2g
And ran:
sqlContext.read.parquet(...).cache().count()
You'll see the results in screenshot example_1.6_pre_patch.png. After 40 partitions were processed, there were 40 GenericInternalRow objects with 100,000 items each (5 stat info fields * 20,000 columns). So, memory usage was:
40 * 100000 * (4 * 20 + 24) = 416000000 =~ 400MB
(Note: Integer = 20 bytes, Long = 24 bytes.)
If I waited until the end, there would be 500 partitions, so ~ 5GB of memory to hold the stats.
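For reference, the same estimate can be reproduced for any partition count. This is a rough sketch that just replays the formula above with the object sizes quoted in the note; StatsMemoryEstimate and statsBytes are names made up for this example.

```scala
object StatsMemoryEstimate {
  // Figures taken from the description above.
  val columns = 20000
  val statsPerColumn = 5
  val itemsPerRow = columns * statsPerColumn // 100,000 items per stats row

  // Object sizes as quoted in the note above (assumed, not measured here).
  val integerBytes = 20L
  val longBytes = 24L

  // The per-item factor used in the formula above: 4 * 20 + 24 = 104.
  val bytesPerItem = 4 * integerBytes + longBytes

  // Estimated driver memory held by the stats rows of `partitions` partitions.
  def statsBytes(partitions: Int): Long =
    partitions.toLong * itemsPerRow * bytesPerItem

  def main(args: Array[String]): Unit = {
    println(statsBytes(40))  // 416000000
    println(statsBytes(500)) // 5200000000
  }
}
```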
I'm also attaching a patch I made on top of 1.6 that uses just a long accumulator to capture the table size; with that patch, memory usage on the driver doesn't keep growing. Also note that in the patch I'm multiplying the column size by the row count, which I think fixes a different bug in the existing code (those stats should be for the whole batch, not just a single row, right?). I also added example_1.6_post_patch.png to show the InMemoryRelation with the patch.
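To make the row-count point concrete, here is a minimal sketch of the two size calculations. It is not the patch itself; BatchSizeSketch and the column sizes are hypothetical and only illustrate the per-row versus per-batch difference.

```scala
object BatchSizeSketch {
  // Sum of the per-value sizes of all columns: the size of a single row.
  def perRowBytes(columnSizes: Seq[Long]): Long =
    columnSizes.sum

  // Size of a whole batch: the single-row size multiplied by the row count.
  def batchBytes(columnSizes: Seq[Long], rowCount: Long): Long =
    perRowBytes(columnSizes) * rowCount

  def main(args: Array[String]): Unit = {
    val columnSizes = Seq(4L, 8L, 4L) // hypothetical: an int, a long, an int
    println(perRowBytes(columnSizes))      // 16
    println(batchBytes(columnSizes, 1000)) // 16000
  }
}
```

Reporting only the per-row figure would understate a 1,000-row batch by a factor of 1,000, which is the discrepancy the question above is about.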
I also applied a very similar patch on top of Spark 2.0. But there things blow up even more spectacularly when I try to run the count on the cached table. It starts with this error:
14:19:43 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, vanzin-st1-3.gce.cloudera.com): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
(lots of generated code here...)
Caused by: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
    at org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
    at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
    at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
    at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
    at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:913)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:911)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:911)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
    ... 54 more
There was a lot more of that, which made the output unreadable, so I just killed the shell. Anyway, I believe the same fix should work there, but I can't be sure because the test fails for what seem to be different reasons.