diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index f0425627760b3b45518379fe60b464c76534a44c..e8d789b4724cde303ea8e95d6085a394c03225b6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -49,19 +49,19 @@ public synchronized E offer(E e) { } } - public boolean isEmpty() { + public synchronized boolean isEmpty() { return deque.isEmpty(); } - public E peek() { + public synchronized E peek() { return deque.peek(); } - public E take() throws InterruptedException { + public synchronized E take() throws InterruptedException { return deque.take(); } - public void remove(E e) { + public synchronized void remove(E e) { deque.remove(e); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 5253e5bbda0c87721c24cfd3663ccdddf28024b2..08af1e2b677df4e3167e9ac212de692b09b407d8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -18,9 +18,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.util.Comparator; -import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; @@ -30,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; @@ -66,7 +65,6 @@ private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); - private static final boolean isTraceEnabled = LOG.isTraceEnabled(); private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d"; private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d"; @@ -76,7 +74,6 @@ private final ListeningExecutorService executorService; private final EvictingPriorityBlockingQueue waitQueue; private final ListeningExecutorService waitQueueExecutorService; - private final Map idToTaskMap; private final BlockingQueue preemptionQueue; private final boolean enablePreemption; private final ThreadPoolExecutor threadPoolExecutor; @@ -91,7 +88,6 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr new SynchronousQueue(), // direct hand-off new ThreadFactoryBuilder().setNameFormat(TASK_EXECUTOR_THREAD_NAME_FORMAT).build()); this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); - this.idToTaskMap = new ConcurrentHashMap<>(); this.preemptionQueue = new PriorityBlockingQueue<>(numExecutors, new PreemptionQueueComparator()); this.enablePreemption = enablePreemption; @@ -114,8 +110,9 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr @Override public void run() { try { - if (waitQueue.isEmpty()) { - synchronized (waitLock) { + + synchronized (waitLock) { + while (waitQueue.isEmpty()) { waitLock.wait(); } } @@ -140,8 +137,8 @@ public void run() { waitQueue.remove(task); } - if (waitQueue.isEmpty()) { - synchronized (waitLock) { + synchronized (waitLock) { + while (waitQueue.isEmpty()) { waitLock.wait(); } } @@ -172,8 +169,8 @@ public void onFailure(Throwable t) { public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { TaskRunnerCallable evictedTask = waitQueue.offer(task); if (evictedTask == null) { - if (isDebugEnabled) { - LOG.debug(task.getRequestId() + " added to wait queue."); + if (isInfoEnabled) { + LOG.info(task.getRequestId() + " added to wait queue."); } synchronized (waitLock) { @@ -192,8 +189,7 @@ private boolean trySchedule(final TaskRunnerCallable task) { boolean scheduled = false; try { ListenableFuture future = executorService.submit(task); - FutureCallback wrappedCallback = - new InternalCompletionListener(task.getCallback()); + FutureCallback wrappedCallback = new InternalCompletionListener(task); Futures.addCallback(future, wrappedCallback); if (isInfoEnabled) { @@ -203,11 +199,10 @@ private boolean trySchedule(final TaskRunnerCallable task) { // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs // to the tasks are not ready yet, the task is eligible for pre-emptable. if (enablePreemption && !task.canFinish()) { - if (isDebugEnabled) { - LOG.debug(task.getRequestId() + " is not finishable and pre-emption is enabled." - + "Adding it to pre-emption queue."); + if (isInfoEnabled) { + LOG.info(task.getRequestId() + " is not finishable. Adding it to pre-emption queue."); } - addTaskToPreemptionList(task); + preemptionQueue.add(task); } numSlotsAvailable.decrementAndGet(); @@ -216,21 +211,18 @@ private boolean trySchedule(final TaskRunnerCallable task) { if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) { - if (isTraceEnabled) { - LOG.trace("idToTaskMap: " + idToTaskMap.keySet()); + if (isDebugEnabled) { LOG.trace("preemptionQueue: " + preemptionQueue); } - TaskRunnerCallable pRequest = preemptionQueue.peek(); + TaskRunnerCallable pRequest = preemptionQueue.remove(); + if (pRequest != null && !pRequest.isCompleted() && !pRequest.isKillInvoked()) { - // if some task completes, it will remove itself from pre-emption lists, making this null. - // if it happens bail out and schedule it again as a free slot will be available. - if (pRequest != null) { - - if (isDebugEnabled) { - LOG.debug("Kill task invoked for " + pRequest.getRequestId() + " due to pre-emption"); + if (isInfoEnabled) { + LOG.info("Kill task invoked for " + pRequest.getRequestId() + " due to pre-emption"); } + pRequest.setKillInvoked(); pRequest.killTask(); } } @@ -239,46 +231,37 @@ private boolean trySchedule(final TaskRunnerCallable task) { return scheduled; } - private synchronized void removeTaskFromPreemptionList(TaskRunnerCallable pRequest, - String requestId) { - idToTaskMap.remove(requestId); - preemptionQueue.remove(pRequest); - } - - private synchronized void addTaskToPreemptionList(TaskRunnerCallable task) { - idToTaskMap.put(task.getRequestId(), task); - preemptionQueue.add(task); - } - private final class InternalCompletionListener implements FutureCallback { - private TaskRunnerCallable.TaskRunnerCallback wrappedCallback; + private TaskRunnerCallable task; - public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedCallback) { - this.wrappedCallback = wrappedCallback; + public InternalCompletionListener(TaskRunnerCallable task) { + this.task = task; } @Override public void onSuccess(TaskRunner2Result result) { - wrappedCallback.onSuccess(result); + task.setCompleted(); + task.getCallback().onSuccess(result); updatePreemptionListAndNotify(result.getEndReason()); } @Override public void onFailure(Throwable t) { - wrappedCallback.onFailure(t); + task.setCompleted(); + task.getCallback().onFailure(t); updatePreemptionListAndNotify(null); + LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); } private void updatePreemptionListAndNotify(EndReason reason) { // if this task was added to pre-emption list, remove it - String taskId = wrappedCallback.getRequestId(); - TaskRunnerCallable task = idToTaskMap.get(taskId); - String state = reason == null ? "FAILED" : reason.name(); - if (enablePreemption && task != null) { - removeTaskFromPreemptionList(task, taskId); - if (isDebugEnabled) { - LOG.debug(task.getRequestId() + " request " + state + "! Removed from preemption list."); + if (enablePreemption) { + String state = reason == null ? "FAILED" : reason.name(); + preemptionQueue.remove(task.getRequest()); + if (isInfoEnabled) { + LOG.info(TaskRunnerCallable.getTaskIdentifierString(task.getRequest()) + + " request " + state + "! Removed from preemption list."); } } @@ -327,16 +310,6 @@ public void shutDown(boolean awaitTermination) { } @VisibleForTesting - public int getPreemptionListSize() { - return preemptionQueue.size(); - } - - @VisibleForTesting - public TaskRunnerCallable getPreemptionTask() { - return preemptionQueue.peek(); - } - - @VisibleForTesting public static class WaitQueueComparator implements Comparator { @Override diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 72fcf3b34d4ebd2a7c41cb7124197c52becf877d..e5050707db53c29e149468163008e522a1088ef6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -101,6 +102,8 @@ private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); + private AtomicBoolean isCompleted; + private AtomicBoolean killInvoked; TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map envMap, @@ -130,6 +133,24 @@ this.metrics = metrics; this.requestId = getTaskAttemptId(request); this.killedTaskHandler = killedTaskHandler; + this.isCompleted = new AtomicBoolean(false); + this.killInvoked = new AtomicBoolean(false); + } + + public void setCompleted() { + isCompleted.set(true); + } + + public boolean isCompleted() { + return isCompleted.get(); + } + + public boolean isKillInvoked() { + return killInvoked.get(); + } + + public void setKillInvoked() { + killInvoked.set(true); } @Override @@ -342,21 +363,19 @@ public TaskRunnerCallback getCallback() { return new TaskRunnerCallback(request, this); } + public SubmitWorkRequestProto getRequest() { + return request; + } + final class TaskRunnerCallback implements FutureCallback { private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; private final TaskRunnerCallable taskRunnerCallable; - private final String requestId; TaskRunnerCallback(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, TaskRunnerCallable taskRunnerCallable) { this.request = request; this.taskRunnerCallable = taskRunnerCallable; - this.requestId = getTaskIdentifierString(request); - } - - public String getRequestId() { - return requestId; } // Errors are handled on the way over. FAIL/SUCCESS is informed via regular heartbeats. Killed