SPARK-12964: SparkContext.localProperties leaked

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      I have a non-deterministic but quite reliable reproduction for a case where spark.sql.execution.id is leaked. Operations on the affected threads then die with "spark.sql.execution.id is already set". These threads never recover because nothing ever unsets spark.sql.execution.id. (It is not a case of nested withNewExecutionId calls.)

      I have figured out why this happens. We are inside a withNewExecutionId block. At some point we call back into user code. (In our case this is a custom data source's buildScan method.) The user code calls scala.concurrent.Await.result. Because our thread is a member of a ForkJoinPool (it is a Play HTTP-serving thread), Await.result spawns a new compensating thread. SparkContext.localProperties is cloned into this new thread, which is then ready to serve an HTTP request.

      The first thread then finishes waiting, finishes buildScan, and leaves withNewExecutionId, clearing spark.sql.execution.id in the finally block. All good. But some time later another HTTP request is served by the second thread. That thread is "poisoned" with a stale spark.sql.execution.id, and when it tries to use withNewExecutionId, it fails.
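
      To make the mechanism concrete, here is a minimal sketch (not our actual code; the object and variable names are made up) of how a blocked ForkJoinPool worker can copy an inheritable thread-local into a freshly spawned compensating thread. SparkContext.localProperties is such an InheritableThreadLocal, so it is cloned the same way:

      import scala.concurrent.{Await, Future}
      import scala.concurrent.ExecutionContext.Implicits.global  // ForkJoinPool-backed, like Play's
      import scala.concurrent.duration._

      object LocalPropertyLeakSketch extends App {
        // Stand-in for SparkContext.localProperties: an InheritableThreadLocal whose
        // value is copied into every thread created while it is set (via childValue).
        val localProps = new InheritableThreadLocal[String] {
          override def childValue(parent: String): String = {
            println(s"cloning '$parent' into ${Thread.currentThread.getName}")
            parent
          }
        }

        val outer = Future {
          // Pretend withNewExecutionId has just set the property on this worker...
          localProps.set("spark.sql.execution.id=0")
          // ...and user code (buildScan) now blocks. Await.result goes through
          // blockOn/managedBlock, so the pool may spawn a compensating thread,
          // and that new thread inherits localProps at construction time.
          Await.result(Future { Thread.sleep(500); 42 }, 5.seconds)
          // withNewExecutionId's finally block would clear the property here, but
          // only on this thread; the compensating thread keeps the stale copy.
          localProps.remove()
        }
        Await.result(outer, 10.seconds)
      }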


      I don't know who's at fault here.

      • I don't like the ThreadLocal properties anyway. Why not create an Execution object and let it wrap the operation? Then you could have two executions in parallel on the same thread, and other stuff like that. It would be much clearer than storing the execution ID in a kind-of-global variable. (A rough sketch follows this list.)
      • Why do we have to inherit the ThreadLocal properties? I'm sure there is a good reason, but this is essentially a bug-generator in my view. (It has already generated https://issues.apache.org/jira/browse/SPARK-10563.)
      • Await.result — I never would have thought it starts threads.
      • We probably shouldn't be calling Await.result inside buildScan.
      • We probably shouldn't call Spark things from HTTP serving threads.
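
      As a rough illustration of the first point above, here is what an explicit Execution wrapper could look like. This is purely a sketch with invented names (Execution, newExecution), not an existing Spark API: the execution ID travels with a value instead of living in per-thread state, so it cannot leak into unrelated threads.

      import java.util.concurrent.atomic.AtomicLong

      // Hypothetical replacement for the spark.sql.execution.id thread-local:
      // the id is carried by a value rather than stored on the current thread.
      final case class Execution(id: Long) {
        // Wrap an operation; the id is passed explicitly instead of being read
        // back from a ThreadLocal inside the operation.
        def run[T](body: Execution => T): T = body(this)
      }

      object Execution {
        private val counter = new AtomicLong(0)
        def newExecution(): Execution = Execution(counter.getAndIncrement())
      }

      object ExecutionSketch extends App {
        // Two executions can overlap on one thread, and one execution can span
        // several threads, because nothing lives in ThreadLocal state.
        val result = Execution.newExecution().run { exec =>
          s"planned and ran the query under execution ${exec.id}"
        }
        println(result)
      }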

      I'm not sure what could be done on the Spark side, but I thought I should mention this interesting issue. For supporting evidence, here is the stack trace from the moment localProperties is being cloned. Its contents at that point are:

      {spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"4","name":"ExecutedCommand"}}
      
        at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]                
        at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]                
        at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]                   
        at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]                                           
        at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]                                           
        at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]                                         
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.Await$.result(package.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na] 
        at com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50) [biggraph.jar]
        at com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) [biggraph.jar]     
        at com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) [biggraph.jar]     
        at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
        at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.collection.AbstractTraversable.map(Traversable.scala:105) [org.scala-lang.scala-library-2.10.5.jar:na]
        at com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46) [biggraph.jar]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [org.scala-lang.scala-library-2.10.5.jar:na]
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
        at com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
        at scala.util.Try$.apply(Try.scala:161) [org.scala-lang.scala-library-2.10.5.jar:na]              
        at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
        at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.10.5.jar:na]
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.10.5.jar:na]
      

            People

              Assignee: Unassigned
              Reporter: Daniel Darabos (darabos)