diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 18ce03c..dfba118 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -67,6 +67,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -127,6 +128,8 @@ private final ConcurrentMap knownNodeMap = new ConcurrentHashMap<>(); private final ConcurrentMap pingedNodeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap attemptStartSent = new ConcurrentHashMap<>(); + private final LlapRegistryService serviceRegistry; private volatile QueryIdentifierProto currentQueryIdentifierProto; @@ -338,7 +341,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task // Have to register this up front right now. Otherwise, it's possible for the task to start // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. - getContext().taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); + getContext().taskSubmitted(taskSpec.getTaskAttemptID(), containerId); communicator.sendSubmitWork(requestProto, host, port, new LlapProtocolClientProxy.ExecuteRequestCallback() { @Override @@ -659,6 +662,7 @@ void nodePinged(String hostname, String uniqueId, int port, TezAttemptArray task // Also, we prefer a missed heartbeat over a stuck query in case of discrepancy in ET. if (taskNodeId != null && taskNodeId.equals(uniqueId)) { if (attempts.contains(attemptId)) { + maybeSendFragmentStart(entry.getValue()); getContext().taskAlive(entry.getValue()); } else { error += (attemptId + ", "); @@ -689,6 +693,7 @@ private void resetCurrentDag(int newDagId, String hiveQueryId) { currentHiveQueryId = hiveQueryId; sourceStateTracker.resetState(currentQueryIdentifierProto); nodesForQuery.clear(); + attemptStartSent.clear(); LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagInfo().getName() + ", queryId=" + hiveQueryId); // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which @@ -738,6 +743,13 @@ private ByteBuffer serializeCredentials(Credentials credentials) throws IOExcept return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); } + private void maybeSendFragmentStart(TezTaskAttemptID fragmentId) { + if (fragmentId != null) { + if (attemptStartSent.putIfAbsent(fragmentId, Boolean.TRUE) == null) { + getContext().taskStartedRemotely(fragmentId); + } + } + } protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol { @@ -756,6 +768,7 @@ public boolean canCommit(TezTaskAttemptID taskid) throws IOException { @Override public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException { + maybeSendFragmentStart(request.getCurrentTaskAttemptID()); return tezUmbilical.heartbeat(request); } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 6bedccb..93fb66d 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -65,7 +65,6 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; -import org.apache.hadoop.hive.llap.registry.ServiceRegistry; import org.apache.hadoop.hive.llap.registry.impl.InactiveServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock; @@ -92,6 +91,7 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; +import org.apache.tez.serviceplugins.api.ServicePluginException; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; @@ -102,7 +102,7 @@ private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); - private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator(); + private static final TaskAssignTimeComparator TASK_INFO_COMPARATOR = new TaskAssignTimeComparator(); private final Configuration conf; @@ -129,8 +129,8 @@ public int compare(Priority o1, Priority o2) { // Tracks running and queued tasks. Cleared after a task completes. private final ConcurrentMap knownTasks = new ConcurrentHashMap<>(); - // Tracks tasks which are running. Useful for selecting a task to preempt based on when it started. - private final TreeMap> runningTasks = new TreeMap<>(); + // Tracks tasks which are assigned. Useful for selecting a task to preempt based on when it started. + private final TreeMap> assignedTasks = new TreeMap<>(); // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit. @@ -554,7 +554,7 @@ public void dagComplete() { } } int runningCount = 0; - for (Entry> entry : runningTasks.entrySet()) { + for (Entry> entry : assignedTasks.entrySet()) { if (entry.getValue() != null) { runningCount += entry.getValue().size(); } @@ -643,7 +643,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd return false; } if (taskInfo.containerId == null) { - if (taskInfo.getState() == TaskInfo.State.ASSIGNED) { + if (taskInfo.isAssigned()) { LOG.error("Task: " + task + " assigned, but could not find the corresponding containerId." @@ -722,6 +722,19 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } @Override + public void taskStateUpdated(Object task, SchedulerTaskState state) { + if (state == SchedulerTaskState.STARTED) { + writeLock.lock(); + try { + TaskInfo taskInfo = knownTasks.get(task); + taskInfo.setRunning(clock.getTime()); + } finally { + writeLock.unlock(); + } + } + } + + @Override public Object deallocateContainer(ContainerId containerId) { if (LOG.isDebugEnabled()) { LOG.debug("Ignoring deallocateContainer for containerId: {}", @@ -1053,15 +1066,15 @@ private void removePendingTask(TaskInfo taskInfo) { } } - /* Register a running task into the runningTasks structure */ - private void registerRunningTask(TaskInfo taskInfo) { + /* Register a running task into the assignedTasks structure */ + private void registerTaskAssignment(TaskInfo taskInfo) { writeLock.lock(); try { int priority = taskInfo.priority.getPriority(); - TreeSet tasksAtpriority = runningTasks.get(priority); + TreeSet tasksAtpriority = assignedTasks.get(priority); if (tasksAtpriority == null) { tasksAtpriority = new TreeSet<>(TASK_INFO_COMPARATOR); - runningTasks.put(priority, tasksAtpriority); + assignedTasks.put(priority, tasksAtpriority); } tasksAtpriority.add(taskInfo); if (metrics != null) { @@ -1078,15 +1091,15 @@ private TaskInfo unregisterTask(Object task) { try { TaskInfo taskInfo = knownTasks.remove(task); if (taskInfo != null) { - if (taskInfo.getState() == TaskInfo.State.ASSIGNED) { + if (taskInfo.isAssigned()) { // Remove from the running list. int priority = taskInfo.priority.getPriority(); - Set tasksAtPriority = runningTasks.get(priority); + Set tasksAtPriority = assignedTasks.get(priority); Preconditions.checkState(tasksAtPriority != null, - "runningTasks should contain an entry if the task was in running state. Caused by task: {}", task); + "assignedTasks should contain an entry if the task was in running state. Caused by task: {}", task); tasksAtPriority.remove(taskInfo); if (tasksAtPriority.isEmpty()) { - runningTasks.remove(priority); + assignedTasks.remove(priority); } } } else { @@ -1293,7 +1306,7 @@ private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) { dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks, nodeInfo.getHost()); taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime()); - registerRunningTask(taskInfo); + registerTaskAssignment(taskInfo); nodeInfo.registerTaskScheduled(); } finally { writeLock.unlock(); @@ -1311,7 +1324,7 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten writeLock.lock(); List preemptedTaskList = null; try { - NavigableMap> orderedMap = runningTasks.descendingMap(); + NavigableMap> orderedMap = assignedTasks.descendingMap(); Iterator>> iterator = orderedMap.entrySet().iterator(); int preemptedCount = 0; while (iterator.hasNext() && preemptedCount < numTasksToPreempt) { @@ -1323,6 +1336,9 @@ private void preemptTasks(int forPriority, int numTasksToPreempt, String []poten Iterator taskInfoIterator = entryAtPriority.getValue().iterator(); while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) { TaskInfo taskInfo = taskInfoIterator.next(); + if (!taskInfo.isRunning()) { + continue; + } if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) { // Candidate for preemption. preemptedCount++; @@ -1457,7 +1473,7 @@ public void processEvictedTask(TaskInfo taskInfo) { } public boolean shouldScheduleTask(TaskInfo taskInfo) { - return taskInfo.getState() == TaskInfo.State.PENDING; + return taskInfo.isPending(); } } @@ -1929,7 +1945,7 @@ private void _registerAllocationInHostMap(String host, Map { + private static class TaskAssignTimeComparator implements Comparator { @Override public int compare(TaskInfo o1, TaskInfo o2) { - if (o1.startTime > o2.startTime) { + if (o1.assignedTime > o2.assignedTime) { return -1; - } else if (o1.startTime < o2.startTime) { + } else if (o1.assignedTime < o2.assignedTime) { return 1; } else { // Comparing on time is not sufficient since two may be created at the same time, diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 339f513..044040e 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -57,6 +57,7 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -118,6 +119,67 @@ public void testSimpleNoLocalityAllocation() throws IOException, InterruptedExce } } + @Test(timeout = 10000) + public void testPreemption_StartedAttemptsOnly() throws InterruptedException, IOException { + Priority priority1 = Priority.newInstance(1); + Priority priority2 = Priority.newInstance(2); + Priority priority3 = Priority.newInstance(3); + String [] hosts = new String[] {HOST1}; + TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 2, 0); + try { + + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1); + tsWrapper.allocateTask(task2, hosts, priority3, clientCookie2); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numLocalAllocations == 2) { + break; + } + } + ArgumentCaptor taskCapture = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor containerCaptor = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(2)) + .taskAllocated(taskCapture.capture(), any(Object.class), containerCaptor.capture()); + assertEquals(task1, taskCapture.getAllValues().get(0)); + assertEquals(task2, taskCapture.getAllValues().get(1)); + + verify(tsWrapper.mockAppCallback, times(2)).taskAllocated(any(Object.class), + any(Object.class), any(Container.class)); + assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + + // Now start the higher priority task ("nice" priority levels) + tsWrapper.taskStarted(task1); + + reset(tsWrapper.mockAppCallback); + + tsWrapper.allocateTask(task3, hosts, priority1, clientCookie3); + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) { + break; + } + } + + ArgumentCaptor containerIdArgumentCaptor = ArgumentCaptor.forClass(ContainerId.class); + // Verify that task1 (higher priority than task2) got preempted, since task2 had not yet started. + verify(tsWrapper.mockAppCallback).preemptContainer(containerIdArgumentCaptor.capture()); + assertEquals(containerCaptor.getAllValues().get(0).getId(), containerIdArgumentCaptor.getValue()); + + } finally { + tsWrapper.shutdown(); + } + } @Test(timeout = 10000) public void testPreemption() throws InterruptedException, IOException { @@ -153,6 +215,9 @@ public void testPreemption() throws InterruptedException, IOException { assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations); assertEquals(0, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); + reset(tsWrapper.mockAppCallback); tsWrapper.allocateTask(task4, hosts, priority1, clientCookie4); @@ -681,6 +746,8 @@ public void testForcedLocalityPreemption() throws IOException, InterruptedExcept assertEquals(2, argumentCaptor.getAllValues().size()); assertEquals(task1, argumentCaptor.getAllValues().get(0)); assertEquals(task2, argumentCaptor.getAllValues().get(1)); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); reset(tsWrapper.mockAppCallback); // Allocate t4 at higher priority. t3 should not be allocated, @@ -741,6 +808,8 @@ public void testPreemptionChoiceTimeOrdering() throws IOException, InterruptedEx .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); ContainerId t1Cid = cArgCaptor.getValue().getId(); + tsWrapper.taskStarted(task1); + reset(tsWrapper.mockAppCallback); // Move clock backwards (so that t1 allocation is after t2 allocation) // Request task2 (task1 already started at previously set time) @@ -750,6 +819,7 @@ public void testPreemptionChoiceTimeOrdering() throws IOException, InterruptedEx verify(tsWrapper.mockAppCallback, times(1)) .taskAllocated(argumentCaptor.capture(), any(Object.class), cArgCaptor.capture()); + tsWrapper.taskStarted(task2); reset(tsWrapper.mockAppCallback); // Move clock forward, and request a task at p=1 @@ -820,6 +890,9 @@ public void testForcedLocalityMultiplePreemptionsSameHost1() throws IOException, assertEquals(2, cArgCaptor.getAllValues().size()); ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId(); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); + reset(tsWrapper.mockAppCallback); // At this point. 2 tasks running - both at priority 2. // Try running a priority 1 task @@ -923,6 +996,9 @@ public void testForcedLocalityMultiplePreemptionsSameHost2() throws IOException, assertEquals(2, cArgCaptor.getAllValues().size()); ContainerId t1CId = cArgCaptor.getAllValues().get(0).getId(); + tsWrapper.taskStarted(task1); + tsWrapper.taskStarted(task2); + reset(tsWrapper.mockAppCallback); // At this point. 2 tasks running - both at priority 2. // Try running a priority 1 task @@ -1582,6 +1658,9 @@ void allocateTask(Object task, String[] hosts, Priority priority, Object clientC ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie); } + void taskStarted(Object task) { + ts.taskStateUpdated(task, TaskScheduler.SchedulerTaskState.STARTED); + } void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) { diff --git pom.xml pom.xml index e0aae27..21d134f 100644 --- pom.xml +++ pom.xml @@ -188,7 +188,7 @@ 1.7.10 4.0.4 3.0.0-SNAPSHOT - 0.8.4 + 0.9.0-SNAPSHOT 0.92.0-incubating 2.2.0 2.0.0