diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index a80bb9b..8fe59d4 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -53,6 +53,11 @@ public synchronized E offer(E e, int additionalElementsAllowed) { currentSize++; return null; } else { + if (isEmpty()) { + // Empty queue. But no capacity available, due to waitQueueSize and additionalElementsAllowed + // Return the element. + return e; + } // No capacity. Check if an element needs to be evicted. E last = deque.peekLast(); if (comparator.compare(e, last) < 0) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 9eaa7d7..4f2e325 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -277,7 +277,8 @@ public void run() { } // If the task cannot finish and if no slots are available then don't schedule it. // Also don't wait if we have a task and we just killed something to schedule it. - boolean shouldWait = numSlotsAvailable.get() == 0 && lastKillTimeMs == null; + // (numSlotsAvailable can go negative, if the callback after the thread completes is delayed) + boolean shouldWait = numSlotsAvailable.get() <= 0 && lastKillTimeMs == null; if (task.getTaskRunnerCallable().canFinish()) { if (isDebugEnabled) { LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: " @@ -728,8 +729,8 @@ public void onSuccess(TaskRunner2Result result) { knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); - taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); updatePreemptionListAndNotify(result.getEndReason()); + taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); } @Override @@ -742,8 +743,8 @@ public void onFailure(Throwable t) { knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); - taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); updatePreemptionListAndNotify(null); + taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 25cd039..fae794f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -410,6 +410,7 @@ public void shutdown() { taskReporter.shutdown(); } if (umbilical != null) { + // TODO: Can this be moved out of the main callback path RPC.stopProxy(umbilical); } }