diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 16436bd..80626f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -192,7 +192,8 @@ public OpportunisticContainerAllocator( /** * Allocate OPPORTUNISTIC containers. - * @param request AllocateRequest + * @param blackList Resource BlackList Request + * @param oppResourceReqs Opportunistic Resource Requests * @param applicationAttemptId ApplicationAttemptId * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier @@ -200,28 +201,20 @@ public OpportunisticContainerAllocator( * @return List of Containers. * @throws YarnException YarnException */ - public List allocateContainers( - AllocateRequest request, ApplicationAttemptId applicationAttemptId, + public List allocateContainers(ResourceBlacklistRequest blackList, + List oppResourceReqs, + ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Update released containers. - List releasedContainers = request.getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - opportContext.getContainersAllocated().removeAll(releasedContainers); - } // Update black list. - ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); - if (rbr != null) { - opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + if (blackList != null) { + opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); } // Add OPPORTUNISTIC requests to the outstanding ones. - opportContext.addToOutstandingReqs(request.getAskList()); + opportContext.addToOutstandingReqs(oppResourceReqs); // Satisfy the outstanding OPPORTUNISTIC requests. List allocatedContainers = new ArrayList<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 725e2d9..449c9f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -52,9 +52,6 @@ private static final Logger LOG = LoggerFactory .getLogger(OpportunisticContainerContext.class); - // Currently just used to keep track of allocated containers. - // Can be used for reporting stats later. - private Set containersAllocated = new HashSet<>(); private AllocationParams appParams = new AllocationParams(); private ContainerIdGenerator containerIdGenerator = @@ -72,10 +69,6 @@ private final TreeMap> outstandingOpReqs = new TreeMap<>(); - public Set getContainersAllocated() { - return containersAllocated; - } - public AllocationParams getAppParams() { return appParams; } @@ -124,15 +117,6 @@ public void updateAllocationParams(Resource minResource, Resource maxResource, return outstandingOpReqs; } - public void updateCompletedContainers(AllocateResponse allocateResponse) { - for (ContainerStatus cs : - allocateResponse.getCompletedContainersStatuses()) { - if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); - } - } - } - /** * Takes a list of ResourceRequests (asks), extracts the key information viz. * (Priority, ResourceName, Capability) and adds to the outstanding @@ -187,7 +171,6 @@ public void addToOutstandingReqs(List resourceAsks) { public void matchAllocationToOutstandingRequest(Resource capability, List allocatedContainers) { for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); Map asks = outstandingOpReqs.get(c.getPriority()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index 0f47c93..a9b5ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -227,10 +227,10 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( .partitionAskList(request.getAllocateRequest().getAskList()); // Allocate OPPORTUNISTIC containers. - request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List allocatedContainers = containerAllocator.allocateContainers( - request.getAllocateRequest(), applicationAttemptId, + request.getAllocateRequest().getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); // Prepare request for sending to RM for scheduling GUARANTEED containers. @@ -252,18 +252,11 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( nodeTokens.put(nmToken.getNodeId(), nmToken); } - oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse()); - // Check if we have NM tokens for all the allocated containers. If not // generate one and update the response. updateAllocateResponse( dsResp.getAllocateResponse(), nmTokens, allocatedContainers); - if (LOG.isDebugEnabled()) { - LOG.debug("Number of opportunistic containers currently" + - "allocated by application: " + oppContainerContext - .getContainersAllocated().size()); - } return dsResp; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 3d7b2b1..8bd01b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -405,7 +405,6 @@ public AllocateResponse allocate(AllocateRequest request) ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); - ApplicationId applicationId = appAttemptId.getApplicationId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -422,8 +421,10 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "AM is not registered for known application attempt: " + appAttemptId - + " or RM had restarted after AM registered . AM should re-register."; + "AM is not registered for known application attempt: " + + appAttemptId + + " or RM had restarted after AM registered. " + + " AM should re-register."; throw new ApplicationMasterNotRegisteredException(message); } @@ -438,185 +439,10 @@ public AllocateResponse allocate(AllocateRequest request) throw new InvalidApplicationMasterRequestException(message); } - //filter illegal progress values - float filteredProgress = request.getProgress(); - if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY - || filteredProgress < 0) { - request.setProgress(0); - } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { - request.setProgress(1); - } - - // Send the status update to the appAttempt. - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); - - List ask = request.getAskList(); - List release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = - request.getResourceBlacklistRequest(); - List blacklistAdditions = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; - List blacklistRemovals = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; - RMApp app = - this.rmContext.getRMApps().get(applicationId); - - // set label expression for Resource Requests if resourceName=ANY - ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); - for (ResourceRequest req : ask) { - if (null == req.getNodeLabelExpression() - && ResourceRequest.ANY.equals(req.getResourceName())) { - req.setNodeLabelExpression(asc.getNodeLabelExpression()); - } - } - - Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); - - // sanity check - try { - RMServerUtils.normalizeAndValidateRequests(ask, - maximumCapacity, app.getQueue(), - rScheduler, rmContext); - } catch (InvalidResourceRequestException e) { - LOG.warn("Invalid resource ask by application " + appAttemptId, e); - throw e; - } - - try { - RMServerUtils.validateBlacklistRequest(blacklistRequest); - } catch (InvalidResourceBlacklistRequestException e) { - LOG.warn("Invalid blacklist request by application " + appAttemptId, e); - throw e; - } - - // In the case of work-preserving AM restart, it's possible for the - // AM to release containers from the earlier attempt. - if (!app.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - try { - RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); - } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, - e); - throw e; - } - } - - // Split Update Resource Requests into increase and decrease. - // No Exceptions are thrown here. All update errors are aggregated - // and returned to the AM. - List increaseResourceReqs = new ArrayList<>(); - List decreaseResourceReqs = new ArrayList<>(); - List updateContainerErrors = - RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext, - request, maximumCapacity, increaseResourceReqs, - decreaseResourceReqs); - - // Send new requests to appAttempt. - Allocation allocation; - RMAppAttemptState state = - app.getRMAppAttempt(appAttemptId).getAppAttemptState(); - if (state.equals(RMAppAttemptState.FINAL_SAVING) || - state.equals(RMAppAttemptState.FINISHING) || - app.isAppFinalStateStored()) { - LOG.warn(appAttemptId + " is in " + state + - " state, ignore container allocate request."); - allocation = EMPTY_ALLOCATION; - } else { - allocation = - this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - increaseResourceReqs, decreaseResourceReqs); - } - - if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { - LOG.info("blacklist are updated in Scheduler." + - "blacklistAdditions: " + blacklistAdditions + ", " + - "blacklistRemovals: " + blacklistRemovals); - } - RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = + AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); - if (allocation.getNMTokens() != null && - !allocation.getNMTokens().isEmpty()) { - allocateResponse.setNMTokens(allocation.getNMTokens()); - } - - // Notify the AM of container update errors - if (!updateContainerErrors.isEmpty()) { - allocateResponse.setUpdateErrors(updateContainerErrors); - } - // update the response with the deltas of node status changes - List updatedNodes = new ArrayList(); - if(app.pullRMNodeUpdates(updatedNodes) > 0) { - List updatedNodeReports = new ArrayList(); - for(RMNode rmNode: updatedNodes) { - SchedulerNodeReport schedulerNodeReport = - rScheduler.getNodeReport(rmNode.getNodeID()); - Resource used = BuilderUtils.newResource(0, 0); - int numContainers = 0; - if (schedulerNodeReport != null) { - used = schedulerNodeReport.getUsedResource(); - numContainers = schedulerNodeReport.getNumContainers(); - } - NodeId nodeId = rmNode.getNodeID(); - NodeReport report = - BuilderUtils.newNodeReport(nodeId, rmNode.getState(), - rmNode.getHttpAddress(), rmNode.getRackName(), used, - rmNode.getTotalCapability(), numContainers, - rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels()); - - updatedNodeReports.add(report); - } - allocateResponse.setUpdatedNodes(updatedNodeReports); - } - - allocateResponse.setAllocatedContainers(allocation.getContainers()); - allocateResponse.setCompletedContainersStatuses(appAttempt - .pullJustFinishedContainers()); - allocateResponse.setResponseId(lastResponse.getResponseId() + 1); - allocateResponse.setAvailableResources(allocation.getResourceLimit()); - - // Handling increased/decreased containers - List updatedContainers = new ArrayList<>(); - if (allocation.getIncreasedContainers() != null) { - for (Container c : allocation.getIncreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.INCREASE_RESOURCE, c)); - } - } - if (allocation.getDecreasedContainers() != null) { - for (Container c : allocation.getDecreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.DECREASE_RESOURCE, c)); - } - } - - allocateResponse.setUpdatedContainers(updatedContainers); - - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add collector address for this application - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); - } - - // add preemption to the allocateResponse message (if any) - allocateResponse - .setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Set application priority - allocateResponse.setApplicationPriority(app - .getApplicationPriority()); + allocate(amrmTokenIdentifier.getApplicationAttemptId(), + request, response); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = @@ -624,21 +450,24 @@ public AllocateResponse allocate(AllocateRequest request) if (nextMasterKey != null && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier - .getKeyId()) { + .getKeyId()) { + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt; Token amrmToken = appAttempt.getAMRMToken(); if (nextMasterKey.getMasterKey().getKeyId() != appAttemptImpl.getAMRMTokenKeyId()) { LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" - + " to application: " + applicationId); + + " to application: " + appAttemptId.getApplicationId()); amrmToken = rmContext.getAMRMTokenSecretManager() .createAndGetAMRMToken(appAttemptId); appAttemptImpl.setAMRMToken(amrmToken); } - allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token - .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() - .toString(), amrmToken.getPassword(), amrmToken.getService() - .toString())); + response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); } /* @@ -646,11 +475,212 @@ public AllocateResponse allocate(AllocateRequest request) * need to worry about unregister call occurring in between (which * removes the lock object). */ - lock.setAllocateResponse(allocateResponse); - return allocateResponse; + response.setResponseId(lastResponse.getResponseId() + 1); + lock.setAllocateResponse(response); + return response; } } + protected void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { + + //filter illegal progress values + float filteredProgress = request.getProgress(); + if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY + || filteredProgress < 0) { + request.setProgress(0); + } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { + request.setProgress(1); + } + + // Send the status update to the appAttempt. + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptStatusupdateEvent(appAttemptId, request + .getProgress())); + + List ask = request.getAskList(); + List release = request.getReleaseList(); + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List blacklistAdditions = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; + List blacklistRemovals = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + + // set label expression for Resource Requests if resourceName=ANY + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression() + && ResourceRequest.ANY.equals(req.getResourceName())) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + + Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); + + // sanity check + try { + RMServerUtils.normalizeAndValidateRequests(ask, + maximumCapacity, app.getQueue(), + rScheduler, rmContext); + } catch (InvalidResourceRequestException e) { + LOG.warn("Invalid resource ask by application " + appAttemptId, e); + throw e; + } + + try { + RMServerUtils.validateBlacklistRequest(blacklistRequest); + } catch (InvalidResourceBlacklistRequestException e) { + LOG.warn("Invalid blacklist request by application " + appAttemptId, e); + throw e; + } + + // In the case of work-preserving AM restart, it's possible for the + // AM to release containers from the earlier attempt. + if (!app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + try { + RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); + } catch (InvalidContainerReleaseException e) { + LOG.warn("Invalid container release by application " + appAttemptId, + e); + throw e; + } + } + + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List increaseResourceReqs = new ArrayList<>(); + List decreaseResourceReqs = new ArrayList<>(); + List updateContainerErrors = + RMServerUtils.validateAndSplitUpdateResourceRequests( + rmContext, request, maximumCapacity, + increaseResourceReqs, decreaseResourceReqs); + + // Send new requests to appAttempt. + Allocation allocation; + RMAppAttemptState state = + app.getRMAppAttempt(appAttemptId).getAppAttemptState(); + if (state.equals(RMAppAttemptState.FINAL_SAVING) || + state.equals(RMAppAttemptState.FINISHING) || + app.isAppFinalStateStored()) { + LOG.warn(appAttemptId + " is in " + state + + " state, ignore container allocate request."); + allocation = EMPTY_ALLOCATION; + } else { + allocation = + this.rScheduler.allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + increaseResourceReqs, decreaseResourceReqs); + } + + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { + LOG.info("blacklist are updated in Scheduler." + + "blacklistAdditions: " + blacklistAdditions + ", " + + "blacklistRemovals: " + blacklistRemovals); + } + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); + + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } + + // Notify the AM of container update errors + if (!updateContainerErrors.isEmpty()) { + allocateResponse.setUpdateErrors(updateContainerErrors); + } + // update the response with the deltas of node status changes + List updatedNodes = new ArrayList(); + if(app.pullRMNodeUpdates(updatedNodes) > 0) { + List updatedNodeReports = new ArrayList(); + for(RMNode rmNode: updatedNodes) { + SchedulerNodeReport schedulerNodeReport = + rScheduler.getNodeReport(rmNode.getNodeID()); + Resource used = BuilderUtils.newResource(0, 0); + int numContainers = 0; + if (schedulerNodeReport != null) { + used = schedulerNodeReport.getUsedResource(); + numContainers = schedulerNodeReport.getNumContainers(); + } + NodeId nodeId = rmNode.getNodeID(); + NodeReport report = + BuilderUtils.newNodeReport(nodeId, rmNode.getState(), + rmNode.getHttpAddress(), rmNode.getRackName(), used, + rmNode.getTotalCapability(), numContainers, + rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), + rmNode.getNodeLabels()); + + updatedNodeReports.add(report); + } + allocateResponse.setUpdatedNodes(updatedNodeReports); + } + + addToAllocatedContainers(allocateResponse, allocation.getContainers()); + + allocateResponse.setCompletedContainersStatuses(appAttempt + .pullJustFinishedContainers()); + allocateResponse.setAvailableResources(allocation.getResourceLimit()); + + // Handling increased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, + allocation.getIncreasedContainers()); + + // Handling decreased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, + allocation.getDecreasedContainers()); + + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add collector address for this application + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getCollectorAddr()); + } + + // add preemption to the allocateResponse message (if any) + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); + + // Set application priority + allocateResponse.setApplicationPriority(app + .getApplicationPriority()); + } + + private void addToUpdatedContainers(AllocateResponse allocateResponse, + ContainerUpdateType updateType, List updatedContainers) { + ArrayList containersToSet = new ArrayList<>(); + if (allocateResponse.getUpdatedContainers() != null && + allocateResponse.getUpdatedContainers().size() > 0) { + containersToSet.addAll(allocateResponse.getUpdatedContainers()); + } + for (Container updatedContainer : updatedContainers) { + containersToSet.add( + UpdatedContainer.newInstance(updateType, updatedContainer)); + } + allocateResponse.setUpdatedContainers(containersToSet); + } + + protected void addToAllocatedContainers(AllocateResponse allocateResponse, + List allocatedContainers) { + if (allocateResponse.getAllocatedContainers() != null + && allocateResponse.getAllocatedContainers().size() > 0) { + allocatedContainers = new ArrayList<>(allocatedContainers); + allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + } + allocateResponse.setAllocatedContainers(allocatedContainers); + } + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 7814b84..83409e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -218,8 +218,9 @@ public long generateContainerId() { } @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { + protected void allocate(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { // Partition requests to GUARANTEED and OPPORTUNISTIC. OpportunisticContainerAllocator.PartitionedResourceRequests @@ -227,40 +228,30 @@ public AllocateResponse allocate(AllocateRequest request) throws oppContainerAllocator.partitionAskList(request.getAskList()); // Allocate OPPORTUNISTIC containers. - request.setAskList(partitionedAsks.getOpportunistic()); - final ApplicationAttemptId appAttemptId = getAppAttemptId(); - SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) - rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler)rmContext.getScheduler()) + .getApplicationAttempt(appAttemptId); OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); List oppContainers = - oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, - ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + oppContainerAllocator.allocateContainers( + request.getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), appAttemptId, oppCtx, + ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); + addToAllocatedContainers(allocateResponse, oppContainers); } // Allocate GUARANTEED containers. request.setAskList(partitionedAsks.getGuaranteed()); - AllocateResponse allocateResp = super.allocate(request); - - // Add allocated OPPORTUNISTIC containers to the AllocateResponse. - if (!oppContainers.isEmpty()) { - allocateResp.getAllocatedContainers().addAll(oppContainers); - } - - // Update opportunistic container context with the allocated GUARANTEED - // containers. - oppCtx.updateCompletedContainers(allocateResp); - - // Add all opportunistic containers - return allocateResp; + super.allocate(appAttemptId, request, allocateResponse); } @Override @@ -304,7 +295,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( } private void handleNewContainers(List allocContainers, - boolean isRemotelyAllocated) { + boolean isRemotelyAllocated) { for (Container container : allocContainers) { // Create RMContainer SchedulerApplicationAttempt appAttempt =