diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 4410db1..47a7622 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -157,12 +157,14 @@ public long generateContainerId() { } } - static class PartitionedResourceRequests { + public class PartitionedResourceRequests { private List guaranteed = new ArrayList<>(); private List opportunistic = new ArrayList<>(); + public List getGuaranteed() { return guaranteed; } + public List getOpportunistic() { return opportunistic; } @@ -186,10 +188,10 @@ public OpportunisticContainerAllocator( } /** - * Entry point into the Opportunistic Container Allocator. + * Allocate OPPORTUNISTIC containers. * @param request AllocateRequest * @param applicationAttemptId ApplicationAttemptId - * @param appContext App Specific OpportunisticContainerContext + * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier * @param appSubmitter App Submitter * @return List of Containers. @@ -197,37 +199,31 @@ public OpportunisticContainerAllocator( */ public List allocateContainers( AllocateRequest request, ApplicationAttemptId applicationAttemptId, - OpportunisticContainerContext appContext, long rmIdentifier, + OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Partition requests into GUARANTEED and OPPORTUNISTIC reqs - PartitionedResourceRequests partitionedAsks = - partitionAskList(request.getAskList()); - - if (partitionedAsks.getOpportunistic().isEmpty()) { - return Collections.emptyList(); - } - + // Update released containers. List releasedContainers = request.getReleaseList(); int numReleasedContainers = releasedContainers.size(); if (numReleasedContainers > 0) { LOG.info("AttemptID: " + applicationAttemptId + " released: " + numReleasedContainers); - appContext.getContainersAllocated().removeAll(releasedContainers); + opportContext.getContainersAllocated().removeAll(releasedContainers); } - // Also, update black list + // Update black list. ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); if (rbr != null) { - appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - appContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); } - // Add OPPORTUNISTIC reqs to the outstanding reqs - appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic()); + // Add OPPORTUNISTIC requests to the outstanding ones. + opportContext.addToOutstandingReqs(request.getAskList()); + // Satisfy the outstanding OPPORTUNISTIC requests. List allocatedContainers = new ArrayList<>(); for (Priority priority : - appContext.getOutstandingOpReqs().descendingKeySet()) { + opportContext.getOutstandingOpReqs().descendingKeySet()) { // Allocated containers : // Key = Requested Capability, // Value = List of Containers of given cap (the actual container size @@ -235,16 +231,14 @@ public OpportunisticContainerAllocator( // we need the requested capability (key) to match against // the outstanding reqs) Map> allocated = allocate(rmIdentifier, - appContext, priority, applicationAttemptId, appSubmitter); + opportContext, priority, applicationAttemptId, appSubmitter); for (Map.Entry> e : allocated.entrySet()) { - appContext.matchAllocationToOutstandingRequest( + opportContext.matchAllocationToOutstandingRequest( e.getKey(), e.getValue()); allocatedContainers.addAll(e.getValue()); } } - // Send all the GUARANTEED Reqs to RM - request.setAskList(partitionedAsks.getGuaranteed()); return allocatedContainers; } @@ -359,8 +353,14 @@ private static Token newContainerToken(NodeId nodeId, byte[] password, return containerToken; } - private PartitionedResourceRequests partitionAskList(List - askList) { + /** + * Partitions a list of ResourceRequest to two separate lists, one for + * GUARANTEED and one for OPPORTUNISTIC ResourceRequests. + * @param askList the list of ResourceRequests to be partitioned + * @return the partitioned ResourceRequests + */ + public PartitionedResourceRequests partitionAskList( + List askList) { PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests(); for (ResourceRequest rr : askList) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index a12d16a..0f47c93 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -220,16 +220,27 @@ private void setNodeList(List nodeList) { public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( DistributedSchedulingAllocateRequest request) throws YarnException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Forwarding allocate request to the" + - "Distributed Scheduler Service on YARN RM"); - } + + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = containerAllocator + .partitionAskList(request.getAllocateRequest().getAskList()); + + // Allocate OPPORTUNISTIC containers. + request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List allocatedContainers = containerAllocator.allocateContainers( request.getAllocateRequest(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); + // Prepare request for sending to RM for scheduling GUARANTEED containers. request.setAllocatedContainers(allocatedContainers); + request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Forwarding allocate request to the" + + "Distributed Scheduler Service on YARN RM"); + } DistributedSchedulingAllocateResponse dsResp = getNextInterceptor().allocateForDistributedScheduling(request); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 815d29d..b266a8c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -219,34 +219,50 @@ public long generateContainerId() { public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + // Partition requests to GUARANTEED and OPPORTUNISTIC. + OpportunisticContainerAllocator.PartitionedResourceRequests + partitionedAsks = + oppContainerAllocator.partitionAskList(request.getAskList()); + + // Allocate GUARANTEED containers. + request.setAskList(partitionedAsks.getGuaranteed()); + AllocateResponse allocateResp = super.allocate(request); + + if (partitionedAsks.getOpportunistic().isEmpty()) { + return allocateResp; + } + + // Allocate OPPORTUNISTIC containers. + request.setAskList(partitionedAsks.getOpportunistic()); final ApplicationAttemptId appAttemptId = getAppAttemptId(); SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); + oppCtx.updateCompletedContainers(allocateResp); oppCtx.updateNodeList(getLeastLoadedNodes()); + List oppContainers = oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + // Create RMContainers, update the NMTokens and add them together with the + // allocated OPPORTUNISTIC containers to the AllocateResponse. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); + allocateResp.getNMTokens().addAll(appAttempt.pullUpdatedNMTokens()); + allocateResp.getAllocatedContainers().addAll(oppContainers); } - // Allocate all guaranteed containers - AllocateResponse allocateResp = super.allocate(request); - - oppCtx.updateCompletedContainers(allocateResp); - // Add all opportunistic containers - allocateResp.getAllocatedContainers().addAll(oppContainers); return allocateResp; } @Override public RegisterDistributedSchedulingAMResponse - registerApplicationMasterForDistributedScheduling( + registerApplicationMasterForDistributedScheduling( RegisterApplicationMasterRequest request) throws YarnException, IOException { RegisterApplicationMasterResponse response =