diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 1cec3dac11b..f00b1ffbb98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -317,6 +317,7 @@ public OpportunisticContainerAllocator( opportContext.addToOutstandingReqs(oppResourceReqs); Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); + Set allocatedNodes = new HashSet<>(); List allocatedContainers = new ArrayList<>(); // Satisfy the outstanding OPPORTUNISTIC requests. @@ -334,7 +335,7 @@ public OpportunisticContainerAllocator( // the outstanding reqs) Map> allocation = allocate( rmIdentifier, opportContext, schedulerKey, applicationAttemptId, - appSubmitter, nodeBlackList); + appSubmitter, nodeBlackList, allocatedNodes); if (allocation.size() > 0) { allocations.add(allocation); continueLoop = true; @@ -356,14 +357,15 @@ public OpportunisticContainerAllocator( private Map> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, - ApplicationAttemptId appAttId, String userName, Set blackList) + ApplicationAttemptId appAttId, String userName, Set blackList, + Set allocatedNodes) throws YarnException { Map> containers = new HashMap<>(); for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) { allocateContainersInternal(rmIdentifier, appContext.getAppParams(), - appContext.getContainerIdGenerator(), blackList, appAttId, - appContext.getNodeMap(), userName, containers, enrichedAsk); + appContext.getContainerIdGenerator(), blackList, allocatedNodes, + appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk); ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " @@ -378,9 +380,9 @@ public OpportunisticContainerAllocator( private void allocateContainersInternal(long rmIdentifier, AllocationParams appParams, ContainerIdGenerator idCounter, - Set blacklist, ApplicationAttemptId id, - Map allNodes, String userName, - Map> allocations, + Set blacklist, Set allocatedNodes, + ApplicationAttemptId id, Map allNodes, + String userName, Map> allocations, EnrichedResourceRequest enrichedAsk) throws YarnException { if (allNodes.size() == 0) { @@ -405,7 +407,8 @@ private void allocateContainersInternal(long rmIdentifier, } while (numAllocated < toAllocate) { Collection nodeCandidates = - findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk); + findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, + enrichedAsk); for (RemoteNode rNode : nodeCandidates) { String rNodeHost = rNode.getNodeId().getHost(); // Ignore black list @@ -422,6 +425,11 @@ private void allocateContainersInternal(long rmIdentifier, continue; } } + else if (allocatedNodes.contains(rNodeHost)) { + LOG.info("Opportunistic container has already been allocated on " + + " [" + rNodeHost + "].."); + continue; + } if (loopIndex == RACK_LOCAL_LOOP) { if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) { location = rNode.getRackName(); @@ -433,11 +441,7 @@ private void allocateContainersInternal(long rmIdentifier, idCounter, id, userName, allocations, location, anyAsk, rNode); numAllocated++; - // Try to spread the allocations across the nodes. - // But don't add if it is a node local request. - if (loopIndex != NODE_LOCAL_LOOP) { - blacklist.add(rNode.getNodeId().getHost()); - } + allocatedNodes.add(rNodeHost); LOG.info("Allocated [" + container.getId() + "] as opportunistic at " + "location [" + location + "]"); if (numAllocated >= toAllocate) { @@ -461,7 +465,7 @@ private void allocateContainersInternal(long rmIdentifier, private Collection findNodeCandidates(int loopIndex, Map allNodes, Set blackList, - EnrichedResourceRequest enrichedRR) { + Set allocatedNodes, EnrichedResourceRequest enrichedRR) { LinkedList retList = new LinkedList<>(); String partition = getRequestPartition(enrichedRR); if (loopIndex > 1) { @@ -481,8 +485,9 @@ private void allocateContainersInternal(long rmIdentifier, allNodes, enrichedRR, retList, numContainers); } else { // Rack local candidates - numContainers = collectRackLocalCandidates( - allNodes, enrichedRR, retList, blackList, numContainers); + numContainers = + collectRackLocalCandidates(allNodes, enrichedRR, retList, + blackList, allocatedNodes, numContainers); } if (numContainers == enrichedRR.getRequest().getNumContainers()) { // If there is no change in numContainers, then there is no point @@ -496,12 +501,16 @@ private void allocateContainersInternal(long rmIdentifier, private int collectRackLocalCandidates(Map allNodes, EnrichedResourceRequest enrichedRR, LinkedList retList, - Set blackList, int numContainers) { + Set blackList, Set allocatedNodes, int numContainers) { String partition = getRequestPartition(enrichedRR); for (RemoteNode rNode : allNodes.values()) { if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) && enrichedRR.getRackLocations().contains(rNode.getRackName())) { - if (blackList.contains(rNode.getNodeId().getHost())) { + String rHost = rNode.getNodeId().getHost(); + if (blackList.contains(rHost)) { + continue; + } + if (allocatedNodes.contains(rHost)) { retList.addLast(rNode); } else { retList.addFirst(rNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 2d3b09914c6..30f02edc6b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -193,36 +193,42 @@ public void testNodeLocalAllocation() throws Exception { List reqs = Arrays.asList( ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) .priority(Priority.newInstance(1)) .resourceName("/r1") .capability(Resources.createResource(1 * GB)) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), - ResourceRequest.newBuilder().allocationRequestId(1) + ResourceRequest.newBuilder().allocationRequestId(2) .priority(Priority.newInstance(1)) .resourceName("h1") .capability(Resources.createResource(1 * GB)) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), - ResourceRequest.newBuilder().allocationRequestId(1) + ResourceRequest.newBuilder().allocationRequestId(2) .priority(Priority.newInstance(1)) .resourceName(ResourceRequest.ANY) .capability(Resources.createResource(1 * GB)) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), - ResourceRequest.newBuilder().allocationRequestId(2) + ResourceRequest.newBuilder().allocationRequestId(3) .priority(Priority.newInstance(1)) .resourceName("/r1") .capability(Resources.createResource(1 * GB)) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), - ResourceRequest.newBuilder().allocationRequestId(2) + ResourceRequest.newBuilder().allocationRequestId(3) .priority(Priority.newInstance(1)) .resourceName("h1") .capability(Resources.createResource(1 * GB)) .relaxLocality(true) .executionType(ExecutionType.OPPORTUNISTIC).build(), - ResourceRequest.newBuilder().allocationRequestId(2) + ResourceRequest.newBuilder().allocationRequestId(3) .priority(Priority.newInstance(1)) .resourceName(ResourceRequest.ANY) .capability(Resources.createResource(1 * GB)) @@ -243,14 +249,12 @@ public void testNodeLocalAllocation() throws Exception { List containers = allocator.allocateContainers( blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); System.out.println(containers); - Set allocatedHosts = new HashSet<>(); for (Container c : containers) { - allocatedHosts.add(c.getNodeHttpAddress()); + if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) { + Assert.assertEquals("h1:1234", c.getNodeHttpAddress()); + } } - Assert.assertEquals(2, containers.size()); - Assert.assertTrue(allocatedHosts.contains("h1:1234")); - Assert.assertFalse(allocatedHosts.contains("h2:1234")); - Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertEquals(3, containers.size()); } @Test