diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
index e69de29..8d40ce9 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+/** Callback through which a registrant is told that a tracked finishable state has changed. */
+public interface FinishableStateUpdateHandler {
+  /** Invoked with the new finishable state once a change has been detected. */
+  void finishableStateUpdated(boolean finishableState);
+
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index f6cd8ab..554864e 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -15,9 +15,11 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -111,6 +113,32 @@ public boolean canFinish() {
     return queryInfo.getLocalDirs();
   }
 
+  /**
+   * Registers a handler for finishable-state changes, restricted to this fragment's sources of interest.
+   * @param handler callback to register with the owning QueryInfo
+   * @param lastFinishableState the finishable state last observed by the caller
+   * @return true if the current state is the same as the lastFinishableState. false if the state has already changed.
+   */
+  public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
+                                                   boolean lastFinishableState) {
+    List<String> sourcesOfInterest = new LinkedList<>();
+    List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
+    if (inputSpecList != null && !inputSpecList.isEmpty()) {
+      for (IOSpecProto inputSpec : inputSpecList) {
+        if (isSourceOfInterest(inputSpec)) {
+          sourcesOfInterest.add(inputSpec.getConnectedVertexName());
+        }
+      }
+    }
+    return queryInfo.registerForFinishableStateUpdates(handler, sourcesOfInterest, this,
+        lastFinishableState);
+  }
+
+  /** Removes the registration made for the given handler; delegates to the owning QueryInfo. */
+  public void unregisterForFinishableStateUpdates(FinishableStateUpdateHandler handler) {
+    queryInfo.unregisterFinishableStateUpdate(handler);
+  }
+
   private boolean isSourceOfInterest(IOSpecProto inputSpec) {
     String inputClassName = inputSpec.getIoDescriptor().getClassName();
     // MRInput is not of interest since it'll always be ready.
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index efa18cd..3487e19 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -16,13 +16,22 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 
@@ -42,6 +51,7 @@
 
   private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
 
+  private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
 
   public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
                    String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
@@ -125,4 +135,116 @@ private static String createAppSpecificLocalDir(String baseDir, String applicati
     return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
         "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
   }
+
+  /**
+   * Registers a handler to be told when the finishable state of a fragment changes.
+   *
+   * @param handler callback invoked when the state changes
+   * @param sources names of the sources whose state updates trigger a re-evaluation
+   * @param fragmentInfo fragment whose canFinish() result is tracked
+   * @param lastFinishableState finishable state last observed by the caller
+   * @return true if the current state is the same as the lastFinishableState. false if the state has already changed.
+   */
+  boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
+                                            List<String> sources, QueryFragmentInfo fragmentInfo,
+                                            boolean lastFinishableState) {
+    return finishableStateTracker
+        .registerForUpdates(handler, sources, fragmentInfo, lastFinishableState);
+  }
+
+  /** Removes the registration previously made for the given handler. */
+  void unregisterFinishableStateUpdate(FinishableStateUpdateHandler handler) {
+    finishableStateTracker.unregisterForUpdates(handler);
+  }
+
+  /** Re-evaluates the registrations interested in the given source and fires callbacks. */
+  void sourceStateUpdated(String sourceName) {
+    finishableStateTracker.sourceStateUpdated(sourceName);
+  }
+
+  /** Maps handlers to their registration and sources to the registrations interested in them. */
+  private static class FinishableStateTracker {
+
+    private final Map<FinishableStateUpdateHandler, EntityInfo> trackedEntities = new HashMap<>();
+    private final Multimap<String, EntityInfo> sourceToEntity = HashMultimap.create();
+
+    synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler,
+                                            List<String> sources, QueryFragmentInfo fragmentInfo,
+                                            boolean lastFinishableState) {
+      // Check before mutating so a rejected duplicate cannot clobber the existing registration.
+      if (trackedEntities.containsKey(handler)) {
+        throw new IllegalStateException(
+            "Only a single registration allowed per entity. Duplicate for " + handler.toString());
+      }
+      EntityInfo entityInfo =
+          new EntityInfo(handler, sources, fragmentInfo, lastFinishableState);
+      trackedEntities.put(handler, entityInfo);
+      for (String source : sources) {
+        sourceToEntity.put(source, entityInfo);
+      }
+
+      return lastFinishableState == fragmentInfo.canFinish();
+    }
+
+    synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) {
+      EntityInfo info = trackedEntities.remove(handler);
+      Preconditions.checkState(info != null, "Cannot invoke unregister on an entity which has not been registered");
+      for (String source : info.getSources()) {
+        sourceToEntity.remove(source, info);
+      }
+    }
+
+    synchronized void sourceStateUpdated(String sourceName) {
+      Collection<EntityInfo> interestedEntityInfos = sourceToEntity.get(sourceName);
+      if (interestedEntityInfos != null) {
+        for (EntityInfo entityInfo : interestedEntityInfos) {
+          boolean newFinishState = entityInfo.getFragmentInfo().canFinish();
+          if (newFinishState != entityInfo.getLastFinishableState()) {
+            // State changed. Callback
+            // NOTE(review): the handler runs while this tracker's monitor is held; confirm that
+            // handlers never block on a lock held by a thread calling into the tracker.
+            entityInfo.setLastFinishableState(newFinishState);
+            entityInfo.getHandler().finishableStateUpdated(newFinishState);
+          }
+        }
+      }
+    }
+
+
+  }
+
+  /** Registration details (handler, sources, fragment) plus the last finishable state recorded. */
+  private static class EntityInfo {
+    final FinishableStateUpdateHandler handler;
+    final List<String> sources;
+    final QueryFragmentInfo fragmentInfo;
+    boolean lastFinishableState;
+
+    public EntityInfo(FinishableStateUpdateHandler handler, List<String> sources, QueryFragmentInfo fragmentInfo, boolean lastFinishableState) {
+      this.handler = handler;
+      this.sources = sources;
+      this.fragmentInfo = fragmentInfo;
+      this.lastFinishableState = lastFinishableState;
+    }
+
+    public FinishableStateUpdateHandler getHandler() {
+      return handler;
+    }
+
+    public QueryFragmentInfo getFragmentInfo() {
+      return fragmentInfo;
+    }
+
+    public boolean getLastFinishableState() {
+      return lastFinishableState;
+    }
+
+    public List<String> getSources() {
+      return sources;
+    }
+
+    public void setLastFinishableState(boolean lastFinishableState) {
+      this.lastFinishableState = lastFinishableState;
+    }
+  }
 }
diff --git 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 90ad923..d796b24 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -178,7 +178,13 @@ void queryComplete(String queryId, String dagName, long deleteDelay) {
    */
   void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
     getSourceCompletionMap(dagName).put(sourceName, sourceState);
-    // TODO HIVE-10758 source completion notifications
+    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    if (queryInfo != null) {
+      queryInfo.sourceStateUpdated(sourceName);
+    } else {
+      // No QueryInfo is registered for this dag. This can happen when requests are processed on
+      // different threads and the dag finish was handled before this source state update.
+    }
   }
 
 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 18daa75..3501677 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -19,6 +19,8 @@
 
 import java.util.Comparator;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -29,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
@@ -62,15 +65,15 @@
  * new tasks. 
Shutting down of the task executor service can be done gracefully or immediately.
  */
 public class TaskExecutorService implements Scheduler {
+
+  // TODO: Move the queues to work off the TaskWrapper
+
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d";
   private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d";
 
-  // some object to lock upon. Used by task scheduler to notify wait queue scheduler of new items
-  // to wait queue
-  private final Object waitLock;
   // Thread pool for actual execution of work.
   private final ListeningExecutorService executorService;
   private final EvictingPriorityBlockingQueue<TaskRunnerCallable> waitQueue;
@@ -83,8 +86,10 @@
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
 
+  // Tracks known tasks, keyed by the submitted callable.
+  private final ConcurrentMap<TaskRunnerCallable, TaskWrapper> knownTasks = new ConcurrentHashMap<>();
+
   public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) {
-    this.waitLock = new Object();
     this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize);
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
         numExecutors, // max pool size
@@ -115,6 +120,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr
         + ", enablePreemption=" + enablePreemption);
   }
 
+
   /**
    * Worker that takes tasks from wait queue and schedule it for execution.
    */
@@ -125,20 +131,25 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr
     public void run() {
       try {
 
-        synchronized (waitLock) {
+        synchronized (TaskExecutorService.this) {
           while (waitQueue.isEmpty()) {
-            waitLock.wait();
+            LOG.info("ZZZ: WAIT 1");
+            TaskExecutorService.this.wait();
           }
         }
 
         // Since schedule() can be called from multiple threads, we peek the wait queue,
         // try scheduling the task and then remove the task if scheduling is successful.
         // This will make sure the task's place in the wait queue is held until it gets scheduled.
-        while ((task = waitQueue.peek()) != null) {
+        while (true) {
+          LOG.info("ZZZ: waitQueue {} , numAvailableSlots={}", waitQueue, numSlotsAvailable.get());
+          synchronized (TaskExecutorService.this) {
+            task = waitQueue.peek();
+            if (task == null) {
+              break;
+            }
 
           // if the task cannot finish and if no slots are available then don't schedule it.
-          // TODO: Event notifications that change canFinish state should notify waitLock
-          synchronized (waitLock) {
             boolean shouldWait = false;
             if (task.canFinish()) {
               if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) {
@@ -152,7 +163,8 @@ public void run() {
             if (shouldWait) {
               // Another task at a higher priority may have come in during the wait. Lookup the
               // queue again to pick up the task at the highest priority.
-              waitLock.wait();
+              LOG.info("ZZZ: WAIT 2");
+              TaskExecutorService.this.wait();
               continue;
             }
           }
@@ -161,12 +173,15 @@ public void run() {
           if (scheduled) {
             // wait queue could have been re-ordered in the mean time because of concurrent task
             // submission. So remove the specific task instead of the head task.
-            waitQueue.remove(task);
+            synchronized (TaskExecutorService.this) {
+              waitQueue.remove(task);
+            }
           }
 
-          synchronized (waitLock) {
+          synchronized (TaskExecutorService.this) {
             while (waitQueue.isEmpty()) {
-              waitLock.wait();
+              LOG.info("ZZZ: WAIT 3");
+              TaskExecutorService.this.wait();
             }
           }
         }
@@ -194,7 +209,17 @@ public void onFailure(Throwable t) {
 
   @Override
   public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
-    TaskRunnerCallable evictedTask = waitQueue.offer(task);
+    TaskWrapper taskWrapper = new TaskWrapper(task);
+    knownTasks.put(task, taskWrapper);
+    TaskRunnerCallable evictedTask;
+    try {
+      synchronized (this) {
+        evictedTask = waitQueue.offer(task);
+      }
+    } catch (RejectedExecutionException e) {
+      knownTasks.remove(task); // knownTasks is keyed by the callable, not by the wrapper
+      throw e;
+    }
     if (evictedTask == null) {
       if (isInfoEnabled) {
         LOG.info(task.getRequestId() + " added to wait queue.");
@@ -203,10 +228,12 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException
       if (isDebugEnabled) {
         LOG.debug("Wait Queue: {}", waitQueue);
       }
-      synchronized (waitLock) {
-        waitLock.notify();
+      synchronized (this) {
+        notify();
       }
     } else {
+      TaskWrapper evictedWrapper = knownTasks.remove(evictedTask);
+      evictedWrapper.maybeUnregisterForFinishedStateNotifications();
       evictedTask.killTask();
       if (isInfoEnabled) {
         LOG.info(task.getRequestId() + " evicted from wait queue because of low priority");
@@ -217,30 +244,37 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException
 
   private boolean trySchedule(final TaskRunnerCallable task) {
     boolean scheduled = false;
+    TaskWrapper taskWrapper = knownTasks.get(task);
     try {
-      ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
-      FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task);
-      // Callback on a separate thread so that when a task completes, the thread in the main queue
-      // is actually available for execution and will not potentially result in a RejectedExecution
-      Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
+      synchronized (taskWrapper) {
+        boolean canFinish = task.canFinish();
+        boolean stateChanged = !taskWrapper.registerForFinishedStateNotifications(canFinish);
+        ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
+        taskWrapper.setIsQueued(false);
+        FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task);
+        // Callback on a separate thread so that when a task completes, the thread in the main queue
+        // is actually available for execution and will not potentially result in a RejectedExecution
+        Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
 
-      if (isInfoEnabled) {
-        LOG.info(task.getRequestId() + " scheduled for execution.");
-      }
-
-      // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
-      // to the tasks are not ready yet, the task is eligible for pre-emptable.
-      if (enablePreemption && !task.canFinish()) {
         if (isInfoEnabled) {
-          LOG.info(task.getRequestId() + " is not finishable. Adding it to pre-emption queue.");
+          LOG.info("{} scheduled for execution. canFinish={}", task.getRequestId(), canFinish);
         }
-        preemptionQueue.add(task);
-      }
+        // Only tasks that cannot finish are pre-emptable, and only if that state did not change
+        // while registering (register returns true when it is unchanged, hence the negation).
+        if (enablePreemption) {
+          if (!canFinish && !stateChanged) {
+            if (isInfoEnabled) {
+              LOG.info(task.getRequestId() + " is not finishable. Adding it to pre-emption queue.");
+            }
+            taskWrapper.setIsPreemptable(true);
+            preemptionQueue.add(task);
+          }
+        }
+      }
 
       numSlotsAvailable.decrementAndGet();
       scheduled = true;
     } catch (RejectedExecutionException e) {
-
       if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) {
 
         if (isDebugEnabled) {
@@ -248,17 +282,24 @@ private boolean trySchedule(final TaskRunnerCallable task) {
         }
 
         TaskRunnerCallable pRequest = preemptionQueue.remove();
+        // Avoid preempting tasks which are finishable - callback still to be processed.
+        // synchronization not required - with the canFinish check in place which always gets the
+        // latest known status.
         if (pRequest != null) {
-
-          if (isInfoEnabled) {
-            LOG.info("Invoking kill task for {} due to pre-emption to run {}",
-                pRequest.getRequestId(), task.getRequestId());
+          if (pRequest.canFinish()) {
+            LOG.info(
+                "Removed {} from preemption queue, but not preempting since it's now finishable",
+                pRequest.getRequest());
+          } else {
+            if (isInfoEnabled) {
+              LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+                  pRequest.getRequestId(), task.getRequestId());
+            }
+            // The task will either be killed or is already in the process of completing, which will
+            // trigger the next scheduling run, or result in available slots being higher than 0,
+            // which will cause the scheduler loop to continue.
+            pRequest.killTask();
           }
-
-          // The task will either be killed or is already in the process of completing, which will
-          // trigger the next scheduling run, or result in available slots being higher than 0,
-          // which will cause the scheduler loop to continue.
-          pRequest.killTask();
         }
       }
     }
@@ -266,6 +307,38 @@ private boolean trySchedule(final TaskRunnerCallable task) {
     return scheduled;
   }
 
+  /**
+   * Applies a finishable-state change reported for a known task: re-orders the wait queue if the
+   * task is still queued, keeps the pre-emption queue in step with the new state, and wakes up a
+   * thread waiting on this service's monitor.
+   *
+   * @param taskWrapper wrapper of the task whose state changed
+   * @param newFinishableState the new value of the task's finishable state
+   */
+  private synchronized void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
+    boolean queued = taskWrapper.isQueued();
+    if (queued) {
+      // Re-order the wait queue
+      LOG.info("ZZZ: Rejigging waitQueue for {}", taskWrapper.getTaskRunnerCallable().getRequestId());
+      waitQueue.remove(taskWrapper.getTaskRunnerCallable());
+      waitQueue.offer(taskWrapper.getTaskRunnerCallable());
+    }
+
+    if (newFinishableState == true && taskWrapper.isPreemptable()) {
+      LOG.info("ZZZ: Removing from preemptionQueue {}", taskWrapper.getTaskRunnerCallable().getRequestId());
+      preemptionQueue.remove(taskWrapper.getTaskRunnerCallable());
+      // Keep the flag in step with queue membership so a later flip is handled correctly.
+      taskWrapper.setIsPreemptable(false);
+    } else if (newFinishableState == false && !taskWrapper.isPreemptable()
+        && enablePreemption && !queued) {
+      // Only submitted tasks are pre-emption candidates; queued ones are handled by trySchedule.
+      LOG.info("ZZZ: Adding to preemptionQueue {}", taskWrapper.getTaskRunnerCallable().getRequestId());
+      preemptionQueue.offer(taskWrapper.getTaskRunnerCallable());
+      taskWrapper.setIsPreemptable(true);
+    }
+    notify();
+  }
+
   private final class InternalCompletionListener implements
       FutureCallback<TaskRunner2Result> {
     private TaskRunnerCallable task;
@@ -276,12 +349,16 @@ public InternalCompletionListener(TaskRunnerCallable task) {
 
     @Override
     public void onSuccess(TaskRunner2Result result) {
+      TaskWrapper taskWrapper = knownTasks.remove(task);
+      taskWrapper.maybeUnregisterForFinishedStateNotifications();
       task.getCallback().onSuccess(result);
       updatePreemptionListAndNotify(result.getEndReason());
     }
 
     @Override
     public void onFailure(Throwable t) {
+      TaskWrapper taskWrapper = knownTasks.remove(task);
+      taskWrapper.maybeUnregisterForFinishedStateNotifications();
       task.getCallback().onFailure(t);
       updatePreemptionListAndNotify(null);
       LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
@@ -291,17 +368,17 @@ private void updatePreemptionListAndNotify(EndReason reason) {
       // if this task was added to pre-emption list, remove it
       if (enablePreemption) {
         String state = reason == null ? "FAILED" : reason.name();
-        preemptionQueue.remove(task.getRequest());
-        if (isInfoEnabled) {
+        boolean removed = preemptionQueue.remove(task); // queue holds callables, not requests
+        if (removed && isInfoEnabled) {
           LOG.info(TaskRunnerCallable.getTaskIdentifierString(task.getRequest())
               + " request " + state + "! Removed from preemption list.");
         }
       }
 
       numSlotsAvailable.incrementAndGet();
-      if (!waitQueue.isEmpty()) {
-        synchronized (waitLock) {
-          waitLock.notify();
+      synchronized (TaskExecutorService.this) {
+        if (!waitQueue.isEmpty()) {
+          TaskExecutorService.this.notify();
         }
       }
     }
@@ -380,4 +457,86 @@ public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) {
       return 0;
     }
   }
+
+  /**
+   * Bookkeeping for a single task known to this service. Tracks whether the task is still
+   * queued, whether it has been marked pre-emptable, and whether it is subscribed for
+   * finishable-state notifications; state changes are forwarded to the enclosing service.
+   */
+  private class TaskWrapper implements FinishableStateUpdateHandler {
+    private final TaskRunnerCallable taskRunnerCallable;
+    private boolean isQueued = true;
+    private boolean isPreemptable = false;
+    private boolean subscribedForNotifications = false;
+
+    public TaskWrapper(TaskRunnerCallable taskRunnerCallable) {
+      this.taskRunnerCallable = taskRunnerCallable;
+    }
+
+    /**
+     * Subscribes this wrapper for finishable-state updates of its fragment, at most once.
+     *
+     * @param currentFinishableState the finishable state last observed by the caller
+     * @return true if the current state is the same as the currentFinishableState. false if the state has already changed.
+     *         Also true when the wrapper was already subscribed.
+     */
+    public synchronized boolean registerForFinishedStateNotifications(boolean currentFinishableState) {
+      if (!subscribedForNotifications) {
+        subscribedForNotifications = true;
+        return taskRunnerCallable.getFragmentInfo()
+            .registerForFinishableStateUpdates(this, currentFinishableState);
+      } else {
+        return true;
+      }
+    }
+
+    /** Drops the subscription if one is active; does nothing otherwise. */
+    public synchronized void maybeUnregisterForFinishedStateNotifications() {
+      if (subscribedForNotifications) {
+        subscribedForNotifications = false;
+        taskRunnerCallable.getFragmentInfo().unregisterForFinishableStateUpdates(this);
+      }
+    }
+
+    public TaskRunnerCallable getTaskRunnerCallable() {
+      return taskRunnerCallable;
+    }
+
+    public synchronized boolean isQueued() {
+      return isQueued;
+    }
+
+    public synchronized boolean isPreemptable() {
+      return isPreemptable;
+    }
+
+    public synchronized void setIsQueued(boolean isQueued) {
+      this.isQueued = isQueued;
+    }
+
+    public synchronized void setIsPreemptable(boolean isPreemptable) {
+      this.isPreemptable = isPreemptable;
+    }
+
+    @Override
+    public String toString() {
+      return "TaskWrapper{" +
+          "taskRunnerCallable=" + taskRunnerCallable +
+          ", isQueued=" + isQueued +
+          ", isPreemptable=" + isPreemptable +
+          ", subscribedForNotifications=" + subscribedForNotifications +
+          '}';
+    }
+
+    /**
+     * Forwards the update to the enclosing service while holding this wrapper's monitor.
+     * NOTE(review): QueryInfo's tracker invokes this while holding its own lock, whereas
+     * trySchedule registers with the tracker while holding this monitor - confirm lock ordering.
+     */
+    @Override
+    public synchronized void finishableStateUpdated(boolean finishableState) {
+      LOG.info("ZZZ: Received finishableStateUpdate for {} as {}", taskRunnerCallable.getRequestId(), finishableState);
+      TaskExecutorService.this.finishableStateUpdated(this, finishableState);
+    }
+  }
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 2ea39b7..007c83d 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -324,6 +324,10 @@ public String getRequestId() {
     return requestId;
   }
 
+  public QueryFragmentInfo getFragmentInfo() {
+    return fragmentInfo;
+  }
+
   public TaskRunnerCallback getCallback() {
     return new TaskRunnerCallback(request, this);
   }