SPARK-15390: Memory management issue in complex DataFrame join and filter

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1
    • Component/s: SQL
    • Labels: None
    • Environment: branch-2.0, 16 workers

    Description

      See SPARK-15389 for a description of the code which produces this bug. I am filing this as a separate JIRA since the bug in 2.0 is different.

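      In 2.0, the code fails with a memory management error: while the hashed relation for the broadcast join is being built, a negative number of bytes is requested, which trips an assertion in ExecutionMemoryPool.acquireMemory. Here is the stack trace: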

      OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
      16/05/18 19:23:16 ERROR Uncaught throwable from user code: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
      Exchange SinglePartition, None
      +- WholeStageCodegen
         :  +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#170L])
         :     +- Project
         :        +- BroadcastHashJoin [id#70L], [id#110L], Inner, BuildLeft, None
         :           :- INPUT
         :           +- Project [id#110L]
         :              +- Filter (degree#115 > 2000000)
         :                 +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Final,isDistinct=false)], output=[id#110L,degree#115])
         :                    +- INPUT
         :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
         :  +- WholeStageCodegen
         :     :  +- Project [row#66.id AS id#70L]
         :     :     +- Filter isnotnull(row#66.id)
         :     :        +- INPUT
         :     +- Scan ExistingRDD[row#66,uniq_id#67]
         +- Exchange hashpartitioning(id#110L, 200), None
            +- WholeStageCodegen
               :  +- TungstenAggregate(key=[id#110L], functions=[(count(1),mode=Partial,isDistinct=false)], output=[id#110L,count#136L])
               :     +- Filter isnotnull(id#110L)
               :        +- INPUT
               +- Generate explode(array(src#2L, dst#3L)), false, false, [id#110L]
                  +- WholeStageCodegen
                     :  +- Filter ((isnotnull(src#2L) && isnotnull(dst#3L)) && NOT (src#2L = dst#3L))
                     :     +- INPUT
                     +- InMemoryTableScan [src#2L,dst#3L], [isnotnull(src#2L),isnotnull(dst#3L),NOT (src#2L = dst#3L)], InMemoryRelation [src#2L,dst#3L], true, 10000, StorageLevel(disk=true, memory=true, offheap=false, deserialized=true, replication=1), WholeStageCodegen, None
      
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.inputRDDs(TungstenAggregate.scala:134)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:348)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
      	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
      	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2156)
      	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2155)
      	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449)
      	at org.apache.spark.sql.Dataset.count(Dataset.scala:2155)
      	at Notebook.summary$1(<console>:70)
      	at Notebook.getIndexedEdges(<console>:82)
      	at Notebook.getIndexedGraph(<console>:135)
      Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
      	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:102)
      	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:229)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:125)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:124)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
      	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
      	at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:30)
      	at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:62)
      	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
      	at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:79)
      	at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:194)
      	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.consume(TungstenAggregate.scala:33)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.generateResultCode(TungstenAggregate.scala:432)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduceWithKeys(TungstenAggregate.scala:534)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduce(TungstenAggregate.scala:141)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.produce(TungstenAggregate.scala:33)
      	at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:79)
      	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
      	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:30)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduceWithoutKeys(TungstenAggregate.scala:211)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doProduce(TungstenAggregate.scala:139)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
      	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.produce(TungstenAggregate.scala:33)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:122)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
      	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
      	at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:113)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
      	at org.apache.spark.sql.execution.aggregate.TungstenAggregate.inputRDDs(TungstenAggregate.scala:134)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:348)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
      	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
      	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2156)
      	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2155)
      	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449)
      	at org.apache.spark.sql.Dataset.count(Dataset.scala:2155)
      	at Notebook.summary$1(<console>:70)
      	at Notebook.getIndexedEdges(<console>:82)
      	at Notebook.getIndexedGraph(<console>:135)
      Caused by: java.util.concurrent.ExecutionException: Boxed Error
      	at scala.concurrent.impl.Promise$.resolver(Promise.scala:55)
      	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:47)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:244)
      	at scala.concurrent.Promise$class.complete(Promise.scala:55)
      	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.AssertionError: assertion failed: invalid number of bytes requested: -2146435072
      	at scala.Predef$.assert(Predef.scala:179)
      	at org.apache.spark.memory.ExecutionMemoryPool.acquireMemory(ExecutionMemoryPool.scala:96)
      	at org.apache.spark.memory.StaticMemoryManager.acquireExecutionMemory(StaticMemoryManager.scala:98)
      	at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:145)
      	at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.acquireMemory(HashedRelation.scala:403)
      	at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.init(HashedRelation.scala:419)
      	at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.<init>(HashedRelation.scala:426)
      	at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:795)
      	at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:105)
      	at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:819)
      	at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:815)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:80)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:71)
      	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:94)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:71)
      	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:71)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
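
      The innermost cause above reports a request for -2146435072 bytes, which equals -2^31 + 2^20. One arithmetic that yields exactly this value is a size computed in 32-bit integers that wraps around. The sketch below is only an illustration of that wrap-around: the sizing rule (16 bytes per slot plus a 1 MiB page), the slot count of 2^27, and all names are assumptions chosen to reproduce the number, not code taken from HashedRelation.scala.

      ```java
      public class NegativeRequestSketch {
          // Hypothetical sizing rule evaluated entirely in 32-bit int arithmetic:
          // 16 bytes per slot plus a 1 MiB page. Wraps around for large slot counts.
          static int requestedBytesInt(int slots) {
              return slots * 2 * 8 + (1 << 20);
          }

          // Same rule, widened to long before multiplying, so it cannot wrap here.
          static long requestedBytesLong(int slots) {
              return (long) slots * 2 * 8 + (1L << 20);
          }

          public static void main(String[] args) {
              int slots = 1 << 27; // assumed slot count: 134,217,728
              System.out.println(requestedBytesInt(slots));  // -2146435072
              System.out.println(requestedBytesLong(slots)); // 2148532224
          }
      }
      ```

      With 64-bit arithmetic the same request comes to 2148532224 bytes (2 GiB + 1 MiB), so under these assumptions the negative value is consistent with an overflow in the size calculation rather than with memory actually running out.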
      

            People

              Assignee: Davies Liu (davies)
              Reporter: Joseph K. Bradley (josephkb)