diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fa75eb4025c..e3d7f8cbe4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -402,6 +402,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false; + /** + * Maximum number of opportunistic containers to be allocated in + * one AM heartbeat; non-positive values (default -1) mean no limit. + */ + @Unstable + public static final String + OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = + RM_PREFIX + "opportunistic.max.container-allocation.per.am.heartbeat"; + public static final int + DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = -1; + /** Number of nodes to be used by the Opportunistic Container allocator for * dispatching containers during container allocation. 
*/ @Unstable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 1cec3dac11b..e761f8db927 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.scheduler; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; @@ -69,6 +70,8 @@ private static final int RACK_LOCAL_LOOP = 1; private static final int OFF_SWITCH_LOOP = 2; + private int maxAllocationsPerAMHeartbeat = -1; + /** * This class encapsulates application specific parameters used to build a * Container. @@ -290,6 +293,24 @@ public OpportunisticContainerAllocator( this.tokenSecretManager = tokenSecretManager; } + /** + * Create a new Opportunistic Container Allocator. 
+ * @param tokenSecretManager TokenSecretManager + * @param maxAllocationsPerAMHeartbeat max number of containers to be + * allocated in one AM heartbeat + */ + public OpportunisticContainerAllocator( + BaseContainerTokenSecretManager tokenSecretManager, + int maxAllocationsPerAMHeartbeat) { + this.tokenSecretManager = tokenSecretManager; + this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; + } + + @VisibleForTesting + void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) { + this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat; + } + /** * Allocate OPPORTUNISTIC containers. * @param blackList Resource BlackList Request @@ -315,7 +336,6 @@ public OpportunisticContainerAllocator( // Add OPPORTUNISTIC requests to the outstanding ones. opportContext.addToOutstandingReqs(oppResourceReqs); - Set nodeBlackList = new HashSet<>(opportContext.getBlacklist()); List allocatedContainers = new ArrayList<>(); @@ -332,9 +352,18 @@ public OpportunisticContainerAllocator( // might be different than what is requested, which is why // we need the requested capability (key) to match against // the outstanding reqs) + int remAllocs = -1; + if (maxAllocationsPerAMHeartbeat > 0) { + remAllocs = + maxAllocationsPerAMHeartbeat - allocatedContainers.size() + - getTotalAllocations(allocations); + if (remAllocs <= 0) { + break; + } + } Map> allocation = allocate( rmIdentifier, opportContext, schedulerKey, applicationAttemptId, - appSubmitter, nodeBlackList); + appSubmitter, nodeBlackList, remAllocs); if (allocation.size() > 0) { allocations.add(allocation); continueLoop = true; @@ -354,16 +383,40 @@ public OpportunisticContainerAllocator( return allocatedContainers; } + private int getTotalAllocations( + List>> allocations) { + int totalAllocs = 0; + for (Map> allocation : allocations) { + for (List allocs : allocation.values()) { + totalAllocs += allocs.size(); + } + } + return totalAllocs; + } + private Map> allocate(long rmIdentifier, 
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, - ApplicationAttemptId appAttId, String userName, Set blackList) + ApplicationAttemptId appAttId, String userName, Set blackList, + int maxAllocations) throws YarnException { Map> containers = new HashMap<>(); for (EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) { + int remainingAllocs = -1; + if (maxAllocations > 0) { + int totalAllocated = 0; + for (List allocs : containers.values()) { + totalAllocated += allocs.size(); + } + remainingAllocs = maxAllocations - totalAllocated; + if (remainingAllocs <= 0) { + break; + } + } allocateContainersInternal(rmIdentifier, appContext.getAppParams(), appContext.getContainerIdGenerator(), blackList, appAttId, - appContext.getNodeMap(), userName, containers, enrichedAsk); + appContext.getNodeMap(), userName, containers, enrichedAsk, + remainingAllocs); ResourceRequest anyAsk = enrichedAsk.getRequest(); if (!containers.isEmpty()) { LOG.info("Opportunistic allocation requested for [priority={}, " @@ -381,7 +434,7 @@ private void allocateContainersInternal(long rmIdentifier, Set blacklist, ApplicationAttemptId id, Map allNodes, String userName, Map> allocations, - EnrichedResourceRequest enrichedAsk) + EnrichedResourceRequest enrichedAsk, int maxAllocations) throws YarnException { if (allNodes.size() == 0) { LOG.info("No nodes currently available to " + @@ -394,6 +447,9 @@ private void allocateContainersInternal(long rmIdentifier, allocations.get(anyAsk.getCapability()).size()); toAllocate = Math.min(toAllocate, appParams.getMaxAllocationsPerSchedulerKeyPerRound()); + if (maxAllocations >= 0) { + toAllocate = Math.min(maxAllocations, toAllocate); + } int numAllocated = 0; // Node Candidates are selected as follows: // * Node local candidates selected in loop == 0 diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java index 2d3b09914c6..9b601edbff4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -633,4 +633,97 @@ public void testAllocationWithNodeLabels() throws Exception { Assert.assertEquals(1, containers.size()); Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); } + + @Test + public void testMaxAllocationsPerAMHeartbeat() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + allocator.setMaxAllocationsPerAMHeartbeat(2); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 3, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h6", + Resources.createResource(1 * GB), 3, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r3", + Resources.createResource(1 * GB), 3, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), 
"h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(1, containers.size()); + } + + @Test + public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + allocator.setMaxAllocationsPerAMHeartbeat(2); + List reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(2), "h6", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(3), "/r3", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List 
containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(2, containers.size()); + containers = allocator.allocateContainers( + blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(1, containers.size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 1ed1fdaabdb..89e3b478d1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -473,10 +473,14 @@ protected void serviceInit(Configuration conf) throws Exception { .getContainersMonitor(), this.aclsManager, dirsHandler); addService(webServer); ((NMContext) context).setWebServer(webServer); - + int maxAllocationsPerAMHeartbeat = conf.getInt( + YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, + YarnConfiguration. 
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); ((NMContext) context).setQueueableContainerAllocator( new OpportunisticContainerAllocator( - context.getContainerTokenSecretManager())); + context.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat)); dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index f6d23f71b7d..e8cd87fd1e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -225,8 +225,13 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext, YarnScheduler scheduler) { super(OpportunisticContainerAllocatorAMService.class.getName(), rmContext, scheduler); + int maxAllocationsPerAMHeartbeat = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT, + YarnConfiguration. 
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT); this.oppContainerAllocator = new OpportunisticContainerAllocator( - rmContext.getContainerTokenSecretManager()); + rmContext.getContainerTokenSecretManager(), + maxAllocationsPerAMHeartbeat); this.k = rmContext.getYarnConfiguration().getInt( YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);