diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 32fbad5bce..1ed4bb0c3c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -970,17 +970,16 @@ public InternalCompletionListener(TaskWrapper taskWrapper) { public void onSuccess(TaskRunner2Result result) { if (LOG.isDebugEnabled()) { LOG.debug("Received successful completion for: {}", - taskWrapper.getRequestId()); + taskWrapper.getRequestId()); + } + try { + metrics.addMetricsQueueTime(taskWrapper.getTaskRunnerCallable().getQueueTime()); + metrics.addMetricsRunningTime(taskWrapper.getTaskRunnerCallable().getRunningTime()); + taskCleanup(result.getEndReason()); + taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); + } catch (Exception err) { + LOG.error("Error during InternalCompletionListener for " + taskWrapper.getRequestId(), err); } - updateFallOffStats(taskWrapper.getRequestId()); - knownTasks.remove(taskWrapper.getRequestId()); - taskWrapper.setIsInPreemptableQueue(false); - taskWrapper.maybeUnregisterForFinishedStateNotifications(); - taskWrapper.getTaskRunnerCallable().setWmCountersDone(); - metrics.addMetricsQueueTime(taskWrapper.getTaskRunnerCallable().getQueueTime()); - metrics.addMetricsRunningTime(taskWrapper.getTaskRunnerCallable().getRunningTime()); - updatePreemptionListAndNotify(result.getEndReason()); - taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); } @Override @@ -989,16 +988,34 @@ public void onFailure(Throwable t) { LOG.debug("Received failed completion for: {}", taskWrapper.getRequestId()); } - updateFallOffStats(taskWrapper.getRequestId()); - knownTasks.remove(taskWrapper.getRequestId()); - taskWrapper.setIsInPreemptableQueue(false); - taskWrapper.maybeUnregisterForFinishedStateNotifications(); - taskWrapper.getTaskRunnerCallable().setWmCountersDone(); - updatePreemptionListAndNotify(null); - taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); + try { + taskCleanup(null); + taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); + } catch (Exception err) { + LOG.error("Error during InternalCompletionListener for " + taskWrapper.getRequestId(), err); + } LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); } + private void taskCleanup(EndReason reason) { + try { + updateFallOffStats(taskWrapper.getRequestId()); + knownTasks.remove(taskWrapper.getRequestId()); + taskWrapper.setIsInPreemptableQueue(false); + try { + taskWrapper.maybeUnregisterForFinishedStateNotifications(); + } catch (Exception err) { + // Known error at this point from HIVE-23061. + // Catching here so we can continue on to updatePreemptionListAndNotify(). + LOG.warn("Ignoring error during maybeUnregisterForFinishedStateNotifications() for :" + taskWrapper.getRequestId(), err); + } + taskWrapper.getTaskRunnerCallable().setWmCountersDone(); + updatePreemptionListAndNotify(reason); + } catch (Exception err) { + LOG.warn("Ignoring error during cleanup for: " + taskWrapper.getRequestId(), err); + } + } + private void updatePreemptionListAndNotify(EndReason reason) { // if this task was added to pre-emption list, remove it if (enablePreemption) {