Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
v4.0.1
-
None
Description
Each query thread clones a SparkSession and stores it in a ThreadLocal. However, if an exception occurs during the Calcite-to-SparkPlan conversion, the SparkSession in the ThreadLocal is never removed. More importantly, if Spark is restarted later, the SparkSession left in the ThreadLocal becomes unusable, and subsequent queries on that thread fail with the following exception: Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
This stopped SparkContext was created at:
org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)
java.lang.Thread.run(Thread.java:748)
// put SparkSession into ThreadLocal
/**
 * Holds the (SparkSession, UdfManager) pair bound to the calling thread.
 *
 * The pair is created lazily on the first call to `current()` by cloning the
 * original session obtained from `SparderContext`, and stays in place until
 * `remove()` is called.
 */
object SparderContextFacade extends Logging {

  /** Slot storing the cloned session and its UDF manager. */
  final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, UdfManager]] =
    new InternalThreadLocal[Pair[SparkSession, UdfManager]]()

  /**
   * Returns the pair stored in `CURRENT_SPARKSESSION`, creating and storing
   * a fresh one first when the slot is empty.
   *
   * @return the session / UDF-manager pair held in `CURRENT_SPARKSESSION`
   */
  def current(): Pair[SparkSession, UdfManager] = {
    Option(CURRENT_SPARKSESSION.get()).getOrElse {
      // Nothing stored yet: clone the original session and register it.
      val clonedSession = SparderContext.getOriginalSparkSession.cloneSession()
      val udfManager = UdfManager.createWithoutBuildInFunc(clonedSession)
      CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](clonedSession, udfManager))
      CURRENT_SPARKSESSION.get()
    }
  }

  /** Clears whatever is stored in `CURRENT_SPARKSESSION`. */
  def remove(): Unit = CURRENT_SPARKSESSION.remove()
}
// remove SparkSession from ThreadLocal
// org.apache.kylin.query.runtime.plans.ResultPlan
/**
 * Collects the result of `df` as a Calcite `Enumerable`.
 *
 * For `ResultType.NORMAL` the rows are returned as `Left`; for
 * `ResultType.SCALA` the values are returned as `Right`. When
 * `SparderContext.needCompute()` is false, an empty enumerable of the
 * matching shape is returned without collecting anything.
 *
 * Query info and the thread-bound SparkSession are always released, even
 * when collecting the result throws. Previously the cleanup only ran on the
 * success path, so a failed query left its session attached to the thread.
 *
 * @param df         the DataFrame to collect
 * @param rowType    row type passed through to the collectors
 * @param resultType selects row-based (`NORMAL`) or scalar (`SCALA`) output
 * @return `Left` with row arrays, or `Right` with scalar values
 */
def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
: Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
  try {
    val result: Either[Enumerable[Array[Any]], Enumerable[Any]] = resultType match {
      case ResultType.NORMAL =>
        if (SparderContext.needCompute()) {
          Left(ResultPlan.collectEnumerable(df, rowType))
        } else {
          Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
        }
      case ResultType.SCALA =>
        if (SparderContext.needCompute()) {
          Right(ResultPlan.collectScalarEnumerable(df, rowType))
        } else {
          Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
        }
    }
    result
  } finally {
    // Nested so that a failure in cleanQueryInfo() cannot prevent the
    // thread-bound SparkSession from being closed.
    try {
      SparderContext.cleanQueryInfo()
    } finally {
      SparderContext.closeThreadSparkSession()
    }
  }
}