diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java index b9b4b02..afb6f59 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java @@ -111,6 +111,7 @@ public static void setup() throws Exception { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setBoolean( YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); @@ -229,6 +230,9 @@ public void testAMRMClient() throws YarnException, IOException { amClient.registerApplicationMaster("Host", 10000, ""); + testOpportunisticAllocation( + (AMRMClientImpl) amClient); + testAllocation((AMRMClientImpl)amClient); amClient @@ -247,7 +251,6 @@ private void testAllocation( final AMRMClientImpl amClient) throws YarnException, IOException { // setup container request - assertEquals(0, amClient.ask.size()); assertEquals(0, amClient.release.size()); @@ -388,6 +391,119 @@ private void testAllocation( assertEquals(0, amClient.release.size()); } + /** + * Tests allocation with requests comprising only opportunistic containers. 
+ */ + private void testOpportunisticAllocation( + final AMRMClientImpl amClient) + throws YarnException, IOException { + // setup container request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + amClient.addContainerRequest( + new AMRMClient.ContainerRequest(capability, null, null, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + + int oppContainersRequestedAny = + amClient.getTable(0).get(priority, ResourceRequest.ANY, + ExecutionType.OPPORTUNISTIC, capability).remoteRequest + .getNumContainers(); + + assertEquals(2, oppContainersRequestedAny); + + assertEquals(1, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // RM should allocate container within 2 calls to allocate() + int allocatedContainerCount = 0; + int iterationsLeft = 10; + Set releases = new TreeSet<>(); + + amClient.getNMTokenCache().clearCache(); + Assert.assertEquals(0, + amClient.getNMTokenCache().numberOfTokensInCache()); + HashMap receivedNMTokens = new HashMap<>(); + + while (allocatedContainerCount < oppContainersRequestedAny + && iterationsLeft-- > 0) { + AllocateResponse allocResponse = amClient.allocate(0.1f); + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + for (Container container : allocResponse.getAllocatedContainers()) { + allocatedContainerCount++; + ContainerId rejectContainerId = container.getId(); + releases.add(rejectContainerId); + } + + for (NMToken token : allocResponse.getNMTokens()) { + String nodeID = token.getNodeId().toString(); + receivedNMTokens.put(nodeID, token.getToken()); + } + + if (allocatedContainerCount < oppContainersRequestedAny) { + // sleep to let NM's heartbeat to RM and trigger allocations + sleep(100); + } + } + 
+ assertEquals(1, receivedNMTokens.values().size()); + + assertEquals(oppContainersRequestedAny, allocatedContainerCount); + + for (ContainerId rejectContainerId : releases) { + amClient.releaseAssignedContainer(rejectContainerId); + } + assertEquals(2, amClient.release.size()); + assertEquals(0, amClient.ask.size()); + + // need to tell the AMRMClient that we don't need these resources anymore + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + amClient.removeContainerRequest( + new AMRMClient.ContainerRequest(capability, nodes, racks, priority, 0, + true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + assertEquals(1, amClient.ask.size()); + + iterationsLeft = 3; + // do a few iterations to ensure RM is not going to send new containers + while (iterationsLeft-- > 0) { + // inform RM of rejection + AllocateResponse allocResponse = amClient.allocate(0.1f); + // RM did not send new containers because AM does not need any + assertEquals(0, allocResponse.getAllocatedContainers().size()); + if (allocResponse.getCompletedContainersStatuses().size() > 0) { + for (ContainerStatus cStatus : allocResponse + .getCompletedContainersStatuses()) { + if (releases.contains(cStatus.getContainerId())) { + assertEquals(cStatus.getState(), ContainerState.COMPLETE); + assertEquals(-100, cStatus.getExitStatus()); + releases.remove(cStatus.getContainerId()); + } + } + } + if (iterationsLeft > 0) { + // sleep to make sure NM's heartbeat + sleep(100); + } + } + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + } + private void sleep(int sleepTime) { try { Thread.sleep(sleepTime); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 4410db1..16436bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -38,7 +38,6 @@ import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -157,12 +156,18 @@ public long generateContainerId() { } } - static class PartitionedResourceRequests { + /** + * Class that includes two lists of {@link ResourceRequest}s: one for + * GUARANTEED and one for OPPORTUNISTIC {@link ResourceRequest}s. + */ + public static class PartitionedResourceRequests { private List guaranteed = new ArrayList<>(); private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { return guaranteed; } + public List getOpportunistic() { return opportunistic; } @@ -186,10 +191,10 @@ public OpportunisticContainerAllocator( } /** - * Entry point into the Opportunistic Container Allocator. + * Allocate OPPORTUNISTIC containers. * @param request AllocateRequest * @param applicationAttemptId ApplicationAttemptId - * @param appContext App Specific OpportunisticContainerContext + * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier * @param appSubmitter App Submitter * @return List of Containers. 
@@ -197,37 +202,31 @@ public OpportunisticContainerAllocator( */ public List allocateContainers( AllocateRequest request, ApplicationAttemptId applicationAttemptId, - OpportunisticContainerContext appContext, long rmIdentifier, + OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAskList()); - - if (partitionedAsks.getOpportunistic().isEmpty()) { - return Collections.emptyList(); - } - + // Update released containers. List releasedContainers = request.getReleaseList(); int numReleasedContainers = releasedContainers.size(); if (numReleasedContainers > 0) { LOG.info("AttemptID: " + applicationAttemptId + " released: " + numReleasedContainers); - appContext.getContainersAllocated().removeAll(releasedContainers); + opportContext.getContainersAllocated().removeAll(releasedContainers); } - // Also, update black list + // Update black list. ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); if (rbr != null) { - appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); } - // Add OPPORTUNISTIC reqs to the outstanding reqs - appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(request.getAskList()); + // Satisfy the outstanding OPPORTUNISTIC requests. 
List allocatedContainers = new ArrayList<>(); for (Priority priority : - appContext.getOutstandingOpReqs().descendingKeySet()) { + opportContext.getOutstandingOpReqs().descendingKeySet()) { // Allocated containers : // Key = Requested Capability, // Value = List of Containers of given cap (the actual container size @@ -235,16 +234,14 @@ public OpportunisticContainerAllocator( // we need the requested capability (key) to match against // the outstanding reqs) Map> allocated = allocate(rmIdentifier, - appContext, priority, applicationAttemptId, appSubmitter); + opportContext, priority, applicationAttemptId, appSubmitter); for (Map.Entry> e : allocated.entrySet()) { - appContext.matchAllocationToOutstandingRequest( + opportContext.matchAllocationToOutstandingRequest( e.getKey(), e.getValue()); allocatedContainers.addAll(e.getValue()); } } - // Send all the GUARANTEED Reqs to RM - request.setAskList(partitionedAsks.getGuaranteed()); return allocatedContainers; } @@ -359,8 +356,14 @@ private static Token newContainerToken(NodeId nodeId, byte[] password, return containerToken; } - private PartitionedResourceRequests partitionAskList(List - askList) { + /** + * Partitions a list of ResourceRequest to two separate lists, one for + * GUARANTEED and one for OPPORTUNISTIC ResourceRequests. 
+ * @param askList the list of ResourceRequests to be partitioned + * @return the partitioned ResourceRequests + */ + public PartitionedResourceRequests partitionAskList( + List askList) { PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests(); for (ResourceRequest rr : askList) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index a12d16a..0f47c93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -220,16 +220,27 @@ private void setNodeList(List nodeList) { public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( DistributedSchedulingAllocateRequest request) throws YarnException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); - } + + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = containerAllocator + .partitionAskList(request.getAllocateRequest().getAskList()); + + // Allocate OPPORTUNISTIC containers. 
+ request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List allocatedContainers = containerAllocator.allocateContainers( request.getAllocateRequest(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); + // Prepare request for sending to RM for scheduling GUARANTEED containers. request.setAllocatedContainers(allocatedContainers); + request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the " + + "Distributed Scheduler Service on YARN RM"); + } DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4d73ba2..4f952b7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -542,7 +542,8 @@ public AllocateResponse allocate(AllocateRequest request) RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); AllocateResponse allocateResponse = recordFactory.newRecordInstance(AllocateResponse.class); - if (!allocation.getContainers().isEmpty()) { + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { allocateResponse.setNMTokens(allocation.getNMTokens()); } diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index bdd5718..50a9c4d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -220,34 +220,51 @@ public long generateContainerId() { public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = + oppContainerAllocator.partitionAskList(request.getAskList()); + + // Allocate OPPORTUNISTIC containers. + request.setAskList(partitionedAsks.getOpportunistic()); final ApplicationAttemptId appAttemptId = getAppAttemptId(); SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); + List oppContainers = oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); } - // Allocate all guaranteed containers + // Allocate GUARANTEED containers. 
+ request.setAskList(partitionedAsks.getGuaranteed()); AllocateResponse allocateResp = super.allocate(request); + // Add allocated OPPORTUNISTIC containers to the AllocateResponse. + if (!oppContainers.isEmpty()) { + allocateResp.getAllocatedContainers().addAll(oppContainers); + } + + // Update opportunistic container context with the allocated GUARANTEED + // containers. oppCtx.updateCompletedContainers(allocateResp); // Add all opportunistic containers - allocateResp.getAllocatedContainers().addAll(oppContainers); return allocateResp; } @Override public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( + registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, IOException { RegisterApplicationMasterResponse response =