diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index d8b517d48b..d6d0779d44 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -884,10 +884,20 @@ private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishab taskWrapper.updateCanFinishForPriority(newFinishableState); forceReinsertIntoQueue(taskWrapper, isRemoved); } else { - taskWrapper.updateCanFinishForPriority(newFinishableState); - if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) { - // No need to check guaranteed here; if it was false we would already be in the queue. + // if speculative task, any finishable state change should re-order the queue as speculative tasks are always + // not-guaranteed (re-order helps put non-finishable's ahead of finishable) + if (!taskWrapper.isGuaranteed()) { + removeFromPreemptionQueue(taskWrapper); + taskWrapper.updateCanFinishForPriority(newFinishableState); addToPreemptionQueue(taskWrapper); + } else { + // if guaranteed task, if the finishable state changed to non-finishable and if the task doesn't exist + // pre-emption queue, then add it so that it becomes candidate to kill + taskWrapper.updateCanFinishForPriority(newFinishableState); + if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) { + // No need to check guaranteed here; if it was false we would already be in the queue. + addToPreemptionQueue(taskWrapper); + } } }