Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Incomplete
- Affects Version/s: 1.6.0
- Fix Version/s: None
Description
I have a non-deterministic but quite reliable reproduction for a case where spark.sql.execution.id is leaked. Operations on the affected threads then die with "spark.sql.execution.id is already set". These threads never recover, because nothing ever unsets spark.sql.execution.id. (It is not a case of nested withNewExecutionId calls.)
I have figured out why this happens. We are inside a withNewExecutionId block. At some point we call back into user code. (In our case this is a custom data source's buildScan method.) The user code calls scala.concurrent.Await.result. Because our thread is a member of a ForkJoinPool (it is a Play HTTP serving thread), Await.result starts a new thread to compensate for the blocked one. SparkContext.localProperties is cloned for this new thread, which is then ready to serve an HTTP request.
The first thread eventually finishes waiting, finishes buildScan, and leaves withNewExecutionId, clearing spark.sql.execution.id in the finally block. All good. But some time later another HTTP request is served by the second thread. That thread is "poisoned" with a stale spark.sql.execution.id, so when it tries to use withNewExecutionId, it fails.
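The mechanism can be demonstrated without Spark at all, since SparkContext.localProperties is an InheritableThreadLocal whose childValue clones the parent's map. A minimal sketch (the property name is from this report; the class and method names are illustrative):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class InheritedPropertyLeak {
    // Stands in for SparkContext.localProperties: an InheritableThreadLocal
    // whose childValue clones the parent's map for each new thread.
    static final InheritableThreadLocal<ConcurrentHashMap<String, String>> props =
        new InheritableThreadLocal<ConcurrentHashMap<String, String>>() {
            @Override protected ConcurrentHashMap<String, String> childValue(
                    ConcurrentHashMap<String, String> parent) {
                return new ConcurrentHashMap<>(parent);
            }
            @Override protected ConcurrentHashMap<String, String> initialValue() {
                return new ConcurrentHashMap<>();
            }
        };

    // Returns what a thread created *while* the execution id was set
    // observes *after* the parent has already cleared it.
    static String leakedValue() throws InterruptedException {
        props.get().put("spark.sql.execution.id", "0");  // inside withNewExecutionId
        AtomicReference<String> seen = new AtomicReference<>();
        // childValue runs eagerly, in Thread's constructor: the clone is
        // taken right here, while the execution id is still set.
        Thread worker = new Thread(() ->
            seen.set(props.get().get("spark.sql.execution.id")));
        props.get().remove("spark.sql.execution.id");    // the finally block cleans up...
        worker.start();                                  // ...but the clone already has the id
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker still sees execution id: " + leakedValue());
    }
}
```

The parent's cleanup in the finally block cannot help: the child got its own copy of the map at construction time, and nothing on the child's side ever removes the key.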
I don't know who's at fault here.
- I don't like the ThreadLocal properties anyway. Why not create an Execution object and let it wrap the operation? Then you could have two executions in parallel on the same thread, and other stuff like that. It would be much clearer than storing the execution ID in a kind-of-global variable.
- Why do we have to inherit the ThreadLocal properties? I'm sure there is a good reason, but this is essentially a bug-generator in my view. (It has already generated https://issues.apache.org/jira/browse/SPARK-10563.)
- Await.result — I never would have thought it starts threads.
- We probably shouldn't be calling Await.result inside buildScan.
- We probably shouldn't call Spark things from HTTP serving threads.
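One conceivable mitigation on the Spark side (a sketch only, not what Spark actually ships; see SPARK-13747 for the real discussion) would be to exclude per-execution state when the properties are cloned for a child thread, so a freshly spawned thread never starts out "poisoned":

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class NonPoisoningProperties {
    static final String EXECUTION_ID = "spark.sql.execution.id";

    // Like localProperties, but childValue drops the in-flight execution id
    // instead of inheriting it (hypothetical behavior).
    static final InheritableThreadLocal<ConcurrentHashMap<String, String>> props =
        new InheritableThreadLocal<ConcurrentHashMap<String, String>>() {
            @Override protected ConcurrentHashMap<String, String> childValue(
                    ConcurrentHashMap<String, String> parent) {
                ConcurrentHashMap<String, String> clone = new ConcurrentHashMap<>(parent);
                clone.remove(EXECUTION_ID); // never inherit an in-flight execution id
                return clone;
            }
            @Override protected ConcurrentHashMap<String, String> initialValue() {
                return new ConcurrentHashMap<>();
            }
        };

    // Spawns a child thread while the execution id is set and reports
    // what the child sees for it.
    static String childSees() throws InterruptedException {
        props.get().put(EXECUTION_ID, "0");
        props.get().put("spark.rdd.scope.noOverride", "true"); // other props still inherit
        AtomicReference<String> seen = new AtomicReference<>();
        Thread worker = new Thread(() ->
            seen.set(props.get().getOrDefault(EXECUTION_ID, "<unset>")));
        worker.start();
        worker.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("child sees execution id: " + childSees());
    }
}
```

This keeps the useful inheritance (job group, scheduler pool, RDD scope) while treating the execution id as strictly thread-private state.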
I'm not sure what could be done on the Spark side, but I thought I should mention this interesting issue. As supporting evidence, here is the stack trace from the moment localProperties is being cloned. Its contents at that point are:
{spark.sql.execution.id=0, spark.rdd.scope.noOverride=true, spark.rdd.scope={"id":"4","name":"ExecutedCommand"}}
at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:364) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.SparkContext$$anon$2.childValue(SparkContext.scala:362) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:353) [na:1.7.0_91]
at java.lang.ThreadLocal$ThreadLocalMap.<init>(ThreadLocal.java:261) [na:1.7.0_91]
at java.lang.ThreadLocal.createInheritedMap(ThreadLocal.java:236) [na:1.7.0_91]
at java.lang.Thread.init(Thread.java:416) [na:1.7.0_91]
at java.lang.Thread.init(Thread.java:349) [na:1.7.0_91]
at java.lang.Thread.<init>(Thread.java:508) [na:1.7.0_91]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:48) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.<init>(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory.newThread(ExecutionContextImpl.scala:42) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.tryCompensate(ForkJoinPool.java:2341) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3638) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$2.blockOn(ExecutionContextImpl.scala:45) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.Await$.result(package.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.graph_api.SafeFuture.awaitResult(SafeFuture.scala:50) [biggraph.jar]
at com.lynxanalytics.biggraph.graph_api.DataManager.get(DataManager.scala:315) [biggraph.jar]
at com.lynxanalytics.biggraph.graph_api.Scripting$.getData(Scripting.scala:87) [biggraph.jar]
at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
at com.lynxanalytics.biggraph.table.TableRelation$$anonfun$1.apply(DefaultSource.scala:46) [biggraph.jar]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.collection.AbstractTraversable.map(Traversable.scala:105) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.table.TableRelation.buildScan(DefaultSource.scala:46) [biggraph.jar]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$4.apply(DataSourceStrategy.scala:64) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:274) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:273) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:352) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:269) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:60) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) [org.scala-lang.scala-library-2.10.5.jar:na]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:47) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:45) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution$$anonfun$toString$5.apply(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:78) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) [spark-assembly-1.6.0-hadoop2.4.0.jar:1.6.0]
at com.lynxanalytics.biggraph.controllers.SQLController.exportSQLQuery(SQLController.scala:112) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.ProductionJsonServer$$anonfun$exportSQLQuery$1.apply(JsonServer.scala:357) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1$$anonfun$1.apply(JsonServer.scala:65) [biggraph.jar]
at scala.util.Try$.apply(Try.scala:161) [org.scala-lang.scala-library-2.10.5.jar:na]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:65) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$jsonPost$1.apply(JsonServer.scala:58) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
at com.lynxanalytics.biggraph.serving.JsonServer$$anonfun$action$1$$anonfun$apply$1.apply(JsonServer.scala:34) [biggraph.jar]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [org.scala-lang.scala-library-2.10.5.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [org.scala-lang.scala-library-2.10.5.jar:na]
Issue Links
- is cloned by: SPARK-20330 "CLONE - SparkContext.localProperties leaked" (Closed)
- is related to: SPARK-13747 "Concurrent execution in SQL doesn't work with Scala ForkJoinPool" (Resolved)