diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 36a2d6bf49..d81ea5870c 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -395,7 +395,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task Map additionalResources, Credentials credentials, boolean credentialsChanged, - int priority) { + int priority, Object taskSchedulingInfo) { super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority); int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); @@ -449,8 +449,10 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task } SubmitWorkRequestProto requestProto; + /* taskSchedulingInfo is an untyped Object: if it is null or not an LlapTaskSchedulingInfo, fall back to "not guaranteed" instead of throwing. */ + boolean isGuaranteed = taskSchedulingInfo instanceof LlapTaskSchedulingInfo && ((LlapTaskSchedulingInfo) taskSchedulingInfo).isInitialGuaranteed(); try { - requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo, currentHiveQueryId); + requestProto = constructSubmitWorkRequest(containerId, taskSpec, fragmentRuntimeInfo, currentHiveQueryId, isGuaranteed); } catch (IOException e) { throw new RuntimeException("Failed to construct request", e); } @@ -839,7 +841,7 @@ private String extractQueryIdFromContext() { private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec, FragmentRuntimeInfo fragmentRuntimeInfo, - String hiveQueryId) throws + String hiveQueryId, boolean isGuaranteed) throws IOException { SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); @@ -858,10 +860,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI taskSpec, currentQueryIdentifierProto, 
getTokenIdentifier(), user, hiveQueryId)).build()); // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments builder.setFragmentRuntimeInfo(fragmentRuntimeInfo); - if (scheduler != null) { // May be null in tests - // TODO: see javadoc - builder.setIsGuaranteed(scheduler.isInitialGuaranteed(taskSpec.getTaskAttemptID())); - } + builder.setIsGuaranteed(isGuaranteed); return builder.build(); } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 9cb8bc9449..24c045eb2c 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -45,6 +45,7 @@ import java.util.NavigableMap; import java.util.Random; import java.util.Set; +import java.util.Stack; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.Callable; @@ -130,7 +131,10 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import javax.annotation.Nullable; + public class LlapTaskSchedulerService extends TaskScheduler { + private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); private static final Logger WM_LOG = LoggerFactory.getLogger("GuaranteedTasks"); private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); @@ -1331,32 +1335,6 @@ public void notifyStarted(TezTaskAttemptID attemptId) { handleUpdateResult(info, true); } - /** - * A hacky way for communicator and scheduler to share per-task info. Scheduler should be able - * to include this with task allocation to be passed to the communicator, instead. TEZ-3866. - * @param attemptId Task attempt ID. - * @return The initial value of the guaranteed flag to send with the task. 
- */ - boolean isInitialGuaranteed(TezTaskAttemptID attemptId) { - TaskInfo info = null; - readLock.lock(); - try { - info = tasksById.get(attemptId); - } finally { - readLock.unlock(); - } - if (info == null) { - WM_LOG.warn("Status requested for an unknown task " + attemptId); - return false; - } - synchronized (info) { - if (info.isGuaranteed == null) return false; // TODO: should never happen? - assert info.lastSetGuaranteed == null; - info.requestedValue = info.isGuaranteed; - return info.isGuaranteed; - } - } - // Must be called under the epic lock. private TaskInfo distributeGuaranteedOnTaskCompletion() { List toUpdate = new ArrayList<>(1); @@ -1725,9 +1703,13 @@ private void removePendingTask(TaskInfo taskInfo) { } } - /* Register a running task into the runningTasks structure */ + /** + * Registers a running task into the runningTasks structure. + * @param taskInfo the task to register as running + * @return whether the task is initially guaranteed + */ @VisibleForTesting - protected void registerRunningTask(TaskInfo taskInfo) { + protected boolean registerRunningTask(TaskInfo taskInfo) { boolean isGuaranteed = false; synchronized (taskInfo) { assert !taskInfo.isPendingUpdate; @@ -1749,6 +1731,7 @@ protected void registerRunningTask(TaskInfo taskInfo) { if (metrics != null) { metrics.decrPendingTasksCount(); } + return isGuaranteed; } finally { writeLock.unlock(); } @@ -2040,6 +2023,7 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource, nodeInfo.getServiceAddress()); writeLock.lock(); // While updating local structures // Note: this is actually called under the epic writeLock in schedulePendingTasks + boolean isGuaranteed; try { // The canAccept part of this log message does not account for this allocation. 
assignedTaskCounter.incrementAndGet(); @@ -2049,12 +2033,12 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource, dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nodeInfo.getHost()); taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime()); - registerRunningTask(taskInfo); + isGuaranteed = registerRunningTask(taskInfo); nodeInfo.registerTaskScheduled(); } finally { writeLock.unlock(); } - getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container); + getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container, new LlapTaskSchedulingInfo(isGuaranteed)); return selectHostResult.scheduleResult; } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulingInfo.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulingInfo.java new file mode 100644 index 0000000000..c633a47eac --- /dev/null +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulingInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.tezplugins; + +/** + * This class is used to pass information from LLAP scheduler to + * LLAP communicator. 
+ */ +public class LlapTaskSchedulingInfo { + + private final boolean initialGuaranteed; + + public LlapTaskSchedulingInfo(boolean initialGuaranteed) { + this.initialGuaranteed = initialGuaranteed; + } + + public boolean isInitialGuaranteed() { + return initialGuaranteed; + } +} diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 707012399b..109f3cae2b 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -2157,9 +2157,10 @@ public LlapTaskSchedulerServiceForTest( } @Override - protected void registerRunningTask(TaskInfo taskInfo) { - super.registerRunningTask(taskInfo); + protected boolean registerRunningTask(TaskInfo taskInfo) { + boolean isGuaranteed = super.registerRunningTask(taskInfo); notifyStarted(taskInfo.getAttemptId()); // Do this here; normally communicator does this. + return isGuaranteed; } @Override