diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e544789..3fd7920 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -67,7 +67,6 @@ private final QueryTracker queryTracker; private final Scheduler executorService; private final AtomicReference localAddress; - private final String[] localDirsBase; private final Map localEnv = new HashMap<>(); private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; @@ -87,7 +86,6 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi this.conf = conf; Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); - this.localDirsBase = localDirsBase; this.localAddress = localAddress; this.queryTracker = new QueryTracker(conf, localDirsBase); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index e8d789b..101a69c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -27,8 +27,8 @@ * returned back. If the queue is not full, new element will be added to queue and null is returned.
*/ public class EvictingPriorityBlockingQueue { - private PriorityBlockingDeque deque; - private Comparator comparator; + private final PriorityBlockingDeque deque; + private final Comparator comparator; public EvictingPriorityBlockingQueue(Comparator comparator, int maxSize) { this.deque = new PriorityBlockingDeque<>(comparator, maxSize); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 08af1e2..5323f05 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -71,9 +71,13 @@ // some object to lock upon. Used by task scheduler to notify wait queue scheduler of new items // to wait queue private final Object waitLock; + // Thread pool for actual execution of work. private final ListeningExecutorService executorService; private final EvictingPriorityBlockingQueue waitQueue; + // Thread pool for taking entities off the wait queue. private final ListeningExecutorService waitQueueExecutorService; + // Thread pool for callbacks on completion of execution of a work unit.
+ private final ListeningExecutorService executionCompletionExecutorService; private final BlockingQueue preemptionQueue; private final boolean enablePreemption; private final ThreadPoolExecutor threadPoolExecutor; @@ -94,9 +98,14 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr this.numSlotsAvailable = new AtomicInteger(numExecutors); // single threaded scheduler for tasks from wait queue to executor threads - ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build()); this.waitQueueExecutorService = MoreExecutors.listeningDecorator(wes); + + ExecutorService executionCompletionExecutorServiceRaw = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExecutionCompletionThread #%d") + .build()); + executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw); ListenableFuture future = waitQueueExecutorService.submit(new WaitQueueWorker()); Futures.addCallback(future, new WaitQueueWorkerCallback()); } @@ -125,8 +134,12 @@ public void run() { // if the task cannot finish and if no slots are available then don't schedule it. // TODO: Event notifications that change canFinish state should notify waitLock synchronized (waitLock) { + // TODO: Check whether this can become a tight loop when only finishable tasks are available. if (!task.canFinish() && numSlotsAvailable.get() == 0) { waitLock.wait(); + // Another task at a higher priority may have come in during the wait. Look up the + // queue again to pick up the task at the highest priority.
+ continue; } } @@ -190,7 +203,9 @@ private boolean trySchedule(final TaskRunnerCallable task) { try { ListenableFuture future = executorService.submit(task); FutureCallback wrappedCallback = new InternalCompletionListener(task); - Futures.addCallback(future, wrappedCallback); + // Callback on a separate thread so that when a task completes, the thread in the main queue + // is actually available for execution and will not potentially result in a RejectedExecution + Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService); if (isInfoEnabled) { LOG.info(task.getRequestId() + " scheduled for execution."); @@ -216,13 +231,15 @@ private boolean trySchedule(final TaskRunnerCallable task) { } TaskRunnerCallable pRequest = preemptionQueue.remove(); - if (pRequest != null && !pRequest.isCompleted() && !pRequest.isKillInvoked()) { + if (pRequest != null) { if (isInfoEnabled) { - LOG.info("Kill task invoked for " + pRequest.getRequestId() + " due to pre-emption"); + LOG.info("Invoking kill task for {} due to pre-emption.", pRequest.getRequestId()); } - pRequest.setKillInvoked(); + // The task will either be killed or is already in the process of completing, which will + // trigger the next scheduling run, or result in available slots being higher than 0, + // which will cause the scheduler loop to continue.
pRequest.killTask(); } } @@ -241,14 +258,12 @@ public InternalCompletionListener(TaskRunnerCallable task) { @Override public void onSuccess(TaskRunner2Result result) { - task.setCompleted(); task.getCallback().onSuccess(result); updatePreemptionListAndNotify(result.getEndReason()); } @Override public void onFailure(Throwable t) { - task.setCompleted(); task.getCallback().onFailure(t); updatePreemptionListAndNotify(null); LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); @@ -282,23 +297,9 @@ public void shutDown(boolean awaitTermination) { LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + " service gracefully"); } - executorService.shutdown(); - try { - if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - } - - waitQueueExecutorService.shutdown(); - try { - if (!waitQueueExecutorService.awaitTermination(1, TimeUnit.MINUTES)) { - waitQueueExecutorService.shutdownNow(); - } - } catch (InterruptedException e) { - waitQueueExecutorService.shutdownNow(); - } + shutdownExecutor(waitQueueExecutorService); + shutdownExecutor(executorService); + shutdownExecutor(executionCompletionExecutorService); } else { if (isDebugEnabled) { LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + @@ -309,6 +310,23 @@ public void shutDown(boolean awaitTermination) { } } + /** + * Shuts down the given executor, falling back to shutdownNow() if it has not terminated + * within one minute or if the wait is interrupted. + */ + private void shutdownExecutor(ExecutorService executor) { + executor.shutdown(); + try { + if (!executor.awaitTermination(1, TimeUnit.MINUTES)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + // Preserve the interrupt status so the caller can observe it. + Thread.currentThread().interrupt(); + } + } + @VisibleForTesting public static class WaitQueueComparator implements Comparator { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index e505070..d1b1c61 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -102,8 +102,8 @@ private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); - private AtomicBoolean isCompleted; - private AtomicBoolean killInvoked; + private final AtomicBoolean isCompleted = new AtomicBoolean(false); + private final AtomicBoolean killInvoked = new AtomicBoolean(false); TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map envMap, @@ -133,24 +133,6 @@ this.metrics = metrics; this.requestId = getTaskAttemptId(request); this.killedTaskHandler = killedTaskHandler; - this.isCompleted = new AtomicBoolean(false); - this.killInvoked = new AtomicBoolean(false); - } - - public void setCompleted() { - isCompleted.set(true); - } - - public boolean isCompleted() { - return isCompleted.get(); - } - - public boolean isKillInvoked() { - return killInvoked.get(); - } - - public void setKillInvoked() { - killInvoked.set(true); } @Override @@ -226,6 +208,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { if (result.isContainerShutdownRequested()) { LOG.warn("Unexpected container shutdown requested while running task. Ignoring"); } + isCompleted.set(true); return result; } finally { @@ -242,21 +225,43 @@ public LlapTaskUmbilicalProtocol run() throws Exception { /** * Attempt to kill a running task. If the task has not started running, it will not start. * If it's already running, a kill request will be sent to it. - * + *

* The AM will be informed about the task kill. */ public void killTask() { - synchronized (this) { - LOG.info("Killing task with id {}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), (taskRunner != null)); - if (taskRunner != null) { - killtimerWatch.start(); - LOG.info("Issuing kill to task {}" + taskSpec.getTaskAttemptID()); - taskRunner.killTask(); - shouldRunTask = false; + if (!isCompleted.get()) { + if (!killInvoked.getAndSet(true)) { + synchronized (this) { + LOG.info("Kill task requested for id={}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), + (taskRunner != null)); + // Cleared regardless of taskRunner so that a task which has not started yet never starts. + shouldRunTask = false; + if (taskRunner != null) { + killtimerWatch.start(); + LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID()); + boolean killed = taskRunner.killTask(); + if (killed) { + // Sending a kill message to the AM right here. Don't need to wait for the task to complete. + reportTaskKilled(); + } else { + LOG.info("Kill request for task {} did not complete because the task is already complete", + taskSpec.getTaskAttemptID()); + } + } else { + // Task runner not set up yet, i.e. the task has not started and will not start now. + // Report the kill to the AM directly since there is no running task to kill. + reportTaskKilled(); + } + } + } else { + // This should not happen. + LOG.warn("Ignoring kill request for task {} since a previous kill request was processed", + taskSpec.getTaskAttemptID()); } + } else { + LOG.info("Ignoring kill request for task {} since it's already complete", + taskSpec.getTaskAttemptID()); } - // Sending a kill message to the AM right here. Don't need to wait for the task to complete. - reportTaskKilled(); } /** @@ -382,6 +387,7 @@ public SubmitWorkRequestProto getRequest() { // via a kill message when a task kill is requested by the daemon. @Override public void onSuccess(TaskRunner2Result result) { + isCompleted.set(true); switch(result.getEndReason()) { // Only the KILLED case requires a message to be sent out to the AM.
case SUCCESS: @@ -424,6 +430,7 @@ public void onSuccess(TaskRunner2Result result) { @Override public void onFailure(Throwable t) { + isCompleted.set(true); LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); // TODO HIVE-10236 Report a fatal error over the umbilical taskRunnerCallable.shutdown();