diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8a45b9c..99bb400 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3146,6 +3146,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " EXECUTION: Log completion of tasks\n" + " PERFORMANCE: Execution + Performance logs \n" + " VERBOSE: All logs" ), + HIVE_SERVER2_OPERATION_LOG_CLEANUP_DELAY("hive.server2.operation.log.cleanup.delay", "300s", + new TimeValidator(TimeUnit.SECONDS), "When a query is cancelled (via kill query, query timeout or triggers),\n" + + " operation logs gets cleaned up after this delay"), // HS2 connections guard rails HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER("hive.server2.limit.connections.per.user", 0, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java index 7213dbf..6d75c29 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java @@ -211,4 +211,9 @@ private void resetIn() { return logs; } } + + @Override + public String toString() { + return logFile.file.toString(); + } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index 4745688..99eaf02 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -34,8 +34,6 @@ import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.ServiceUtils; import org.apache.hive.service.cli.FetchOrientation; @@ -148,7 +146,7 @@ public void close() throws HiveSQLException { setState(OperationState.CLOSED); tearDownSessionIO(); cleanTmpFile(); - cleanupOperationLog(); + cleanupOperationLog(0); } /* (non-Javadoc) diff --git a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java index 3be21b5..00a308a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/MetadataOperation.java @@ -56,7 +56,7 @@ protected MetadataOperation(HiveSession parentSession, OperationType opType) { @Override public void close() throws HiveSQLException { setState(OperationState.CLOSED); - cleanupOperationLog(); + cleanupOperationLog(0); } /** diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 07ab487..1ee0756 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -22,7 +22,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.common.LogUtils; @@ -63,6 +65,7 @@ protected volatile Future backgroundHandle; protected OperationLog operationLog; protected boolean isOperationLogEnabled; + private ScheduledExecutorService scheduledExecutorService; private long operationTimeout; private volatile long lastAccessTime; @@ -89,6 +92,7 @@ protected Operation(HiveSession parentSession, lastAccessTime = beginTime; operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS); + scheduledExecutorService = Executors.newScheduledThreadPool(1); currentStateScope = updateOperationStateMetrics(null, MetricsConstant.OPERATION_PREFIX, MetricsConstant.COMPLETED_OPERATION_PREFIX, state); @@ -250,7 +254,24 @@ public void run() throws HiveSQLException { } } - protected synchronized void cleanupOperationLog() { + private static class OperationLogCleaner implements Runnable { + public static final Logger LOG = LoggerFactory.getLogger(OperationLogCleaner.class.getName()); + private OperationLog operationLog; + + public OperationLogCleaner(OperationLog operationLog) { + this.operationLog = operationLog; + } + + @Override + public void run() { + if (operationLog != null) { + LOG.info("Closing operation log {}", operationLog); + operationLog.close(); + } + } + } + + protected synchronized void cleanupOperationLog(final long operationLogCleanupDelayMs) { // stop the appenders for the operation log String queryId = queryState.getQueryId(); LogUtils.stopQueryAppender(LogDivertAppender.QUERY_ROUTING_APPENDER, queryId); @@ -265,7 +286,13 @@ protected synchronized void cleanupOperationLog() { + "but its OperationLog object cannot be found. " + "Perhaps the operation has already terminated."); } else { - operationLog.close(); + if (operationLogCleanupDelayMs > 0) { + scheduledExecutorService.schedule(new OperationLogCleaner(operationLog), operationLogCleanupDelayMs, + TimeUnit.MILLISECONDS); + } else { + LOG.info("Closing operation log {} without delay", operationLog); + operationLog.close(); + } } } diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index 85f92d2..9a07fa1 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -95,6 +95,7 @@ private long queryTimeout; private ScheduledExecutorService timeoutExecutor; private final boolean runAsync; + private final long operationLogCleanupDelayMs; /** * A map to track query count running by each user @@ -114,6 +115,8 @@ public SQLOperation(HiveSession parentSession, String statement, Map 0 && (queryTimeout <= 0 || timeout < queryTimeout)) { this.queryTimeout = timeout; } + this.operationLogCleanupDelayMs = HiveConf.getTimeVar(queryState.getConf(), + HiveConf.ConfVars.HIVE_SERVER2_OPERATION_LOG_CLEANUP_DELAY, TimeUnit.MILLISECONDS); setupSessionIO(parentSession.getSessionState()); @@ -341,7 +344,6 @@ public Object run() throws HiveSQLException { } } - /** * Returns the current UGI on the stack * @@ -403,7 +405,7 @@ public void cancel(OperationState stateAfterCancel) throws HiveSQLException { LOG.info("Cancelling the query execution: " + queryId); } cleanup(stateAfterCancel); - cleanupOperationLog(); + cleanupOperationLog(operationLogCleanupDelayMs); if (stateAfterCancel == OperationState.CANCELED) { LOG.info("Successfully cancelled the query: " + queryId); } @@ -412,7 +414,7 @@ public void cancel(OperationState stateAfterCancel) throws HiveSQLException { @Override public void close() throws HiveSQLException { cleanup(OperationState.CLOSED); - cleanupOperationLog(); + cleanupOperationLog(0); } @Override