diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index dc10f22bf9..21ff31d40d 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -75,6 +75,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -135,6 +136,8 @@ private final ConcurrentMap knownNodeMap = new ConcurrentHashMap<>(); private final ConcurrentMap pingedNodeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap attemptStartSent = new ConcurrentHashMap<>(); + private final LlapRegistryService serviceRegistry; private volatile QueryIdentifierProto currentQueryIdentifierProto; @@ -457,7 +460,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task // Have to register this up front right now. Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. - getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); + getContext().taskSubmitted(taskSpec.getTaskAttemptID(), containerId); communicator.sendSubmitWork(requestProto, host, port, new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override @@ -574,6 +577,13 @@ public void indicateError(Throwable t) { } } + private void maybeSendFragmentStart(TezTaskAttemptID fragmentId) { + if (fragmentId != null) { + if (attemptStartSent.putIfAbsent(fragmentId, Boolean.TRUE) == null) { + getContext().taskStartedRemotely(fragmentId); + } + } + } @@ -816,6 +826,7 @@ private void resetCurrentDag(int newDagId, String hiveQueryId) { currentHiveQueryId = hiveQueryId; sourceStateTracker.resetState(currentQueryIdentifierProto); nodesForQuery.clear(); + attemptStartSent.clear(); LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagInfo().getName() + ", queryId=" + hiveQueryId); // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which @@ -884,6 +895,7 @@ public boolean canCommit(TezTaskAttemptID taskid) throws IOException { @Override public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException { + maybeSendFragmentStart(request.getCurrentTaskAttemptID()); return tezUmbilical.heartbeat(request); } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cdf767f1db..8602b88467 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -131,14 +131,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); private static final Logger WM_LOG = LoggerFactory.getLogger("GuaranteedTasks"); - private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); + private static final TaskAssignTimeComparator TASK_INFO_COMPARATOR = new TaskAssignTimeComparator(); - private final static Comparator PRIORITY_COMPARATOR = new Comparator() { - @Override - public int compare(Priority o1, Priority o2) { - return o1.getPriority() - o2.getPriority(); - } - }; + private final static Comparator PRIORITY_COMPARATOR = Comparator.comparingInt(Priority::getPriority); private final UpdateOperationCallback UPDATE_CALLBACK = new UpdateOperationCallback(); private final class UpdateOperationCallback implements OperationCallback { @@ -542,7 +537,7 @@ public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock } private static String serializeToken(Token token) { - byte[] bytes = null; + byte[] bytes; try { ByteArrayDataOutput out = ByteStreams.newDataOutput(); token.write(out); @@ -628,7 +623,7 @@ void updateGuaranteedCount(int newTotalGuaranteed) { @VisibleForTesting protected void checkAndSendGuaranteedStateUpdate(TaskInfo ti) { - boolean newState = false; + boolean newState; synchronized (ti) { assert ti.isPendingUpdate; if ((ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == ti.isGuaranteed) @@ -748,7 +743,7 @@ protected void handleUpdateResult(TaskInfo ti, boolean isOk) { } // Now try to pick another task to update - or potentially the same task. - int count = 0; + int count; if (newStateAnyTask) { count = distributeGuaranteed(1, ti, toUpdate); } else { @@ -776,17 +771,14 @@ public void start() throws IOException { } writeLock.lock(); try { - scheduledLoggingExecutor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - readLock.lock(); - try { - if (dagRunning) { - LOG.info("Stats for current dag: {}", dagStats); - } - } finally { - readLock.unlock(); + scheduledLoggingExecutor.scheduleAtFixedRate(() -> { + readLock.lock(); + try { + if (dagRunning) { + LOG.info("Stats for current dag: {}", dagStats); } + } finally { + readLock.unlock(); } }, 0, 10000L, TimeUnit.MILLISECONDS); @@ -867,8 +859,7 @@ private void startTimeoutMonitor() { // If timer is null, start a new one. // If timer has completed during previous invocation, start a new one. // If timer already started and is not completed, leaving it running without resetting it. - if ((timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone())) - && activeInstances.size() == 0) { + if ((timeoutFuture == null || timeoutFuture.isDone()) && activeInstances.size() == 0) { timeoutFuture = timeoutExecutor.schedule(timeoutMonitor, timeout, TimeUnit.MILLISECONDS); timeoutFutureRef.set(timeoutFuture); LOG.info("Scheduled timeout monitor task to run after {} ms", timeout); @@ -929,9 +920,7 @@ public void shutdown() { } schedulerExecutor.shutdownNow(); - if (registry != null) { - registry.stop(); - } + registry.stop(); if (amRegistry != null) { amRegistry.stop(); } @@ -1043,11 +1032,7 @@ public void dagComplete() { int runningCount = 0; // We don't send messages to pending tasks with the flags; they should be killed elsewhere. for (Entry> entry : guaranteedTasks.entrySet()) { - TreeSet set = speculativeTasks.get(entry.getKey()); - if (set == null) { - set = new TreeSet<>(); - speculativeTasks.put(entry.getKey(), set); - } + TreeSet set = speculativeTasks.computeIfAbsent(entry.getKey(), k -> new TreeSet<>()); for (TaskInfo info : entry.getValue()) { synchronized (info) { info.isGuaranteed = false; @@ -1205,7 +1190,7 @@ public boolean deallocateTask( // Do not put the unused duck back; we'd run the tasks below, then assign it by priority. // NOTE: this method MUST call distributeGuaranteedOnTaskCompletion before exiting. if (taskInfo.containerId == null) { - if (taskInfo.getState() == TaskInfo.State.ASSIGNED) { + if (taskInfo.isAssigned()) { LOG.error("Task: " + task + " assigned, but could not find the corresponding containerId." @@ -1273,8 +1258,7 @@ public boolean deallocateTask( " this was a temporary communication failure", task, nodeInfo.toShortString()); } - boolean commFailure = - endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; + boolean commFailure = endReason == TaskAttemptEndReason.COMMUNICATION_ERROR; disableNode(nodeInfo, commFailure); } } @@ -1300,7 +1284,7 @@ public boolean deallocateTask( } public void notifyStarted(TezTaskAttemptID attemptId) { - TaskInfo info = null; + TaskInfo info; writeLock.lock(); try { info = tasksById.get(attemptId); @@ -1357,6 +1341,19 @@ private TaskInfo distributeGuaranteedOnTaskCompletion() { return toUpdate.get(0); } + @Override + public void taskStateUpdated(Object task, SchedulerTaskState state) { + if (state == SchedulerTaskState.STARTED) { + writeLock.lock(); + try { + TaskInfo taskInfo = knownTasks.get(task); + taskInfo.setRunning(clock.getTime()); + } finally { + writeLock.unlock(); + } + } + } + @Override public Object deallocateContainer(ContainerId containerId) { if (LOG.isDebugEnabled()) { @@ -1381,7 +1378,7 @@ public boolean hasUnregistered() { /** * @param request the list of preferred hosts. null implies any host - * @return + * @return Selected Host Results. */ private SelectHostResult selectHost(TaskInfo request) { String[] requestedHosts = request.requestedHosts; @@ -1667,11 +1664,7 @@ private static NodeReport constructNodeReport(LlapServiceInstance serviceInstanc private void addPendingTask(TaskInfo taskInfo) { writeLock.lock(); try { - List tasksAtPriority = pendingTasks.get(taskInfo.priority); - if (tasksAtPriority == null) { - tasksAtPriority = new LinkedList<>(); - pendingTasks.put(taskInfo.priority, tasksAtPriority); - } + List tasksAtPriority = pendingTasks.computeIfAbsent(taskInfo.priority, k -> new LinkedList<>()); // Delayed tasks will not kick in right now. That will happen in the scheduling loop. tasksAtPriority.add(taskInfo); knownTasks.putIfAbsent(taskInfo.task, taskInfo); @@ -1702,9 +1695,9 @@ private void removePendingTask(TaskInfo taskInfo) { } } - /* Register a running task into the runningTasks structure */ + /* Register a running task into the assignedTasks structure */ @VisibleForTesting - protected void registerRunningTask(TaskInfo taskInfo) { + protected void registerTaskAssignment(TaskInfo taskInfo) { boolean isGuaranteed = false; synchronized (taskInfo) { assert !taskInfo.isPendingUpdate; @@ -1745,11 +1738,11 @@ private TaskInfo unregisterTask(Object task) { if (taskInfo != null) { tasksById.remove(taskInfo.attemptId); WM_LOG.info("Unregistering " + taskInfo.attemptId + "; " + taskInfo.isGuaranteed); - if (taskInfo.getState() == TaskInfo.State.ASSIGNED) { + if (taskInfo.isAssigned()) { // Remove from the running list. if (!removeFromRunningTaskMap(speculativeTasks, task, taskInfo) && !removeFromRunningTaskMap(guaranteedTasks, task, taskInfo)) { - Preconditions.checkState(false, "runningTasks should contain an entry if the task" + + Preconditions.checkState(false, "assignedTasks should contain an entry if the task" + " was in running state. Caused by task: {}", task); } } @@ -1763,24 +1756,22 @@ private TaskInfo unregisterTask(Object task) { } private static void addToRunningTasksMap( - TreeMap> runningTasks, TaskInfo taskInfo) { + TreeMap> assignedTasks, TaskInfo taskInfo) { int priority = taskInfo.priority.getPriority(); - TreeSet tasksAtpriority = runningTasks.get(priority); - if (tasksAtpriority == null) { - tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR); - runningTasks.put(priority, tasksAtpriority); - } + TreeSet + tasksAtpriority = + assignedTasks.computeIfAbsent(priority, k -> new TreeSet<>(TASK_INFO_COMPARATOR)); tasksAtpriority.add(taskInfo); } - private static boolean removeFromRunningTaskMap(TreeMap> runningTasks, + private static boolean removeFromRunningTaskMap(TreeMap> assignedTasks, Object task, TaskInfo taskInfo) { int priority = taskInfo.priority.getPriority(); - Set tasksAtPriority = runningTasks.get(priority); + Set tasksAtPriority = assignedTasks.get(priority); if (tasksAtPriority == null) return false; boolean result = tasksAtPriority.remove(taskInfo); if (tasksAtPriority.isEmpty()) { - runningTasks.remove(priority); + assignedTasks.remove(priority); } return result; } @@ -2026,7 +2017,7 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource, dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nodeInfo.getHost()); taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime()); - registerRunningTask(taskInfo); + registerTaskAssignment(taskInfo); nodeInfo.registerTaskScheduled(); } finally { writeLock.unlock(); @@ -2070,10 +2061,10 @@ private void preemptTasks( // preempted task. } - private List preemptTasksFromMap(TreeMap> runningTasks, + private List preemptTasksFromMap(TreeMap> assignedTasks, int forPriority, int forVertex, int numTasksToPreempt, String[] potentialHosts, Set preemptHosts, List preemptedTaskList) { - NavigableMap> orderedMap = runningTasks.descendingMap(); + NavigableMap> orderedMap = assignedTasks.descendingMap(); Iterator>> iterator = orderedMap.entrySet().iterator(); int preemptedCount = 0; while (iterator.hasNext() && preemptedCount < numTasksToPreempt) { @@ -2085,6 +2076,9 @@ private void preemptTasks( Iterator taskInfoIterator = entryAtPriority.getValue().iterator(); while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { TaskInfo taskInfo = taskInfoIterator.next(); + if (!taskInfo.isRunning()) { + continue; + } if (preemptHosts != null && !preemptHosts.contains(taskInfo.assignedNode.getHost())) { continue; // Not the right host. } @@ -2343,7 +2337,7 @@ public void processEvictedTask(TaskInfo taskInfo) { } public boolean shouldScheduleTask(TaskInfo taskInfo) { - return taskInfo.getState() == TaskInfo.State.PENDING; + return taskInfo.isPending(); } } @@ -2490,7 +2484,6 @@ public void shutdown() { // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc. private boolean disabled = false; - private int numPreemptedTasks = 0; private int numScheduledTasks = 0; private final int numSchedulableTasks; private final LlapTaskSchedulerMetrics metrics; @@ -2625,7 +2618,6 @@ void registerUnsuccessfulTaskEnd(boolean wasPreempted) { metrics.incrSchedulableTasksCount(); } if (wasPreempted) { - numPreemptedTasks++; if (metrics != null) { metrics.incrPreemptedTasksCount(); } @@ -2826,7 +2818,7 @@ private void _registerAllocationInHostMap(String host, Map { + private static class TaskAssignTimeComparator implements Comparator { @Override public int compare(TaskInfo o1, TaskInfo o2) { - if (o1.startTime > o2.startTime) { + if (o1.assignedTime > o2.assignedTime) { return -1; - } else if (o1.startTime < o2.startTime) { + } else if (o1.assignedTime < o2.assignedTime) { return 1; } else { // Comparing on time is not sufficient since two may be created at the same time, diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 46007559cd..ae35a2d11d 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -66,6 +66,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -600,6 +601,67 @@ public void testSimpleNoLocalityAllocation() throws IOException, InterruptedExce } } + @Test(timeout = 10000) + public void testPreemption_StartedAttemptsOnly() throws InterruptedException, IOException { + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + Priority priority3 = Priority.newInstance(3); + String [] hosts = new String[] {HOST1}; + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 2, 0); + try { + + TezTaskAttemptID task1 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie1 = "cookie1"; + TezTaskAttemptID task2 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie2 = "cookie2"; + TezTaskAttemptID task3 = TestTaskSchedulerServiceWrapper.generateTaskAttemptId(); + Object clientCookie3 = "cookie3"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hosts, priority3, clientCookie2); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { + break; + } + } + ArgumentCaptor taskCapture = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(taskCapture.capture(), any(Object.class), containerCaptor.capture()); + assertEquals(task1, taskCapture.getAllValues().get(0)); + assertEquals(task2, taskCapture.getAllValues().get(1)); + + verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), + any(Object.class), any(Container.class)); + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + + // Now start the higher priority task ("nice" priority levels) + tsWrapper.taskStarted(task1); + + reset(tsWrapper.mockAppCallback); + + tsWrapper.allocateTask(task3, hosts, priority1, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + + ArgumentCaptor containerIdArgumentCaptor = ArgumentCaptor.forClass(ContainerId.class); + // Verify that task1 (higher priority than task2) got preempted, since task2 had not yet started. + verify(tsWrapper.mockAppCallback).preemptContainer(containerIdArgumentCaptor.capture()); + assertEquals(containerCaptor.getAllValues().get(0).getId(), containerIdArgumentCaptor.getValue()); + + } finally { + tsWrapper.shutdown(); + } + } @Test(timeout = 10000) public void testPreemption() throws InterruptedException, IOException { @@ -635,6 +697,9 @@ public void testPreemption() throws InterruptedException, IOException { assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); + reset(tsWrapper.mockAppCallback); tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); @@ -1163,6 +1228,8 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept assertEquals(2, argumentCaptor.getAllValues().size()); assertEquals(task1, argumentCaptor.getAllValues().get(0)); assertEquals(task2, argumentCaptor.getAllValues().get(1)); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); reset(tsWrapper.mockAppCallback); // Allocate t4 at higher priority. t3 should not be allocated, @@ -1223,6 +1290,8 @@ public void testPreemptionChoiceTimeOrdering() throws IOException, InterruptedEx .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); ContainerId t1Cid = cArgCaptor.getValue().getId(); + tsWrapper.taskStarted(task1); + reset(tsWrapper.mockAppCallback); // Move clock backwards (so that t1 allocation is after t2 allocation) // Request task2 (task1 already started at previously set time) @@ -1232,6 +1301,7 @@ public void testPreemptionChoiceTimeOrdering() throws IOException, InterruptedEx verify(tsWrapper.mockAppCallback, times(1)) .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); + tsWrapper.taskStarted(task2); reset(tsWrapper.mockAppCallback); // Move clock forward, and request a task at p=1 @@ -1302,6 +1372,9 @@ public void testForcedLocalityMultiplePreemptionsSameHost1() throws IOException, assertEquals(2, cArgCaptor.getAllValues().size()); ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId(); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); + reset(tsWrapper.mockAppCallback); // At this point. 2 tasks running - both at priority 2. // Try running a priority 1 task @@ -1405,6 +1478,9 @@ public void testForcedLocalityMultiplePreemptionsSameHost2() throws IOException, assertEquals(2, cArgCaptor.getAllValues().size()); ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId(); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); + reset(tsWrapper.mockAppCallback); // At this point. 2 tasks running - both at priority 2. // Try running a priority 1 task @@ -2073,6 +2149,9 @@ public static TezTaskAttemptID generateTaskAttemptId() { return TezTaskAttemptID.getInstance(TezTaskID.getInstance(VERTEX_ID, taskId), 0); } + void taskStarted(Object task) { + ts.taskStateUpdated(task, TaskScheduler.SchedulerTaskState.STARTED); + } void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { ts.deallocateTask(task, succeeded, endReason, null); @@ -2156,12 +2235,6 @@ public LlapTaskSchedulerServiceForTest( super(appClient, clock, false); } - @Override - protected void registerRunningTask(TaskInfo taskInfo) { - super.registerRunningTask(taskInfo); - notifyStarted(taskInfo.getAttemptId()); // Do this here; normally communicator does this. - } - @Override protected void checkAndSendGuaranteedStateUpdate(TaskInfo ti) { // A test-specific delay just before the check happens.