Description
When a statement runs in the background and fails with an exception that is not a HiveSQLException,
we may leak memory, because the operation's entry in handleToOperation is never removed.
The reason is as follows:
1. When operation.run() is called, it throws an exception that is not a HiveSQLException.
2. As a result, the opHandle is never added to opHandleSet, and operationManager.closeOperation(opHandle) is not called.
/**
 * Creates an ExecuteStatementOperation for {@code statement}, runs it, and returns its handle.
 *
 * The handle is recorded in opHandleSet only after operation.run() returns normally. If run()
 * throws HiveSQLException, the operation is closed through the OperationManager and the
 * exception is rethrown. Any other exception type bypasses the catch clause, so the handle is
 * neither added to opHandleSet nor passed to closeOperation.
 *
 * @param statement   statement text forwarded to newExecuteStatementOperation
 * @param confOverlay configuration overlay forwarded to newExecuteStatementOperation
 * @param runAsync    async flag forwarded to newExecuteStatementOperation
 * @return the handle of the operation that was run
 * @throws HiveSQLException rethrown from operation.run() after the operation has been closed
 */
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
    boolean runAsync) throws HiveSQLException {
  this.acquire(true);
  OperationManager operationManager = this.getOperationManager();
  ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(this.getSession(), statement, confOverlay, runAsync);
  OperationHandle opHandle = operation.getHandle();
  OperationHandle e;
  try {
    operation.run();
    // Reached only when run() did not throw: the handle becomes visible to close().
    this.opHandleSet.add(opHandle);
    e = opHandle;
  } catch (HiveSQLException var11) {
    // Only this exception type triggers cleanup of the operation.
    operationManager.closeOperation(opHandle);
    throw var11;
  } finally {
    // Runs on every exit path, pairing with the acquire(true) at the top.
    this.release(true);
  }
  return e;
}
// Scala excerpt; its enclosing method starts outside this snippet, so it is reproduced verbatim
// on one line. It submits backgroundOperation through the session manager. A
// RejectedExecutionException is wrapped in HiveSQLException; any other NonFatal error is logged
// and rethrown unchanged, so it can reach the caller as something other than HiveSQLException.
// Both branches set OperationState.ERROR before throwing.
try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } }
3. When we close the session, operationManager.closeOperation(opHandle) is still not called for this handle, because the opHandle was never added to opHandleSet.
/**
 * Closes the session: closes every operation whose handle is in opHandleSet, clears the set,
 * cleans up the session log directory and pipeout file, closes the Hive history stream if one
 * exists, and closes the session state.
 *
 * Only handles present in opHandleSet are passed to closeOperation; a handle that was never
 * added to the set is not touched here.
 *
 * @throws HiveSQLException wrapping an IOException raised during the steps above
 */
public void close() throws HiveSQLException {
  try {
    this.acquire(true);
    Iterator ioe = this.opHandleSet.iterator();
    // Close each operation this session has recorded.
    while(ioe.hasNext()) {
      OperationHandle opHandle = (OperationHandle)ioe.next();
      this.operationManager.closeOperation(opHandle);
    }
    this.opHandleSet.clear();
    this.cleanupSessionLogDir();
    this.cleanupPipeoutFile();
    HiveHistory ioe1 = this.sessionState.getHiveHistory();
    if(null != ioe1) {
      ioe1.closeStream();
    }
    try {
      this.sessionState.close();
    } finally {
      // Null the field even if close() throws, so the outer finally does not close it again.
      this.sessionState = null;
    }
  } catch (IOException var17) {
    throw new HiveSQLException("Failure to close", var17);
  } finally {
    // Still non-null only if the try block exited before reaching the inner close above.
    if(this.sessionState != null) {
      try {
        this.sessionState.close();
      } catch (Throwable var15) {
        // Best-effort cleanup: log and continue so release(true) below still runs.
        LOG.warn("Error closing session", var15);
      }
      this.sessionState = null;
    }
    this.release(true);
  }
}
4. However, the opHandle is added to handleToOperation for every statement, so in the case above its entry is never removed.
// Handle -> operation map, obtained through ReflectionUtils.getSuperField under the field name
// "handleToOperation".
val handleToOperation = ReflectionUtils
  .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")

// Per-session maps keyed by SessionHandle; both are passed on to or consulted by
// newExecuteStatementOperation below.
val sessionToActivePool = new ConcurrentHashMap[SessionHandle, String]()
val sessionToContexts = new ConcurrentHashMap[SessionHandle, SQLContext]()

/**
 * Builds a SparkExecuteStatementOperation for `statement` and registers it in
 * `handleToOperation` before returning it. The whole body runs inside `synchronized`.
 *
 * The registration is unconditional: every operation created here gets an entry in
 * `handleToOperation`, regardless of what later happens when the operation is run.
 *
 * @param parentSession session whose handle is used to look up the SQLContext
 * @param statement     statement text passed to the new operation
 * @param confOverlay   configuration overlay passed to the new operation
 * @param async         requested async flag; the operation runs in the background only if this
 *                      is true and HiveUtils.HIVE_THRIFT_SERVER_ASYNC is enabled in the conf
 * @return the newly created operation
 * @throws IllegalArgumentException via `require` when no SQLContext is registered for the session
 */
override def newExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    async: Boolean): ExecuteStatementOperation = synchronized {
  val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
  require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
    s" initialized or had already closed.")
  val conf = sqlContext.sessionState.conf
  val hiveSessionState = parentSession.getSessionState
  // Copy the session's overridden configurations and Hive variables into the SQL conf.
  setConfMap(conf, hiveSessionState.getOverriddenConfigurations)
  setConfMap(conf, hiveSessionState.getHiveVariables)
  val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
  val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
    runInBackground)(sqlContext, sessionToActivePool)
  // Registered here, before the operation has been run.
  handleToOperation.put(operation.getHandle, operation)
  logDebug(s"Created Operation for $statement with session=$parentSession, " +
    s"runInBackground=$runInBackground")
  operation
}
Below is an example that exhibits the memory leak:
Attachments
Attachments
Issue Links
- is duplicated by
-
SPARK-26701 spark thrift server driver memory leak
- Resolved
- links to