diff --git a/service/src/java/org/apache/hive/service/cli/CLIService.java b/service/src/java/org/apache/hive/service/cli/CLIService.java index 22c4026..957e11b 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIService.java +++ b/service/src/java/org/apache/hive/service/cli/CLIService.java @@ -498,14 +498,15 @@ private JobProgressUpdate progressUpdateLog(boolean isProgressLogRequested, Oper SessionState sessionState = operation.getParentSession().getSessionState(); long startTime = System.nanoTime(); int timeOutMs = 8; + boolean terminated = operation.isDone(); try { - while (sessionState.getProgressMonitor() == null && !operation.isDone()) { + while ((sessionState.getProgressMonitor() == null) && !terminated) { long remainingMs = (PROGRESS_MAX_WAIT_NS - (System.nanoTime() - startTime)) / 1000000l; if (remainingMs <= 0) { LOG.debug("timed out and hence returning progress log as NULL"); return new JobProgressUpdate(ProgressMonitor.NULL); } - Thread.sleep(Math.min(remainingMs, timeOutMs)); + terminated = operation.waitToTerminate(Math.min(remainingMs, timeOutMs)); timeOutMs <<= 1; } } catch (InterruptedException e) { diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 32c24e8..5109981 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -71,6 +72,7 @@ private long operationTimeout; private volatile long lastAccessTime; private final long beginTime; + private final CountDownLatch opTerminateMonitorLatch; protected long operationStart; protected long operationComplete; @@ -89,6 +91,7 @@ protected Operation(HiveSession parentSession, Map confOverlay, OperationType opType) { this.parentSession = parentSession; this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion()); + opTerminateMonitorLatch = new CountDownLatch(1); beginTime = System.currentTimeMillis(); lastAccessTime = beginTime; operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(), @@ -209,6 +212,16 @@ public boolean isDone() { return state.isTerminal(); } + /** + * Wait until the operation terminates and returns false if timeout. + * @param timeOutMs - timeout in milli-seconds + * @return true if operation is terminated or false if timed-out + * @throws InterruptedException + */ + public boolean waitToTerminate(long timeOutMs) throws InterruptedException { + return opTerminateMonitorLatch.await(timeOutMs, TimeUnit.MILLISECONDS); + } + protected void createOperationLog() { if (parentSession.isOperationLogEnabled()) { File operationLogFile = new File(parentSession.getOperationLogSessionDir(), queryState.getQueryId()); @@ -400,6 +413,11 @@ protected void onNewState(OperationState state, OperationState prevState) { markOperationCompletedTime(); break; } + + if (state.isTerminal()) { + // Unlock the execution thread as operation is already terminated. + opTerminateMonitorLatch.countDown(); + } } public long getOperationComplete() {