Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-26751

HiveSessionImpl might have a memory leak since Operations are not closed properly

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.4.0
    • 2.3.3, 2.4.1, 3.0.0
    • SQL
    • None

    Description

      When a statement runs in the background and throws an exception that is not a HiveSQLException,
      we may encounter a memory leak, because its entry in handleToOperation is never removed.
      The reason is as follows:
      1. When operation.run() is called, it throws an exception that is not a HiveSQLException.
      2. As a result, the opHandle is not added to opHandleSet, and operationManager.closeOperation(opHandle) is not called either, because the catch block only handles HiveSQLException.

       /**
        * Creates an ExecuteStatementOperation for the statement through the session's
        * OperationManager, runs it, and returns the operation's handle.
        *
        * The session is acquired before the operation is created and released in the
        * finally block whether or not run() succeeds.
        *
        * NOTE(review): only HiveSQLException is caught here. If operation.run() throws any
        * other exception type, the handle is neither added to opHandleSet nor passed to
        * operationManager.closeOperation(opHandle).
        *
        * @throws HiveSQLException re-thrown from operation.run() after the operation is closed
        */
       private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay, boolean runAsync) throws HiveSQLException {
              this.acquire(true);
              OperationManager operationManager = this.getOperationManager();
              ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
              OperationHandle opHandle = operation.getHandle();
      
              OperationHandle e;
              try {
                  operation.run();
                  // The handle is tracked by the session only after run() returns normally.
                  this.opHandleSet.add(opHandle);
                  e = opHandle;
              } catch (HiveSQLException var11) {
                  // Only this exception type causes the failed operation to be closed.
                  operationManager.closeOperation(opHandle);
                  throw var11;
              } finally {
                  this.release(true);
              }
      
              return e;
          }
      
      
            // Submit the background operation through the parent session's SessionManager and
            // record the returned handle via setBackgroundHandle.
            try {
              // This submit blocks if no background threads are available to run this operation
              val backgroundHandle =
                parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
              setBackgroundHandle(backgroundHandle)
            } catch {
              // A rejected submission marks the operation as ERROR and is reported to the caller
              // wrapped in a HiveSQLException.
              case rejected: RejectedExecutionException =>
                setState(OperationState.ERROR)
                throw new HiveSQLException("The background threadpool cannot accept" +
                  " new task for execution, please retry the operation", rejected)
              // NOTE(review): any other non-fatal error is logged, the state is set to ERROR, and
              // the exception is re-thrown as-is, i.e. it is not wrapped in a HiveSQLException
              // the way the rejected case is.
              case NonFatal(e) =>
                logError(s"Error executing query in background", e)
                setState(OperationState.ERROR)
                throw e
            }
          }
      

      3. When we close the session, operationManager.closeOperation(opHandle) will still not be called for this operation, since close() only iterates over opHandleSet and this opHandle was never added to it.

      /**
       * Closes the session: closes every operation whose handle is in opHandleSet, clears
       * the set, cleans up the session log directory and pipeout file, closes the Hive
       * history stream if there is one, and closes the session state.
       *
       * NOTE(review): only handles present in opHandleSet are passed to
       * operationManager.closeOperation here; an operation whose handle was never added to
       * that set is not closed by this method.
       *
       * @throws HiveSQLException wrapping an IOException raised while closing
       */
      public void close() throws HiveSQLException {
              try {
                  this.acquire(true);
                  Iterator ioe = this.opHandleSet.iterator();
      
                  // Close each operation tracked by this session.
                  while(ioe.hasNext()) {
                      OperationHandle opHandle = (OperationHandle)ioe.next();
                      this.operationManager.closeOperation(opHandle);
                  }
      
                  this.opHandleSet.clear();
                  this.cleanupSessionLogDir();
                  this.cleanupPipeoutFile();
                  HiveHistory ioe1 = this.sessionState.getHiveHistory();
                  if(null != ioe1) {
                      ioe1.closeStream();
                  }
      
                  try {
                      this.sessionState.close();
                  } finally {
                      // Null the reference even if close() throws, so the outer finally
                      // does not attempt to close the session state a second time.
                      this.sessionState = null;
                  }
              } catch (IOException var17) {
                  throw new HiveSQLException("Failure to close", var17);
              } finally {
                  // sessionState is still non-null only if an earlier step failed before the
                  // inner close was reached; close it here and log (rather than propagate)
                  // any failure.
                  if(this.sessionState != null) {
                      try {
                          this.sessionState.close();
                      } catch (Throwable var15) {
                          LOG.warn("Error closing session", var15);
                      }
      
                      this.sessionState = null;
                  }
      
                  this.release(true);
              }
      
          }
      

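      4. However, the opHandle is added to handleToOperation for every statement when the operation is created (see handleToOperation.put below), so for such a failed operation the entry is never removed.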

      // Map from operation handle to operation, read from the field named "handleToOperation"
      // via ReflectionUtils.getSuperField (presumably the superclass's map -- TODO confirm).
      val handleToOperation = ReflectionUtils
          .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
      
        // Per-session state keyed by session handle: a pool name and the session's SQLContext.
        val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
        val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()
      
        /**
         * Creates a SparkExecuteStatementOperation for `statement` in the given session and
         * registers it in `handleToOperation` under its handle. The whole body runs inside
         * `synchronized`.
         *
         * Requires that a SQLContext is registered for the session's handle in
         * `sessionToContexts`; otherwise `require` fails.
         *
         * NOTE(review): the handle is put into `handleToOperation` here, at creation time,
         * before the operation has been run.
         *
         * @param parentSession session the statement belongs to
         * @param statement     statement text to execute
         * @param confOverlay   configuration overlay passed through to the operation
         * @param async         whether the caller asked for asynchronous execution
         * @return the newly created operation
         */
        override def newExecuteStatementOperation(
            parentSession: HiveSession,
            statement: String,
            confOverlay: JMap[String, String],
            async: Boolean): ExecuteStatementOperation = synchronized {
          val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
          require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
            s" initialized or had already closed.")
          val conf = sqlContext.sessionState.conf
          val hiveSessionState = parentSession.getSessionState
          // Pass the Hive session's overridden configurations and Hive variables to setConfMap
          // together with the SQL conf.
          setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
          setConfMap(conf, hiveSessionState.getHiveVariables)
          // Background execution needs both the caller's async flag and the
          // HIVE_THRIFT_SERVER_ASYNC conf to be true.
          val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
          val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
            runInBackground)(sqlContext, sessionToActivePool)
          handleToOperation.put(operation.getHandle, operation)
          logDebug(s"Created Operation for $statement with session=$parentSession, " +
            s"runInBackground=$runInBackground")
          operation
        }
      

      Below is an example that exhibits the memory leak:

      Attachments

        1. 26751.png
          126 kB
          zhoukang

        Issue Links

          Activity

            People

              cane zhoukang
              cane zhoukang
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: