diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 7838bef..b1240aa 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -782,27 +782,60 @@ private SelectHostResult selectHost(TaskInfo request) { } } } - /* fall through - miss in locality (random scheduling) or no locality-requested */ - Collection instances = activeInstances.getAll(); - ArrayList all = new ArrayList<>(instances.size()); + + /* fall through - miss in locality or no locality-requested */ + Collection instances = activeInstances.getAllInstancesOrdered(true); + ArrayList allNodes = new ArrayList<>(instances.size()); for (ServiceInstance inst : instances) { NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); - if (nodeInfo != null && nodeInfo.canAcceptTask()) { - all.add(nodeInfo); + if (nodeInfo != null) { + allNodes.add(nodeInfo); } } - if (LOG.isDebugEnabled()) { - LOG.debug("Attempting random allocation for task={}", request.task); - } - if (all.isEmpty()) { - return SELECT_HOST_RESULT_DELAYED_RESOURCES; - } - NodeInfo randomNode = all.get(random.nextInt(all.size())); - LOG.info("Assigning " + randomNode.toShortString() - + " when looking for any host, from #hosts=" + all.size() + ", requestedHosts=" - + ((requestedHosts == null || requestedHosts.length == 0) + + if (requestedHosts == null || requestedHosts.length == 0) { + // no locality-requested, iterate the available hosts in consistent order from the beginning + if (LOG.isDebugEnabled()) { + LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task); + } + for (NodeInfo nodeInfo : allNodes) { + if (nodeInfo.canAcceptTask()) { + LOG.info("Assigning " + nodeInfo.toShortString() + + " when looking for any host, from #hosts=" + allNodes.size() + ", requestedHosts=" + + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : Arrays.toString(requestedHosts))); - return new SelectHostResult(randomNode); + return new SelectHostResult(nodeInfo); + } + } + } else { + // miss in locality request, try the next available host that can accept tasks (assume the consistent instances + // list as a ring) from the index of first requested host + final String firstRequestedHost = requestedHosts[0]; + if (LOG.isDebugEnabled()) { + LOG.debug("Locality miss. Attempting to allocate next available host from first requested host({}) for " + + "task={}", firstRequestedHost, request.task); + } + int requestedHostIdx = -1; + for (int i = 0; i < allNodes.size(); i++) { + if (allNodes.get(i).getHost().equals(firstRequestedHost)) { + requestedHostIdx = i; + break; + } + } + + for (int i = 0; i < allNodes.size(); i++) { + NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size()); + if (nodeInfo.canAcceptTask()) { + LOG.info("Assigning " + nodeInfo.toShortString() + + " when looking for first requested host, from #hosts=" + allNodes.size() + ", requestedHosts=" + + ((requestedHosts == null || requestedHosts.length == 0) + ? "null" : Arrays.toString(requestedHosts))); + return new SelectHostResult(nodeInfo); + } + } + } + + return SELECT_HOST_RESULT_DELAYED_RESOURCES; } finally { readLock.unlock(); } diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 85d2bcd..d60635b 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -456,6 +456,62 @@ public void testForcedLocalityUnknownHost() throws IOException, InterruptedExcep } } + @Test(timeout = 10000000) + public void testFallbackAllocationOrderedNext() throws IOException, InterruptedException { + Priority priority1 = Priority.newInstance(1); + + String[] hostsKnown = new String[]{HOST1, HOST2}; + String[] hostsUnknown = new String[]{HOST3}; + String[] noHosts = new String[]{}; + + TestTaskSchedulerServiceWrapper tsWrapper = + new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1l); + try { + Object task1 = "task1"; + Object clientCookie1 = "cookie1"; + + Object task2 = "task2"; + Object clientCookie2 = "cookie2"; + + Object task3 = "task3"; + Object clientCookie3 = "cookie3"; + + tsWrapper.controlScheduler(true); + tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1); + tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2); + tsWrapper.allocateTask(task3, noHosts, priority1, clientCookie3); + + while (true) { + tsWrapper.signalSchedulerRun(); + tsWrapper.awaitSchedulerRun(); + if (tsWrapper.ts.dagStats.numTotalAllocations == 3) { + break; + } + } + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Object.class); + ArgumentCaptor argumentCaptor2 = ArgumentCaptor.forClass(Container.class); + verify(tsWrapper.mockAppCallback, times(3)) + .taskAllocated(argumentCaptor.capture(), any(Object.class), argumentCaptor2.capture()); + assertEquals(3, argumentCaptor.getAllValues().size()); + assertEquals(task1, argumentCaptor.getAllValues().get(0)); + // 1st task provided unknown host location, it should be assigned first host + assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(0).getNodeId().getHost()); + assertEquals(task2, argumentCaptor.getAllValues().get(1)); + // 2nd task provided host1 as location preference, it should be assigned host1 as it has capacity + assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); + assertEquals(task3, argumentCaptor.getAllValues().get(2)); + // 3rd task provided no location preference, it is tried with host1 but it is full, so gets assigned host2 + assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost()); + + assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); + assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations); + assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations); + } finally { + tsWrapper.shutdown(); + } + } + @Test(timeout = 10000) public void testForcedLocalityPreemption() throws IOException, InterruptedException { Priority priority1 = Priority.newInstance(1);