diff --git a/common/src/java/org/apache/hive/http/StackServlet.java b/common/src/java/org/apache/hive/http/StackServlet.java index 610b391..3345466 100644 --- a/common/src/java/org/apache/hive/http/StackServlet.java +++ b/common/src/java/org/apache/hive/http/StackServlet.java @@ -74,7 +74,7 @@ private synchronized void printThreadInfo( Thread.State state = info.getThreadState(); stream.println(" State: " + state); stream.println(" Blocked count: " + info.getBlockedCount()); - stream.println(" Wtaited count: " + info.getWaitedCount()); + stream.println(" Waited count: " + info.getWaitedCount()); if (contention) { stream.println(" Blocked time: " + info.getBlockedTime()); stream.println(" Waited time: " + info.getWaitedTime()); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 97191f8..3f0f301 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -193,6 +193,7 @@ public int compare(Priority o1, Priority o2) { private final LlapTaskSchedulerMetrics metrics; private final JvmPauseMonitor pauseMonitor; + private final Random random = new Random(); public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -804,27 +805,30 @@ private SelectHostResult selectHost(TaskInfo request) { /* fall through - miss in locality or no locality-requested */ Collection instances = activeInstances.getAllInstancesOrdered(true); - ArrayList allNodes = new ArrayList<>(instances.size()); + List allNodes = new ArrayList<>(instances.size()); for (ServiceInstance inst : instances) { NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity()); - if (nodeInfo != null) { + if (nodeInfo != null && nodeInfo.canAcceptTask()) { allNodes.add(nodeInfo); } } + if (allNodes.isEmpty()) { + return SELECT_HOST_RESULT_DELAYED_RESOURCES; + } + if (requestedHosts == null || requestedHosts.length == 0) { // no locality-requested, iterate the available hosts in consistent order from the beginning if (LOG.isDebugEnabled()) { - LOG.debug("No-locality requested. Attempting to allocate next available host for task={}", request.task); + LOG.debug("No-locality requested. Selecting a random host for task={}", request.task); } - for (NodeInfo nodeInfo : allNodes) { - if (nodeInfo.canAcceptTask()) { - LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}", - nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0) - ? "null" : requestedHostsDebugStr)); - return new SelectHostResult(nodeInfo); - } + + NodeInfo randomNode = allNodes.get(random.nextInt(allNodes.size())); + if (LOG.isInfoEnabled()) { + LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts=null", + randomNode.toShortString(), allNodes.size()); } + return new SelectHostResult(randomNode); } else { // miss in locality request, try the next available host that can accept tasks (assume the consistent instances // list as a ring) from the index of first requested host @@ -843,15 +847,13 @@ private SelectHostResult selectHost(TaskInfo request) { for (int i = 0; i < allNodes.size(); i++) { NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size()); - if (nodeInfo.canAcceptTask()) { - if (LOG.isInfoEnabled()) { - LOG.info("Assigning {} when looking for first requested host, from #hosts={}," - + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(), - ((requestedHosts == null || requestedHosts.length == 0) ? "null" : - requestedHostsDebugStr)); - } - return new SelectHostResult(nodeInfo); + if (LOG.isInfoEnabled()) { + LOG.info("Assigning {} when looking for first requested host, from #hosts={}," + + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(), + ((requestedHosts == null || requestedHosts.length == 0) ? "null" : + requestedHostsDebugStr)); } + return new SelectHostResult(nodeInfo); } } diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index d60635b..30a8e43 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -501,8 +502,8 @@ public void testFallbackAllocationOrderedNext() throws IOException, InterruptedE // 2nd task provided host1 as location preference, it should be assigned host1 as it has capacity assertEquals(hostsKnown[0], argumentCaptor2.getAllValues().get(1).getNodeId().getHost()); assertEquals(task3, argumentCaptor.getAllValues().get(2)); - // 3rd task provided no location preference, it is tried with host1 but it is full, so gets assigned host2 - assertEquals(hostsKnown[1], argumentCaptor2.getAllValues().get(2).getNodeId().getHost()); + // 3rd task provided no location preference, it will get random allocation + assertTrue(Arrays.asList(hostsKnown).contains(argumentCaptor2.getAllValues().get(2).getNodeId().getHost())); assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest); assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);