Spark / SPARK-14138

Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.1
    • Fix Version/s: 1.6.2, 2.0.0
    • Component/s: SQL
    • Labels: None

      Description

      For wide DataFrames that are cached, the generated SpecificColumnarIterator code can exceed the JVM's 64 KB limit on the bytecode size of a single method. The following snippet reproduces the error in spark-shell (with 5 GB of driver memory) by creating a DataFrame with more than 2000 aggregation-based columns:

      val df = sc.parallelize(1 to 10).toDF()
      val aggr = {1 to 2260}.map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
      val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
      res.show() // this will break
      

      The following error is produced (pruned for brevity):

      /* 001 */
      /* 002 */ import java.nio.ByteBuffer;
      /* 003 */ import java.nio.ByteOrder;
      /* 004 */ import scala.collection.Iterator;
      /* 005 */ import org.apache.spark.sql.types.DataType;
      /* 006 */ import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
      /* 007 */ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
      /* 008 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
      /* 009 */
      /* 010 */ public SpecificColumnarIterator generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
      /* 011 */   return new SpecificColumnarIterator();
      /* 012 */ }
      /* 013 */
      
      ...
      
      /* 9113 */     accessor2261.extractTo(mutableRow, 2261);
      /* 9114 */     unsafeRow.pointTo(bufferHolder.buffer, 2262, bufferHolder.totalSize());
      /* 9115 */     return unsafeRow;
      /* 9116 */   }
      /* 9117 */ }
      
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572)
      	at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
      	at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
      	... 28 more
      Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "()Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator" grows beyond 64 KB
      	at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
      	at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
      	at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
      	at org.codehaus.janino.UnitCompiler.invoke(UnitCompiler.java:10050)
      	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4008)
      	at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263)
      	at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
      	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
      	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
      	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3927)
      	at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263)
      	at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
      	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
      	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
      	at org.codehaus.janino.UnitCompiler.invokeConstructor(UnitCompiler.java:6681)
      	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4126)
      	at org.codehaus.janino.UnitCompiler.access$7600(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$10.visitNewClassInstance(UnitCompiler.java:3275)
      	at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:4085)
      	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
      	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2669)
      	at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
      	at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
      	at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
      	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
      	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
      	at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
      	at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
      	at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
      	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
      	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
      	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
      	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
      	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
      	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
      	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
      	at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
      	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
      	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
      	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
      	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:550)
      	... 32 more
      
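The excerpt shows the shape of the problem: the generated class emits one statement per cached column (e.g. `accessor2261.extractTo(mutableRow, 2261);`), so a 2262-column result (the grouping key, the count, and 2260 averages) produces straight-line method bodies that grow linearly with the column count. The descriptor `()Z` in the Janino message denotes a no-argument method returning `boolean`, which in this iterator class is most likely `hasNext()`, and the trace frames (`visitAssignment`, `visitNewClassInstance`) point to per-column assignments whose right-hand side constructs a new object. A common way around the 64 KB cap is to split such per-column statements into helper methods of bounded size. The sketch below illustrates that general technique on plain strings only; it is not Spark's code generator, and the threshold `StatementsPerMethod` and the helper names `extract0`, `extract1`, ... are hypothetical.

```scala
object SplitGeneratedCode {
  // Hypothetical cap on the number of statements placed in one helper method.
  val StatementsPerMethod = 200

  // One statement per column, shaped like the lines in the generated excerpt,
  // e.g. "accessor2261.extractTo(mutableRow, 2261);".
  def extractStatements(numColumns: Int): Seq[String] =
    (0 until numColumns).map(i => s"accessor$i.extractTo(mutableRow, $i);")

  // Groups the statements into hypothetical helper methods extract0, extract1, ...
  // Returns (helper method sources, caller body that invokes each helper in order).
  def split(statements: Seq[String], perMethod: Int): (Seq[String], String) = {
    require(perMethod > 0, "perMethod must be positive")
    val groups = statements.grouped(perMethod).toVector
    val helpers = groups.zipWithIndex.map { case (group, idx) =>
      (Seq(s"private void extract$idx() {") ++ group.map("  " + _) ++ Seq("}")).mkString("\n")
    }
    val callerBody = groups.indices.map(idx => s"extract$idx();").mkString("\n")
    (helpers, callerBody)
  }

  def main(args: Array[String]): Unit = {
    // 2262 columns: the grouping key, the count, and 2260 averages.
    val (helpers, body) = split(extractStatements(2262), StatementsPerMethod)
    println(s"${helpers.size} helper methods")
    println(body.split("\n").take(3).mkString("\n"))
  }
}
```

With 2262 statements and a cap of 200, `main` reports 12 helper methods (11 with 200 statements and one with 62), and the calling method shrinks to 12 calls. The cap is a trade-off: a larger group means fewer calls, but each helper must still stay well below the 64 KB limit.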

      Note that the issue does not occur (and the .show() call prints the correct results) when the number of aggregation columns is reduced slightly, to 2250 instead of 2260:

      val df = sc.parallelize(1 to 10).toDF()
      val aggr = {1 to 2250}.map(colnum => avg(df.col("_1")).as(s"col_$colnum")) // only 2250
      val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
      res.show() // this will work
      
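Since 2250 aggregations work and 2260 fail, the exact cutoff lies somewhere in between, and a small bisection can locate it in a handful of runs. The sketch below assumes the outcome is monotone in the number of aggregations (it works up to some n and fails beyond it); `largestWorking` is a hypothetical helper, and the cutoff of 2255 used by the stub predicate in `main` is made up purely to exercise the code.

```scala
object FindColumnThreshold {
  // Largest n with works(n) == true, given works(knownGood) && !works(knownBad)
  // and a predicate that flips from true to false exactly once in between.
  def largestWorking(knownGood: Int, knownBad: Int)(works: Int => Boolean): Int = {
    require(knownGood < knownBad, "knownGood must be below knownBad")
    var good = knownGood // invariant: works(good) is true
    var bad = knownBad   // invariant: works(bad) is false
    while (bad - good > 1) {
      val mid = good + (bad - good) / 2
      if (works(mid)) good = mid else bad = mid
    }
    good
  }

  def main(args: Array[String]): Unit = {
    // Stub standing in for the spark-shell repro; the cutoff 2255 is invented.
    val stub: Int => Boolean = n => n <= 2255
    println(largestWorking(2250, 2260)(stub))
  }
}
```

In spark-shell, `works` could wrap the repro for `n` aggregations in `scala.util.Try { ... }.isSuccess`, assuming the compilation failure surfaces as a non-fatal exception from `show()`; calling `unpersist()` on each cached result between runs keeps driver memory in check.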

      Likewise, if the final DataFrame is not cached, the query also works with 2260 aggregations:

      val df = sc.parallelize(1 to 10).toDF()
      val aggr = {1 to 2260}.map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
      val res = df.groupBy("_1").agg(count("_1"), aggr: _*) // no .cache() call
      res.show() // this will work
      

              People

              • Assignee: Kazuaki Ishizaki (kiszk)
                Reporter: Sven Krasser (skrasser)
              • Votes: 0
                Watchers: 8
