diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 3487e19..6aed60f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -178,7 +178,12 @@ synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler, sourceToEntity.put(source, entityInfo); } - return lastFinishableState == fragmentInfo.canFinish(); + if (lastFinishableState != fragmentInfo.canFinish()) { + entityInfo.setLastFinishableState(fragmentInfo.canFinish()); + return false; + } else { + return true; + } } synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 453a71e..b3e5f74 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -282,9 +282,12 @@ private boolean trySchedule(final TaskWrapper taskWrapper) { boolean scheduled = false; try { + + boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); + // It's safe to register outside of the lock since the stateChangeTracker ensures that updates + // and registrations are mutually exclusive. 
+ boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); synchronized (lock) { - boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); ListenableFuture future = executorService.submit(taskWrapper.getTaskRunnerCallable()); taskWrapper.setIsInWaitQueue(false); FutureCallback wrappedCallback = new InternalCompletionListener(taskWrapper); @@ -299,7 +302,7 @@ private boolean trySchedule(final TaskWrapper taskWrapper) { // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs // to the tasks are not ready yet, the task is eligible for pre-emptable. if (enablePreemption) { - if (!canFinish && !stateChanged) { + if ((!canFinish && !stateChanged) || (canFinish && stateChanged)) { if (isInfoEnabled) { LOG.info("{} is not finishable. Adding it to pre-emption queue", taskWrapper.getRequestId()); } @@ -538,6 +541,8 @@ public TaskWrapper(TaskRunnerCallable taskRunnerCallable, TaskExecutorService ta * @param currentFinishableState * @return true if the current state is the same as the currentFinishableState. false if the state has already changed. */ + // Synchronized to avoid register / unregister clobbering each other. + // Don't invoke from within a scheduler lock public synchronized boolean maybeRegisterForFinishedStateNotifications( boolean currentFinishableState) { if (!registeredForNotifications) { @@ -549,6 +554,8 @@ public synchronized boolean maybeRegisterForFinishedStateNotifications( } } + // Synchronized to avoid register / unregister clobbering each other. + // Don't invoke from within a scheduler lock public synchronized void maybeUnregisterForFinishedStateNotifications() { if (registeredForNotifications) { registeredForNotifications = false; @@ -590,6 +597,7 @@ public String toString() { '}'; } + // No task lock. 
But this method acquires the scheduler lock. @Override public void finishableStateUpdated(boolean finishableState) { // This method should not by synchronized. Can lead to deadlocks since it calls a sync method.