Kylin / KYLIN-5271

Query memory leaks


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: v4.0.1
    • Fix Version/s: v4.0.3
    • Component/s: Query Engine
    • Labels: None

    Description

      Each query thread clones a SparkSession and stores it in a ThreadLocal. However, if an exception occurs while the Calcite plan is being converted to a Spark plan, the SparkSession is never removed from the ThreadLocal, so the clone stays reachable for as long as the thread lives. More importantly, if Spark is restarted later, the SparkSession left in the ThreadLocal is still bound to the old, stopped SparkContext, and queries on that thread fail with:

        Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

      This stopped SparkContext was created at:

        org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)
        java.lang.Thread.run(Thread.java:748)
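      The failure sequence described above can be reproduced without Spark. In the minimal sketch below, FakeContext and FakeSession are hypothetical stand-ins for SparkContext and a cloned SparkSession, and LeakDemo only loosely mirrors SparderContextFacade; none of these are Kylin or Spark classes. A planning failure leaves the session in the ThreadLocal, a simulated restart stops its context, and the next query on the same thread then fails with the same IllegalStateException message.

```scala
// Hypothetical stand-ins for SparkContext / a cloned SparkSession.
// They are NOT Kylin or Spark classes; they only model the "stopped" state.
final class FakeContext { @volatile var stopped = false }

final class FakeSession(val ctx: FakeContext) {
  def run(): String =
    if (ctx.stopped)
      throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
    else "ok"
}

object LeakDemo {
  // The "global" context; replaced on every simulated restart.
  @volatile var context = new FakeContext

  // Per-thread session, created lazily like SparderContextFacade.current().
  private val local = new ThreadLocal[FakeSession]

  def current(): FakeSession = {
    if (local.get() == null) local.set(new FakeSession(context))
    local.get()
  }

  def remove(): Unit = local.remove()

  // Planning fails after the session was cached, so remove() is never reached.
  def failingQuery(): Unit = {
    current()
    throw new RuntimeException("Calcite-to-SparkPlan conversion failed")
  }

  // A normal query that, like getResult, cleans up only on the success path.
  def normalQuery(): String = {
    val result = current().run()
    remove()
    result
  }

  // Simulated Spark restart: the old context is stopped and replaced.
  def restart(): Unit = {
    context.stopped = true
    context = new FakeContext
  }

  // Runs the sequence from the issue on the current thread; returns true if
  // the follow-up query fails because of the stale per-thread session.
  def reproduce(): Boolean = {
    try failingQuery() catch { case _: RuntimeException => () }
    restart()
    try { normalQuery(); false }
    catch { case _: IllegalStateException => true }
  }
}
```

      Calling LeakDemo.reproduce() on a fresh thread returns true. Because normalQuery() also throws before reaching remove(), every later query on that thread keeps failing in the same way.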

      // Put the cloned SparkSession into the ThreadLocal

      object SparderContextFacade extends Logging {
      
        final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, UdfManager]] =
          new InternalThreadLocal[Pair[SparkSession, UdfManager]]()
      
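        // Returns this thread's SparkSession, lazily cloning the original
        // session on first use and caching it (with its UdfManager) in the
        // ThreadLocal until remove() is called.
        def current(): Pair[SparkSession, UdfManager] = {
          if (CURRENT_SPARKSESSION.get() == null) {
            val spark = SparderContext.getOriginalSparkSession.cloneSession()
            CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
              UdfManager.createWithoutBuildInFunc(spark)))
          }
          CURRENT_SPARKSESSION.get()
        }

        // Drops this thread's cached session; if this call is skipped, the
        // clone stays attached to the thread.
        def remove(): Unit = {
          CURRENT_SPARKSESSION.remove()
        }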
      }
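      One common way to make the cleanup independent of where a query fails is to scope the ThreadLocal with try/finally at the point where the per-thread session is first acquired. The sketch below uses hypothetical names (SafeFacade, Session, withThreadSession, hasSession); it illustrates the pattern and is not necessarily the change that was merged for this issue.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a per-thread cloned session; not a Spark class.
final class Session(val id: Int)

object SafeFacade {
  private val counter = new AtomicInteger(0)
  private val local = new ThreadLocal[Session]

  // Lazily creates and caches this thread's session.
  def current(): Session = {
    if (local.get() == null) local.set(new Session(counter.incrementAndGet()))
    local.get()
  }

  def remove(): Unit = local.remove()

  // Runs `body` with the per-thread session and always removes it afterwards,
  // whether `body` returns normally or throws (e.g. during planning).
  def withThreadSession[T](body: Session => T): T =
    try body(current())
    finally remove()

  // Exposed only so the cleanup can be checked from a test.
  def hasSession: Boolean = local.get() != null
}
```

      A query entry point would then run planning and execution inside SafeFacade.withThreadSession { session => ... }, so a failure in the Calcite-to-SparkPlan step still clears the ThreadLocal.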
      

      // Remove the SparkSession from the ThreadLocal
      // (in org.apache.kylin.query.runtime.plans.ResultPlan)

        def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
            : Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
          val result: Either[Enumerable[Array[Any]], Enumerable[Any]] =
            resultType match {
              case ResultType.NORMAL =>
                if (SparderContext.needCompute()) {
                  Left(ResultPlan.collectEnumerable(df, rowType))
                } else {
                  Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
                }
              case ResultType.SCALA =>
                if (SparderContext.needCompute()) {
                  Right(ResultPlan.collectScalarEnumerable(df, rowType))
                } else {
                  Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
                }
            }
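          // Cleanup happens only on this success path: an exception thrown before
          // getResult is called (e.g. in the Calcite-to-SparkPlan conversion) or
          // while collecting the result skips both calls.
          SparderContext.cleanQueryInfo()
          SparderContext.closeThreadSparkSession()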
          result
        }
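      Because the cleanup in getResult only runs on the success path, current() could additionally refuse to reuse a cached session whose context has already been stopped, so a session left behind by an earlier failure cannot break later queries after a Spark restart. In this sketch, Ctx, Sess, ValidatingFacade and the isStopped flag are all hypothetical; the check only mitigates the restart failure and does not by itself release sessions held by idle threads.

```scala
// Hypothetical stand-ins; not Spark or Kylin classes.
final class Ctx { @volatile var isStopped = false }
final class Sess(val ctx: Ctx)

object ValidatingFacade {
  // The currently active context; replaced on a simulated restart.
  @volatile var activeContext = new Ctx
  private val local = new ThreadLocal[Sess]

  // Returns the per-thread session, replacing it if it was created against
  // a context that has since been stopped (e.g. after a Spark restart).
  def current(): Sess = {
    val cached = local.get()
    if (cached == null || cached.ctx.isStopped) {
      val fresh = new Sess(activeContext)
      local.set(fresh)
      fresh
    } else cached
  }

  def remove(): Unit = local.remove()

  // Simulated restart: stop the old context and install a new one.
  def restart(): Unit = {
    activeContext.isStopped = true
    activeContext = new Ctx
  }
}
```

      After restart(), the next current() call on a thread that still holds the old session returns a fresh session bound to the new context instead of failing.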
      


People

    • Assignee: Liu Zhao (zhaoliu4)
    • Reporter: Liu Zhao (zhaoliu4)
    • Votes: 0
    • Watchers: 4
