diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java index 200dea3..e4f7a82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -159,6 +159,17 @@ public int hashCode() { } @Override + public String toString() { + return "UpdateReq{" + + "containerId=" + getContainerId() + ", " + + "containerVersion=" + getContainerVersion() + ", " + + "targetExecType=" + getExecutionType() + ", " + + "targetCapability=" + getCapability() + ", " + + "updateType=" + getContainerUpdateType() + ", " + + "}"; + } + + @Override public boolean equals(Object obj) { if (this == obj) { return true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index c0d52a6..de3fc47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -282,8 +282,8 @@ public synchronized void setAllocatedContainers( final List containers) { if (containers == null) return; - // this looks like a bug because it results in append and not set initLocalNewContainerList(); + allocatedContainers.clear(); allocatedContainers.addAll(containers); } @@ -299,6 +299,7 @@ public synchronized void setUpdatedContainers( if (containers == null) return; initLocalUpdatedContainerList(); + updatedContainers.clear(); updatedContainers.addAll(containers); } @@ -315,6 +316,7 @@ public synchronized void setCompletedContainersStatuses( if (containers == null) return; initLocalFinishedContainerList(); + completedContainersStatuses.clear(); completedContainersStatuses.addAll(containers); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java index 2b76257..e403a12 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java @@ -87,4 +87,11 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) { public int compareTo(RemoteNode other) { return this.getNodeId().compareTo(other.getNodeId()); } + + @Override + public String toString() { + return "RemoteNode{" + + "nodeId=" + getNodeId() + ", " + + "httpAddress=" + getHttpAddress() + "}"; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 16436bd..2b9a336 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -192,7 +192,8 @@ public OpportunisticContainerAllocator( /** * Allocate OPPORTUNISTIC containers. - * @param request AllocateRequest + * @param blackList Resource BlackList Request + * @param oppResourceReqs Opportunistic Resource Requests * @param applicationAttemptId ApplicationAttemptId * @param opportContext App specific OpportunisticContainerContext * @param rmIdentifier RM Identifier @@ -200,32 +201,24 @@ public OpportunisticContainerAllocator( * @return List of Containers. * @throws YarnException YarnException */ - public List allocateContainers( - AllocateRequest request, ApplicationAttemptId applicationAttemptId, + public List allocateContainers(ResourceBlacklistRequest blackList, + List oppResourceReqs, + ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException { - // Update released containers. - List releasedContainers = request.getReleaseList(); - int numReleasedContainers = releasedContainers.size(); - if (numReleasedContainers > 0) { - LOG.info("AttemptID: " + applicationAttemptId + " released: " - + numReleasedContainers); - opportContext.getContainersAllocated().removeAll(releasedContainers); - } // Update black list. - ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); - if (rbr != null) { - opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals()); - opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions()); + if (blackList != null) { + opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals()); + opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions()); } // Add OPPORTUNISTIC requests to the outstanding ones. - opportContext.addToOutstandingReqs(request.getAskList()); + opportContext.addToOutstandingReqs(oppResourceReqs); // Satisfy the outstanding OPPORTUNISTIC requests. List allocatedContainers = new ArrayList<>(); - for (Priority priority : + for (SchedulerRequestKey schedulerKey : opportContext.getOutstandingOpReqs().descendingKeySet()) { // Allocated containers : // Key = Requested Capability, @@ -234,7 +227,7 @@ public OpportunisticContainerAllocator( // we need the requested capability (key) to match against // the outstanding reqs) Map> allocated = allocate(rmIdentifier, - opportContext, priority, applicationAttemptId, appSubmitter); + opportContext, schedulerKey, applicationAttemptId, appSubmitter); for (Map.Entry> e : allocated.entrySet()) { opportContext.matchAllocationToOutstandingRequest( e.getKey(), e.getValue()); @@ -246,19 +239,22 @@ public OpportunisticContainerAllocator( } private Map> allocate(long rmIdentifier, - OpportunisticContainerContext appContext, Priority priority, + OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, ApplicationAttemptId appAttId, String userName) throws YarnException { Map> containers = new HashMap<>(); for (ResourceRequest anyAsk : - appContext.getOutstandingOpReqs().get(priority).values()) { + appContext.getOutstandingOpReqs().get(schedKey).values()) { allocateContainersInternal(rmIdentifier, appContext.getAppParams(), appContext.getContainerIdGenerator(), appContext.getBlacklist(), appAttId, appContext.getNodeMap(), userName, containers, anyAsk); - LOG.info("Opportunistic allocation requested for [" - + "priority=" + anyAsk.getPriority() - + ", num_containers=" + anyAsk.getNumContainers() - + ", capability=" + anyAsk.getCapability() + "]" - + " allocated = " + containers.get(anyAsk.getCapability()).size()); + if (!containers.isEmpty()) { + LOG.info("Opportunistic allocation requested for [" + + "priority=" + anyAsk.getPriority() + + ", allocationRequestId=" + anyAsk.getAllocationRequestId() + + ", num_containers=" + anyAsk.getNumContainers() + + ", capability=" + anyAsk.getCapability() + "]" + + " allocated = " + containers.keySet()); + } } return containers; } @@ -282,7 +278,9 @@ private void allocateContainersInternal(long rmIdentifier, nodesForScheduling.add(nodeEntry.getValue()); } if (nodesForScheduling.isEmpty()) { - LOG.warn("No nodes available for allocating opportunistic containers."); + LOG.warn("No nodes available for allocating opportunistic containers. [" + + "allNodes=" + allNodes + ", " + + "blacklist=" + blacklist + "]"); return; } int numAllocated = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 725e2d9..a2f9f4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,12 +18,7 @@ package org.apache.hadoop.yarn.server.scheduler; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; @@ -52,9 +47,6 @@ private static final Logger LOG = LoggerFactory .getLogger(OpportunisticContainerContext.class); - // Currently just used to keep track of allocated containers. - // Can be used for reporting stats later. - private Set containersAllocated = new HashSet<>(); private AllocationParams appParams = new AllocationParams(); private ContainerIdGenerator containerIdGenerator = @@ -69,13 +61,9 @@ // Resource Name (host/rack/any) and capability. This mapping is required // to match a received Container to an outstanding OPPORTUNISTIC // ResourceRequest (ask). - private final TreeMap> + private final TreeMap> outstandingOpReqs = new TreeMap<>(); - public Set getContainersAllocated() { - return containersAllocated; - } - public AllocationParams getAppParams() { return appParams; } @@ -119,20 +107,11 @@ public void updateAllocationParams(Resource minResource, Resource maxResource, return blacklist; } - public TreeMap> + public TreeMap> getOutstandingOpReqs() { return outstandingOpReqs; } - public void updateCompletedContainers(AllocateResponse allocateResponse) { - for (ContainerStatus cs : - allocateResponse.getCompletedContainersStatuses()) { - if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - containersAllocated.remove(cs.getContainerId()); - } - } - } - /** * Takes a list of ResourceRequests (asks), extracts the key information viz. * (Priority, ResourceName, Capability) and adds to the outstanding @@ -144,7 +123,7 @@ public void updateCompletedContainers(AllocateResponse allocateResponse) { */ public void addToOutstandingReqs(List resourceAsks) { for (ResourceRequest request : resourceAsks) { - Priority priority = request.getPriority(); + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); // TODO: Extend for Node/Rack locality. We only handle ANY requests now if (!ResourceRequest.isAnyLocation(request.getResourceName())) { @@ -156,10 +135,10 @@ public void addToOutstandingReqs(List resourceAsks) { } Map reqMap = - outstandingOpReqs.get(priority); + outstandingOpReqs.get(schedulerKey); if (reqMap == null) { reqMap = new HashMap<>(); - outstandingOpReqs.put(priority, reqMap); + outstandingOpReqs.put(schedulerKey, reqMap); } ResourceRequest resourceRequest = reqMap.get(request.getCapability()); @@ -171,7 +150,8 @@ public void addToOutstandingReqs(List resourceAsks) { resourceRequest.getNumContainers() + request.getNumContainers()); } if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority + LOG.info("# of outstandingOpReqs in ANY (at" + + "priority = "+ schedulerKey.getPriority() + ", with capability = " + request.getCapability() + " ) : " + resourceRequest.getNumContainers()); } @@ -187,9 +167,9 @@ public void addToOutstandingReqs(List resourceAsks) { public void matchAllocationToOutstandingRequest(Resource capability, List allocatedContainers) { for (Container c : allocatedContainers) { - containersAllocated.add(c.getId()); + SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c); Map asks = - outstandingOpReqs.get(c.getPriority()); + outstandingOpReqs.get(schedulerKey); if (asks == null) { continue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java similarity index 92% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index 4b640ae..9b7edbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +package org.apache.hadoop.yarn.server.scheduler; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Priority; @@ -53,7 +53,7 @@ public static SchedulerRequestKey extractFrom(Container container) { container.getAllocationRequestId()); } - private SchedulerRequestKey(Priority priority, long allocationRequestId) { + SchedulerRequestKey(Priority priority, long allocationRequestId) { this.priority = priority; this.allocationRequestId = allocationRequestId; } @@ -119,4 +119,12 @@ public int hashCode() { getAllocationRequestId() >>> 32)); return result; } + + @Override + public String toString() { + return "SchedulerRequestKey{" + + "priority=" + priority + + ", allocationRequestId=" + allocationRequestId + + '}'; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java index 0f47c93..a9b5ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java @@ -227,10 +227,10 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( .partitionAskList(request.getAllocateRequest().getAskList()); // Allocate OPPORTUNISTIC containers. - request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic()); List allocatedContainers = containerAllocator.allocateContainers( - request.getAllocateRequest(), applicationAttemptId, + request.getAllocateRequest().getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), applicationAttemptId, oppContainerContext, rmIdentifier, appSubmitter); // Prepare request for sending to RM for scheduling GUARANTEED containers. @@ -252,18 +252,11 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( nodeTokens.put(nmToken.getNodeId(), nmToken); } - oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse()); - // Check if we have NM tokens for all the allocated containers. If not // generate one and update the response. updateAllocateResponse( dsResp.getAllocateResponse(), nmTokens, allocatedContainers); - if (LOG.isDebugEnabled()) { - LOG.debug("Number of opportunistic containers currently" + - "allocated by application: " + oppContainerContext - .getContainersAllocated().size()); - } return dsResp; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 3d7b2b1..99d1a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -405,7 +405,6 @@ public AllocateResponse allocate(AllocateRequest request) ApplicationAttemptId appAttemptId = amrmTokenIdentifier.getApplicationAttemptId(); - ApplicationId applicationId = appAttemptId.getApplicationId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -422,8 +421,10 @@ public AllocateResponse allocate(AllocateRequest request) AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "AM is not registered for known application attempt: " + appAttemptId - + " or RM had restarted after AM registered . AM should re-register."; + "AM is not registered for known application attempt: " + + appAttemptId + + " or RM had restarted after AM registered. " + + " AM should re-register."; throw new ApplicationMasterNotRegisteredException(message); } @@ -438,185 +439,10 @@ public AllocateResponse allocate(AllocateRequest request) throw new InvalidApplicationMasterRequestException(message); } - //filter illegal progress values - float filteredProgress = request.getProgress(); - if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY - || filteredProgress < 0) { - request.setProgress(0); - } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { - request.setProgress(1); - } - - // Send the status update to the appAttempt. - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptStatusupdateEvent(appAttemptId, request - .getProgress())); - - List ask = request.getAskList(); - List release = request.getReleaseList(); - - ResourceBlacklistRequest blacklistRequest = - request.getResourceBlacklistRequest(); - List blacklistAdditions = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; - List blacklistRemovals = - (blacklistRequest != null) ? - blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; - RMApp app = - this.rmContext.getRMApps().get(applicationId); - - // set label expression for Resource Requests if resourceName=ANY - ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); - for (ResourceRequest req : ask) { - if (null == req.getNodeLabelExpression() - && ResourceRequest.ANY.equals(req.getResourceName())) { - req.setNodeLabelExpression(asc.getNodeLabelExpression()); - } - } - - Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); - - // sanity check - try { - RMServerUtils.normalizeAndValidateRequests(ask, - maximumCapacity, app.getQueue(), - rScheduler, rmContext); - } catch (InvalidResourceRequestException e) { - LOG.warn("Invalid resource ask by application " + appAttemptId, e); - throw e; - } - - try { - RMServerUtils.validateBlacklistRequest(blacklistRequest); - } catch (InvalidResourceBlacklistRequestException e) { - LOG.warn("Invalid blacklist request by application " + appAttemptId, e); - throw e; - } - - // In the case of work-preserving AM restart, it's possible for the - // AM to release containers from the earlier attempt. - if (!app.getApplicationSubmissionContext() - .getKeepContainersAcrossApplicationAttempts()) { - try { - RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); - } catch (InvalidContainerReleaseException e) { - LOG.warn("Invalid container release by application " + appAttemptId, - e); - throw e; - } - } - - // Split Update Resource Requests into increase and decrease. - // No Exceptions are thrown here. All update errors are aggregated - // and returned to the AM. - List increaseResourceReqs = new ArrayList<>(); - List decreaseResourceReqs = new ArrayList<>(); - List updateContainerErrors = - RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext, - request, maximumCapacity, increaseResourceReqs, - decreaseResourceReqs); - - // Send new requests to appAttempt. - Allocation allocation; - RMAppAttemptState state = - app.getRMAppAttempt(appAttemptId).getAppAttemptState(); - if (state.equals(RMAppAttemptState.FINAL_SAVING) || - state.equals(RMAppAttemptState.FINISHING) || - app.isAppFinalStateStored()) { - LOG.warn(appAttemptId + " is in " + state + - " state, ignore container allocate request."); - allocation = EMPTY_ALLOCATION; - } else { - allocation = - this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - increaseResourceReqs, decreaseResourceReqs); - } - - if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { - LOG.info("blacklist are updated in Scheduler." + - "blacklistAdditions: " + blacklistAdditions + ", " + - "blacklistRemovals: " + blacklistRemovals); - } - RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); - AllocateResponse allocateResponse = + AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); - if (allocation.getNMTokens() != null && - !allocation.getNMTokens().isEmpty()) { - allocateResponse.setNMTokens(allocation.getNMTokens()); - } - - // Notify the AM of container update errors - if (!updateContainerErrors.isEmpty()) { - allocateResponse.setUpdateErrors(updateContainerErrors); - } - // update the response with the deltas of node status changes - List updatedNodes = new ArrayList(); - if(app.pullRMNodeUpdates(updatedNodes) > 0) { - List updatedNodeReports = new ArrayList(); - for(RMNode rmNode: updatedNodes) { - SchedulerNodeReport schedulerNodeReport = - rScheduler.getNodeReport(rmNode.getNodeID()); - Resource used = BuilderUtils.newResource(0, 0); - int numContainers = 0; - if (schedulerNodeReport != null) { - used = schedulerNodeReport.getUsedResource(); - numContainers = schedulerNodeReport.getNumContainers(); - } - NodeId nodeId = rmNode.getNodeID(); - NodeReport report = - BuilderUtils.newNodeReport(nodeId, rmNode.getState(), - rmNode.getHttpAddress(), rmNode.getRackName(), used, - rmNode.getTotalCapability(), numContainers, - rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), - rmNode.getNodeLabels()); - - updatedNodeReports.add(report); - } - allocateResponse.setUpdatedNodes(updatedNodeReports); - } - - allocateResponse.setAllocatedContainers(allocation.getContainers()); - allocateResponse.setCompletedContainersStatuses(appAttempt - .pullJustFinishedContainers()); - allocateResponse.setResponseId(lastResponse.getResponseId() + 1); - allocateResponse.setAvailableResources(allocation.getResourceLimit()); - - // Handling increased/decreased containers - List updatedContainers = new ArrayList<>(); - if (allocation.getIncreasedContainers() != null) { - for (Container c : allocation.getIncreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.INCREASE_RESOURCE, c)); - } - } - if (allocation.getDecreasedContainers() != null) { - for (Container c : allocation.getDecreasedContainers()) { - updatedContainers.add( - UpdatedContainer.newInstance( - ContainerUpdateType.DECREASE_RESOURCE, c)); - } - } - - allocateResponse.setUpdatedContainers(updatedContainers); - - allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); - - // add collector address for this application - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); - } - - // add preemption to the allocateResponse message (if any) - allocateResponse - .setPreemptionMessage(generatePreemptionMessage(allocation)); - - // Set application priority - allocateResponse.setApplicationPriority(app - .getApplicationPriority()); + allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(), + request, response); // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = @@ -624,21 +450,24 @@ public AllocateResponse allocate(AllocateRequest request) if (nextMasterKey != null && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier - .getKeyId()) { + .getKeyId()) { + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); RMAppAttemptImpl appAttemptImpl = (RMAppAttemptImpl)appAttempt; Token amrmToken = appAttempt.getAMRMToken(); if (nextMasterKey.getMasterKey().getKeyId() != appAttemptImpl.getAMRMTokenKeyId()) { LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back" - + " to application: " + applicationId); + + " to application: " + appAttemptId.getApplicationId()); amrmToken = rmContext.getAMRMTokenSecretManager() .createAndGetAMRMToken(appAttemptId); appAttemptImpl.setAMRMToken(amrmToken); } - allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token - .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() - .toString(), amrmToken.getPassword(), amrmToken.getService() - .toString())); + response.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); } /* @@ -646,11 +475,225 @@ public AllocateResponse allocate(AllocateRequest request) * need to worry about unregister call occurring in between (which * removes the lock object). */ - lock.setAllocateResponse(allocateResponse); - return allocateResponse; + response.setResponseId(lastResponse.getResponseId() + 1); + lock.setAllocateResponse(response); + return response; } } + protected void allocateInternal(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { + + //filter illegal progress values + float filteredProgress = request.getProgress(); + if (Float.isNaN(filteredProgress) || filteredProgress == Float.NEGATIVE_INFINITY + || filteredProgress < 0) { + request.setProgress(0); + } else if (filteredProgress > 1 || filteredProgress == Float.POSITIVE_INFINITY) { + request.setProgress(1); + } + + // Send the status update to the appAttempt. + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptStatusupdateEvent(appAttemptId, request + .getProgress())); + + List ask = request.getAskList(); + List release = request.getReleaseList(); + + ResourceBlacklistRequest blacklistRequest = + request.getResourceBlacklistRequest(); + List blacklistAdditions = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST; + List blacklistRemovals = + (blacklistRequest != null) ? + blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST; + RMApp app = + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + + // set label expression for Resource Requests if resourceName=ANY + ApplicationSubmissionContext asc = app.getApplicationSubmissionContext(); + for (ResourceRequest req : ask) { + if (null == req.getNodeLabelExpression() + && ResourceRequest.ANY.equals(req.getResourceName())) { + req.setNodeLabelExpression(asc.getNodeLabelExpression()); + } + } + + Resource maximumCapacity = rScheduler.getMaximumResourceCapability(); + + // sanity check + try { + RMServerUtils.normalizeAndValidateRequests(ask, + maximumCapacity, app.getQueue(), + rScheduler, rmContext); + } catch (InvalidResourceRequestException e) { + LOG.warn("Invalid resource ask by application " + appAttemptId, e); + throw e; + } + + try { + RMServerUtils.validateBlacklistRequest(blacklistRequest); + } catch (InvalidResourceBlacklistRequestException e) { + LOG.warn("Invalid blacklist request by application " + appAttemptId, e); + throw e; + } + + // In the case of work-preserving AM restart, it's possible for the + // AM to release containers from the earlier attempt. + if (!app.getApplicationSubmissionContext() + .getKeepContainersAcrossApplicationAttempts()) { + try { + RMServerUtils.validateContainerReleaseRequest(release, appAttemptId); + } catch (InvalidContainerReleaseException e) { + LOG.warn("Invalid container release by application " + appAttemptId, + e); + throw e; + } + } + + // Split Update Resource Requests into increase and decrease. + // No Exceptions are thrown here. All update errors are aggregated + // and returned to the AM. + List increaseResourceReqs = new ArrayList<>(); + List decreaseResourceReqs = new ArrayList<>(); + List updateContainerErrors = + RMServerUtils.validateAndSplitUpdateResourceRequests( + rmContext, request, maximumCapacity, + increaseResourceReqs, decreaseResourceReqs); + + // Send new requests to appAttempt. + Allocation allocation; + RMAppAttemptState state = + app.getRMAppAttempt(appAttemptId).getAppAttemptState(); + if (state.equals(RMAppAttemptState.FINAL_SAVING) || + state.equals(RMAppAttemptState.FINISHING) || + app.isAppFinalStateStored()) { + LOG.warn(appAttemptId + " is in " + state + + " state, ignore container allocate request."); + allocation = EMPTY_ALLOCATION; + } else { + allocation = + this.rScheduler.allocate(appAttemptId, ask, release, + blacklistAdditions, blacklistRemovals, + increaseResourceReqs, decreaseResourceReqs); + } + + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { + LOG.info("blacklist are updated in Scheduler." + + "blacklistAdditions: " + blacklistAdditions + ", " + + "blacklistRemovals: " + blacklistRemovals); + } + RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); + + if (allocation.getNMTokens() != null && + !allocation.getNMTokens().isEmpty()) { + allocateResponse.setNMTokens(allocation.getNMTokens()); + } + + // Notify the AM of container update errors + addToUpdateContainerErrors(allocateResponse, updateContainerErrors); + + // update the response with the deltas of node status changes + List updatedNodes = new ArrayList(); + if(app.pullRMNodeUpdates(updatedNodes) > 0) { + List updatedNodeReports = new ArrayList(); + for(RMNode rmNode: updatedNodes) { + SchedulerNodeReport schedulerNodeReport = + rScheduler.getNodeReport(rmNode.getNodeID()); + Resource used = BuilderUtils.newResource(0, 0); + int numContainers = 0; + if (schedulerNodeReport != null) { + used = schedulerNodeReport.getUsedResource(); + numContainers = schedulerNodeReport.getNumContainers(); + } + NodeId nodeId = rmNode.getNodeID(); + NodeReport report = + BuilderUtils.newNodeReport(nodeId, rmNode.getState(), + rmNode.getHttpAddress(), rmNode.getRackName(), used, + rmNode.getTotalCapability(), numContainers, + rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), + rmNode.getNodeLabels()); + + updatedNodeReports.add(report); + } + allocateResponse.setUpdatedNodes(updatedNodeReports); + } + + addToAllocatedContainers(allocateResponse, allocation.getContainers()); + + allocateResponse.setCompletedContainersStatuses(appAttempt + .pullJustFinishedContainers()); + allocateResponse.setAvailableResources(allocation.getResourceLimit()); + + // Handling increased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.INCREASE_RESOURCE, + allocation.getIncreasedContainers()); + + // Handling decreased containers + addToUpdatedContainers( + allocateResponse, ContainerUpdateType.DECREASE_RESOURCE, + allocation.getDecreasedContainers()); + + allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); + + // add collector address for this application + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + allocateResponse.setCollectorAddr( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getCollectorAddr()); + } + + // add preemption to the allocateResponse message (if any) + allocateResponse + .setPreemptionMessage(generatePreemptionMessage(allocation)); + + // Set application priority + allocateResponse.setApplicationPriority(app + .getApplicationPriority()); + } + + protected void addToUpdateContainerErrors(AllocateResponse allocateResponse, + List updateContainerErrors) { + if (!updateContainerErrors.isEmpty()) { + if (allocateResponse.getUpdateErrors() != null + && !allocateResponse.getUpdateErrors().isEmpty()) { + updateContainerErrors = new ArrayList<>(updateContainerErrors); + updateContainerErrors.addAll(allocateResponse.getUpdateErrors()); + } + allocateResponse.setUpdateErrors(updateContainerErrors); + } + } + + protected void addToUpdatedContainers(AllocateResponse allocateResponse, + ContainerUpdateType updateType, List updatedContainers) { + if (updatedContainers != null && updatedContainers.size() > 0) { + ArrayList containersToSet = new ArrayList<>(); + if (allocateResponse.getUpdatedContainers() != null && + !allocateResponse.getUpdatedContainers().isEmpty()) { + containersToSet.addAll(allocateResponse.getUpdatedContainers()); + } + for (Container updatedContainer : updatedContainers) { + containersToSet.add( + UpdatedContainer.newInstance(updateType, updatedContainer)); + } + allocateResponse.setUpdatedContainers(containersToSet); + } + } + + protected void addToAllocatedContainers(AllocateResponse allocateResponse, + List allocatedContainers) { + if (allocateResponse.getAllocatedContainers() != null + && !allocateResponse.getAllocatedContainers().isEmpty()) { + allocatedContainers = new ArrayList<>(allocatedContainers); + allocatedContainers.addAll(allocateResponse.getAllocatedContainers()); + } + allocateResponse.setAllocatedContainers(allocatedContainers); + } + private PreemptionMessage generatePreemptionMessage(Allocation allocation){ PreemptionMessage pMsg = null; // assemble strict preemption request diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index a527d04..9d4c092 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -101,8 +101,8 @@ private final int k; private final long cacheRefreshInterval; - private List cachedNodes; - private long lastCacheUpdateTime; + private volatile List cachedNodes; + private volatile long lastCacheUpdateTime; public OpportunisticContainerAllocatorAMService(RMContext rmContext, YarnScheduler scheduler) { @@ -218,8 +218,9 @@ public long generateContainerId() { } @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { + protected void allocateInternal(ApplicationAttemptId appAttemptId, + AllocateRequest request, AllocateResponse allocateResponse) + throws YarnException { // Partition requests to GUARANTEED and OPPORTUNISTIC. OpportunisticContainerAllocator.PartitionedResourceRequests @@ -227,40 +228,30 @@ public AllocateResponse allocate(AllocateRequest request) throws oppContainerAllocator.partitionAskList(request.getAskList()); // Allocate OPPORTUNISTIC containers. - request.setAskList(partitionedAsks.getOpportunistic()); - final ApplicationAttemptId appAttemptId = getAppAttemptId(); - SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler) - rmContext.getScheduler()).getApplicationAttempt(appAttemptId); + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler)rmContext.getScheduler()) + .getApplicationAttempt(appAttemptId); OpportunisticContainerContext oppCtx = appAttempt.getOpportunisticContainerContext(); oppCtx.updateNodeList(getLeastLoadedNodes()); List oppContainers = - oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx, - ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); + oppContainerAllocator.allocateContainers( + request.getResourceBlacklistRequest(), + partitionedAsks.getOpportunistic(), appAttemptId, oppCtx, + ResourceManager.getClusterTimeStamp(), appAttempt.getUser()); // Create RMContainers and update the NMTokens. if (!oppContainers.isEmpty()) { handleNewContainers(oppContainers, false); appAttempt.updateNMTokens(oppContainers); + addToAllocatedContainers(allocateResponse, oppContainers); } // Allocate GUARANTEED containers. request.setAskList(partitionedAsks.getGuaranteed()); - AllocateResponse allocateResp = super.allocate(request); - - // Add allocated OPPORTUNISTIC containers to the AllocateResponse. - if (!oppContainers.isEmpty()) { - allocateResp.getAllocatedContainers().addAll(oppContainers); - } - - // Update opportunistic container context with the allocated GUARANTEED - // containers. - oppCtx.updateCompletedContainers(allocateResp); - - // Add all opportunistic containers - return allocateResp; + super.allocateInternal(appAttemptId, request, allocateResponse); } @Override @@ -304,7 +295,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( } private void handleNewContainers(List allocContainers, - boolean isRemotelyAllocated) { + boolean isRemotelyAllocated) { for (Container container : allocContainers) { // Create RMContainer SchedulerApplicationAttempt appAttempt = @@ -387,10 +378,12 @@ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { private synchronized List getLeastLoadedNodes() { long currTime = System.currentTimeMillis(); if ((currTime - lastCacheUpdateTime > cacheRefreshInterval) - || cachedNodes == null) { + || (cachedNodes == null)) { cachedNodes = convertToRemoteNodes( this.nodeMonitor.selectLeastLoadedNodes(this.k)); - lastCacheUpdateTime = currTime; + if (cachedNodes.size() > 0) { + lastCacheUpdateTime = currTime; + } } return cachedNodes; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java index a244ad8..020764b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java @@ -25,14 +25,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 0cfa1d3..dbc6169 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode .RMNodeDecreaseContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java index d7d1e94..80e7c0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerReservedEvent.java @@ -21,7 +21,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** * The event signifying that a container has been reserved. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index feb20ee..30f7ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -965,6 +967,7 @@ public void recoverContainer(RMContainer rmContainer) { public ResourceRequest cloneResourceRequest(ResourceRequest request) { ResourceRequest newRequest = ResourceRequest.newBuilder() .priority(request.getPriority()) + .allocationRequestId(request.getAllocationRequestId()) .resourceName(request.getResourceName()) .capability(request.getCapability()) .numContainers(1) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b3ef471..e94d800 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -51,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -73,12 +72,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 6744c2e..759db05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1c6471f..93c0693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -2204,7 +2205,8 @@ public void collectSchedulerApplications( @Override public void attachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { - if (application != null) { + if (application != null && rmContainer != null + && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, application, rmContainer.getContainer() @@ -2222,7 +2224,8 @@ public void attachContainer(Resource clusterResource, @Override public void detachContainer(Resource clusterResource, FiCaSchedulerApp application, RMContainer rmContainer) { - if (application != null) { + if (application != null && rmContainer != null + && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); releaseResource(clusterResource, application, rmContainer.getContainer() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 74a64c1..0dc527f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index 3e8282f..c12bc6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java index 8b4907b..159fb09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java @@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; /** * Contexts for a container inside scheduler diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 498f34f..b14bc20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; @@ -69,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index d79fcaf..344daf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java index dec55ca..5069e39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java @@ -61,9 +61,9 @@ @Override public int compare(ClusterNode o1, ClusterNode o2) { if (getMetric(o1) == getMetric(o2)) { - return o1.timestamp < o2.timestamp ? +1 : -1; + return (int)(o2.timestamp - o1.timestamp); } - return getMetric(o1) > getMetric(o2) ? +1 : -1; + return getMetric(o1) - getMetric(o2); } public int getMetric(ClusterNode c) { @@ -115,8 +115,13 @@ public void run() { ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock(); writeLock.lock(); try { - sortedNodes.clear(); - sortedNodes.addAll(sortNodes()); + try { + List nodeIds = sortNodes(); + sortedNodes.clear(); + sortedNodes.addAll(nodeIds); + } catch (Exception ex) { + LOG.warn("Got Exception while sorting nodes..", ex); + } if (thresholdCalculator != null) { thresholdCalculator.update(); } @@ -273,7 +278,7 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { List retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ? new ArrayList<>(this.sortedNodes).subList(0, k) : new ArrayList<>(this.sortedNodes); - return Collections.unmodifiableList(retVal); + return retVal; } finally { readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 39f4a3d..a9591a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index a27a222..d983ea0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import java.util.Collection; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index d275bfd..e60f70e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -33,10 +33,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + import java.util.List; public class FifoAppAttempt extends FiCaSchedulerApp { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b5122c0..81c1f30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -77,7 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -709,7 +709,7 @@ private int assignContainer(FiCaSchedulerNode node, FifoAppAttempt application, // Inform the application RMContainer rmContainer = application.allocate(type, node, schedulerKey, request, container); - + // Inform the node node.allocateContainer(rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java index f87f764..86843b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SchedulingPlacementSet.java @@ -21,7 +21,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import java.util.Iterator; import java.util.List; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index e70c3e0..3288d39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java index 35218bd..31b372e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Task.java @@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .TestUtils; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 8663315..83ff52b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -36,8 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 76d93ee..ad1490a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -36,8 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index 7f9c719..468e760 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.junit.Assert; import org.junit.Test; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 3cb668c..ff5dc02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.junit.After; import org.junit.Test; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 3a88cff..27860a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -120,7 +120,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 2ce5fcb..c0fa43d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 3e05456..c9eb8b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 93360b2..4bc5127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 5964d2f..4cc99b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java index 61c5743..46187d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java @@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .TestUtils;