diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index ce85b21..320487a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -58,7 +58,7 @@ */ @Public @Stable -public abstract class ResourceRequest extends AbstractResourceRequest +public abstract class ResourceRequest extends SchedulerResourceRequest implements Comparable{ @Public diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerResourceRequest.java similarity index 76% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerResourceRequest.java index 819a607..0a49881 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AbstractResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulerResourceRequest.java @@ -19,11 +19,10 @@ package org.apache.hadoop.yarn.api.records; import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; /** - * {@code AbstractResourceRequest} represents a generic resource request made + * {@code SchedulerResourceRequest} represents a 
generic resource request made * by an application to the {@code ResourceManager}. *

* It includes: @@ -35,14 +34,17 @@ */ @Public @Unstable -public abstract class AbstractResourceRequest { +public abstract class SchedulerResourceRequest { + + + protected ContainerId containerToUpdate = null; /** * Set the Resource capability of the request * @param capability Resource capability of the request */ @Public - @Stable + @Unstable public abstract void setCapability(Resource capability); /** @@ -50,6 +52,18 @@ * @return Resource capability of the request */ @Public - @Stable + @Unstable public abstract Resource getCapability(); + + public abstract Priority getPriority(); + + public abstract long getAllocationRequestId(); + + public ContainerId getContainerToUpdate() { + return containerToUpdate; + } + + public void setContainerToUpdate(ContainerId containerToUpdate) { + this.containerToUpdate = containerToUpdate; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java index 7102f7b..e7458cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerError.java @@ -87,6 +87,12 @@ public int hashCode() { } @Override + public String toString() { + return "UpdateContainerError{reason=" + getReason() + ", " + + "req=" + getUpdateContainerRequest() + "}"; + } + + @Override public boolean equals(Object obj) { if (this == obj) { return true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java index e4f7a82..28df3f4 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateContainerRequest.java @@ -62,7 +62,10 @@ */ @InterfaceAudience.Public @InterfaceStability.Unstable -public abstract class UpdateContainerRequest extends AbstractResourceRequest { +public abstract class UpdateContainerRequest extends SchedulerResourceRequest { + + private Priority priority = Priority.UNDEFINED; + private long allocationRequestId = -1; @InterfaceAudience.Public @InterfaceStability.Unstable @@ -144,17 +147,37 @@ public static UpdateContainerRequest newInstance(int version, public abstract void setExecutionType(ExecutionType executionType); @Override + public Priority getPriority() { + return priority; + } + + public void setPriority(Priority priority) { + this.priority = priority; + } + + @Override + public long getAllocationRequestId() { + return allocationRequestId; + } + + public void setAllocationRequestId(long allocationRequestId) { + this.allocationRequestId = allocationRequestId; + } + + @Override public int hashCode() { final int prime = 2153; int result = 2459; ContainerId cId = getContainerId(); ExecutionType execType = getExecutionType(); Resource capability = getCapability(); + ContainerUpdateType updateType = getContainerUpdateType(); result = prime * result + ((capability == null) ? 0 : capability.hashCode()); result = prime * result + ((cId == null) ? 0 : cId.hashCode()); result = prime * result + getContainerVersion(); result = prime * result + ((execType == null) ? 0 : execType.hashCode()); + result = prime * result + ((updateType== null) ? 
0 : updateType.hashCode()); return result; } @@ -208,6 +231,14 @@ public boolean equals(Object obj) { } else if (!execType.equals(other.getExecutionType())) { return false; } + ContainerUpdateType updateType = getContainerUpdateType(); + if (updateType == null) { + if (other.getContainerUpdateType() != null) { + return false; + } + } else if (!updateType.equals(other.getContainerUpdateType())) { + return false; + } return true; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java index 2d77671..3c1fcbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java @@ -312,13 +312,21 @@ private Container buildContainer(long rmIdentifier, // before accepting an ask) Resource capability = normalizeCapability(appParams, rr); + return createContainer( + rmIdentifier, appParams.getContainerTokenExpiryInterval(), + SchedulerRequestKey.create(rr), userName, node, cId, capability); + } + + private Container createContainer(long rmIdentifier, long tokenExpiry, + SchedulerRequestKey schedulerKey, String userName, RemoteNode node, + ContainerId cId, Resource capability) { long currTime = System.currentTimeMillis(); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier( cId, 0, node.getNodeId().toString(), userName, - capability, currTime + appParams.containerTokenExpiryInterval, + capability, currTime + tokenExpiry, tokenSecretManager.getCurrentKey().getKeyId(), 
rmIdentifier, - rr.getPriority(), currTime, + schedulerKey.getPriority(), currTime, null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, ExecutionType.OPPORTUNISTIC); byte[] pwd = @@ -327,9 +335,9 @@ private Container buildContainer(long rmIdentifier, containerTokenIdentifier); Container container = BuilderUtils.newContainer( cId, node.getNodeId(), node.getHttpAddress(), - capability, rr.getPriority(), containerToken, + capability, schedulerKey.getPriority(), containerToken, containerTokenIdentifier.getExecutionType(), - rr.getAllocationRequestId()); + schedulerKey.getAllocationRequestId()); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java index 875e166..bea5ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java @@ -18,14 +18,16 @@ package org.apache.hadoop.yarn.server.scheduler; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulerResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.AllocationParams; import static org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator.ContainerIdGenerator; @@ -52,6 +55,11 @@ private static final Logger LOG = LoggerFactory .getLogger(OpportunisticContainerContext.class); + public static final ContainerId UNDEFINED = + ContainerId.newContainerId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(-1, -1), -1), -1); + private AllocationParams appParams = new AllocationParams(); private ContainerIdGenerator containerIdGenerator = @@ -69,6 +77,14 @@ private final TreeMap> outstandingOpReqs = new TreeMap<>(); + // Keep track of containers that are undergoing promotion + private final Map>>> outstandingPromotions = new HashMap<>(); + + private final Set outstandingDemotions = new HashSet<>(); + + private AtomicLong promotionCounter = new AtomicLong(-1); + public AllocationParams getAppParams() { return appParams; } @@ -117,6 +133,111 @@ public void updateAllocationParams(Resource minResource, Resource maxResource, return outstandingOpReqs; } + public synchronized boolean isBeingPromoted(Container container) { + Map>> resourceMap = + outstandingPromotions.get( + new SchedulerRequestKey(container.getPriority(), + container.getAllocationRequestId(), container.getId())); + if (resourceMap != null) { + Map> locationMap = + resourceMap.get(container.getResource()); + if (locationMap != null) { + Set containerIds = locationMap.get(container.getNodeId()); + if (containerIds 
!= null && !containerIds.isEmpty()) { + return containerIds.contains(container.getId()); + } + } + } + return false; + } + + public synchronized boolean checkAndAddToOutstandingDemotions( + Container container) { + if (isBeingPromoted(container) + || outstandingDemotions.contains(container.getId())) { + return false; + } + outstandingDemotions.add(container.getId()); + return true; + } + + public synchronized SchedulerRequestKey checkAndAddToOutstandingPromotions( + Container container, SchedulerResourceRequest updateRequest) { + SchedulerRequestKey schedulerKey = + SchedulerRequestKey.create(updateRequest); + Map>> resourceMap = + outstandingPromotions.get(schedulerKey); + if (resourceMap == null) { + resourceMap = new HashMap<>(); + outstandingPromotions.put(schedulerKey, resourceMap); + } + Map> locationMap = + resourceMap.get(container.getResource()); + if (locationMap == null) { + locationMap = new HashMap<>(); + resourceMap.put(container.getResource(), locationMap); + } + Set containerIds = locationMap.get(container.getNodeId()); + if (containerIds == null) { + containerIds = new HashSet<>(); + locationMap.put(container.getNodeId(), containerIds); + } + if (containerIds.contains(container.getId()) + || outstandingDemotions.contains(container.getId())) { + return null; + } + containerIds.add(container.getId()); + return schedulerKey; + } + + public synchronized void completeContainerExecTypeUpdate( + SchedulerRequestKey schedulerKey, Container container) { + Map>> resourceMap = + outstandingPromotions.get(schedulerKey); + if (resourceMap != null) { + Map> locationMap = + resourceMap.get(container.getResource()); + if (locationMap != null) { + Set containerIds = locationMap.get(container.getNodeId()); + if (containerIds != null && !containerIds.isEmpty()) { + containerIds.remove(container.getId()); + if (containerIds.isEmpty()) { + locationMap.remove(container.getNodeId()); + } + } + if (locationMap.isEmpty()) { + resourceMap.remove(container.getResource()); 
+ } + } + if (resourceMap.isEmpty()) { + outstandingPromotions.remove(schedulerKey); + } + } + outstandingDemotions.remove(container.getId()); + } + + public ContainerId matchContainerToOutstandingPromotionReq( + SchedulerRequestKey schedulerKey, Container container) { + ContainerId retVal = null; + Map>> resourceMap = + outstandingPromotions.get(schedulerKey); + if (resourceMap != null) { + Map> locationMap = + resourceMap.get(container.getResource()); + if (locationMap != null) { + Set containerIds = locationMap.get(container.getNodeId()); + if (containerIds != null && !containerIds.isEmpty()) { + retVal = containerIds.iterator().next(); + } + } + } + // Allocation happened on NM on the same host, but not on the NM + // we need.. We need to signal that this container has to be released. + if (resourceMap != null && retVal == null) { + return UNDEFINED; + } + return retVal; + } /** * Takes a list of ResourceRequests (asks), extracts the key information viz. * (Priority, ResourceName, Capability) and adds to the outstanding @@ -155,8 +276,9 @@ public void addToOutstandingReqs(List resourceAsks) { resourceRequest.getNumContainers() + request.getNumContainers()); } if (ResourceRequest.isAnyLocation(request.getResourceName())) { - LOG.info("# of outstandingOpReqs in ANY (at" + - "priority = "+ schedulerKey.getPriority() + LOG.info("# of outstandingOpReqs in ANY (at " + + "priority = " + schedulerKey.getPriority() + + ", allocationReqId = " + schedulerKey.getAllocationRequestId() + ", with capability = " + request.getCapability() + " ) : " + resourceRequest.getNumContainers()); } @@ -172,7 +294,8 @@ public void addToOutstandingReqs(List resourceAsks) { public void matchAllocationToOutstandingRequest(Resource capability, List allocatedContainers) { for (Container c : allocatedContainers) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c); + SchedulerRequestKey schedulerKey = + SchedulerRequestKey.extractFrom(c); Map asks = 
outstandingOpReqs.get(schedulerKey); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index 9b7edbe..1ce8378 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.scheduler; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulerResourceRequest; /** * Composite key for outstanding scheduler requests for any schedulable entity. @@ -31,15 +33,16 @@ private final Priority priority; private final long allocationRequestId; + private final ContainerId containerToUpdate; /** * Factory method to generate a SchedulerRequestKey from a ResourceRequest. 
* @param req ResourceRequest * @return SchedulerRequestKey */ - public static SchedulerRequestKey create(ResourceRequest req) { + public static SchedulerRequestKey create(SchedulerResourceRequest req) { return new SchedulerRequestKey(req.getPriority(), - req.getAllocationRequestId()); + req.getAllocationRequestId(), req.getContainerToUpdate()); } /** @@ -50,12 +53,16 @@ public static SchedulerRequestKey create(ResourceRequest req) { */ public static SchedulerRequestKey extractFrom(Container container) { return new SchedulerRequestKey(container.getPriority(), - container.getAllocationRequestId()); + container.getAllocationRequestId(), null); } - SchedulerRequestKey(Priority priority, long allocationRequestId) { + + + SchedulerRequestKey(Priority priority, long allocationRequestId, + ContainerId containerToUpdate) { this.priority = priority; this.allocationRequestId = allocationRequestId; + this.containerToUpdate = containerToUpdate; } /** @@ -76,6 +83,10 @@ public long getAllocationRequestId() { return allocationRequestId; } + public ContainerId getContainerToUpdate() { + return containerToUpdate; + } + @Override public int compareTo(SchedulerRequestKey o) { if (o == null) { @@ -85,6 +96,15 @@ public int compareTo(SchedulerRequestKey o) { return 1; } } + + // Ensure updates are ranked higher + if (this.containerToUpdate == null && o.containerToUpdate != null) { + return -1; + } + if (this.containerToUpdate != null && o.containerToUpdate == null) { + return 1; + } + int priorityCompare = o.getPriority().compareTo(priority); // we first sort by priority and then by allocationRequestId if (priorityCompare != 0) { @@ -107,9 +127,12 @@ public boolean equals(Object o) { if (getAllocationRequestId() != that.getAllocationRequestId()) { return false; } - return getPriority() != null ? - getPriority().equals(that.getPriority()) : - that.getPriority() == null; + if (!getPriority().equals(that.getPriority())) { + return false; + } + return containerToUpdate != null ? 
+ containerToUpdate.equals(that.containerToUpdate) : + that.containerToUpdate == null; } @Override @@ -117,6 +140,10 @@ public int hashCode() { int result = getPriority() != null ? getPriority().hashCode() : 0; result = 31 * result + (int) (getAllocationRequestId() ^ ( getAllocationRequestId() >>> 32)); + if (containerToUpdate != null) { + // hashCode() is already an int; no long-style folding is needed. + result = 31 * result + containerToUpdate.hashCode(); + } return result; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index 68115e2..f09e844 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -26,7 +26,14 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import 
org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; @@ -69,9 +76,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; - import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import java.io.IOException; @@ -79,6 +87,9 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils + .RECORD_FACTORY; + /** * The OpportunisticContainerAllocatorAMService is started instead of the * ApplicationMasterService if opportunistic scheduling is enabled for the YARN @@ -249,11 +260,168 @@ protected void allocate(ApplicationAttemptId appAttemptId, addToAllocatedContainers(allocateResponse, oppContainers); } + handleExecutionTypeUpdates(appAttempt, request, allocateResponse, + oppCtx, partitionedAsks.getGuaranteed()); + + + List promotedContainers = + appAttempt.pullContainersWithUpdatedExecType(); + addToUpdatedContainers(allocateResponse, + ContainerUpdateType.UPDATE_EXECUTION_TYPE, promotedContainers); + // Allocate GUARANTEED containers. 
request.setAskList(partitionedAsks.getGuaranteed()); + super.allocate(appAttemptId, request, allocateResponse); } + private void handleExecutionTypeUpdates( + SchedulerApplicationAttempt appAttempt, AllocateRequest request, + AllocateResponse allocateResponse, OpportunisticContainerContext oppCntxt, + List existingGuaranteedReqs) { + List promotionRequests = new ArrayList<>(); + List demotionRequests = new ArrayList<>(); + + List updateContainerErrors = RMServerUtils + .validateAndSplitUpdateExecutionTypeRequests(rmContext, + request, promotionRequests, demotionRequests); + + if (!promotionRequests.isEmpty()) { + LOG.info("Promotion Update requests : " + promotionRequests); + } + if (!demotionRequests.isEmpty()) { + LOG.info("Demotion Update requests : " + demotionRequests); + } + + List resourceReqsForPromotion = + createResourceReqsForPromotion(oppCntxt, promotionRequests, + updateContainerErrors); + + if (!resourceReqsForPromotion.isEmpty() && LOG.isDebugEnabled()) { + LOG.debug("Generated Resource Requests for promotion : " + + resourceReqsForPromotion); + } + + handleDemotionRequests(appAttempt, demotionRequests, updateContainerErrors); + addToUpdateContainerErrors(allocateResponse, updateContainerErrors); + existingGuaranteedReqs.addAll(resourceReqsForPromotion); + } + + private void handleDemotionRequests(SchedulerApplicationAttempt appAttempt, + List demotionRequests, + List updateContainerErrors) { + OpportunisticContainerContext oppCntxt = + appAttempt.getOpportunisticContainerContext(); + for (UpdateContainerRequest uReq : demotionRequests) { + RMContainer rmContainer = + rmContext.getScheduler().getRMContainer(uReq.getContainerId()); + if (rmContainer != null) { + if (oppCntxt.checkAndAddToOutstandingDemotions( + rmContainer.getContainer())) { + RMContainer demotedRMContainer = createDemotedRMContainer + (appAttempt, oppCntxt, rmContainer); + appAttempt.addToNewlyDemotedContainers( + uReq.getContainerId(), demotedRMContainer); + } else { + 
updateContainerErrors.add(UpdateContainerError.newInstance( + RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); + } + } else { + LOG.warn("Cannot demote non-existent (or completed) Container [" + + uReq.getContainerId() + "]"); + } + } + } + + private RMContainer createDemotedRMContainer( + SchedulerApplicationAttempt appAttempt, + OpportunisticContainerContext oppCntxt, + RMContainer rmContainer) { + SchedulerRequestKey sk = + SchedulerRequestKey.extractFrom(rmContainer.getContainer()); + Container demotedContainer = BuilderUtils.newContainer( + ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), + oppCntxt.getContainerIdGenerator().generateContainerId()), + rmContainer.getContainer().getNodeId(), + rmContainer.getContainer().getNodeHttpAddress(), + rmContainer.getContainer().getResource(), + sk.getPriority(), null, ExecutionType.OPPORTUNISTIC, + sk.getAllocationRequestId()); + demotedContainer.setVersion(rmContainer.getContainer().getVersion()); + return createRmContainer(demotedContainer, false); + } + + public List createResourceReqsForPromotion( + OpportunisticContainerContext oppCntxt, + List updateContainerRequests, + List updateContainerErrors) { + List retList = new ArrayList<>(); + for (UpdateContainerRequest uReq : updateContainerRequests) { + RMContainer rmContainer = + rmContext.getScheduler().getRMContainer(uReq.getContainerId()); + // Check if this is a container update + // And not in the middle of a Demotion + if (rmContainer != null) { + // Check if this is an executionType change request + // If so, fix the rr to make it look like a normal rr + // with relaxLocality=false and numContainers=1 + SchedulerNode schedulerNode = rmContext.getScheduler() + .getSchedulerNode(rmContainer.getContainer().getNodeId()); + + // Add only if no outstanding promote requests exist. 
+ SchedulerRequestKey schedulerKey = + oppCntxt.checkAndAddToOutstandingPromotions( + rmContainer.getContainer(), uReq); + if (schedulerKey != null) { + // Create a new Ask + retList.add( + createResourceReqForPromotion(schedulerKey, + RECORD_FACTORY.newRecordInstance(ResourceRequest.class), + rmContainer, + rmContainer.getContainer().getNodeId().getHost())); + + // TODO: The below are also required now.. Since the Schedulers + // actually update demand only for * requests. + + // Add rack local ask + retList.add( + createResourceReqForPromotion(schedulerKey, + RECORD_FACTORY.newRecordInstance(ResourceRequest.class), + rmContainer, schedulerNode.getRackName())); + + // Add ANY ask + retList.add( + createResourceReqForPromotion(schedulerKey, + RECORD_FACTORY.newRecordInstance(ResourceRequest.class), + rmContainer, ResourceRequest.ANY)); + } else { + updateContainerErrors.add(UpdateContainerError.newInstance( + RMServerUtils.UPDATE_OUTSTANDING_ERROR, uReq)); + } + } else { + LOG.warn("Cannot promote non-existent (or completed) Container [" + + uReq.getContainerId() + "]"); + } + } + return retList; + } + + private static ResourceRequest createResourceReqForPromotion( + SchedulerRequestKey schedulerRequestKey, + ResourceRequest rr, RMContainer rmContainer, String resourceName) { + rr.setResourceName(resourceName); + rr.setNumContainers(1); + rr.setRelaxLocality(false); + rr.setPriority(rmContainer.getContainer().getPriority()); + rr.setAllocationRequestId(schedulerRequestKey.getAllocationRequestId()); + rr.setCapability(rmContainer.getContainer().getResource()); + rr.setNodeLabelExpression(rmContainer.getNodeLabelExpression()); + rr.setExecutionTypeRequest(ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true)); + rr.setContainerToUpdate(schedulerRequestKey.getContainerToUpdate()); + return rr; + } + @Override public RegisterDistributedSchedulingAMResponse registerApplicationMasterForDistributedScheduling( @@ -298,21 +466,29 @@ private void 
handleNewContainers(List allocContainers, boolean isRemotelyAllocated) { for (Container container : allocContainers) { // Create RMContainer - SchedulerApplicationAttempt appAttempt = - ((AbstractYarnScheduler) rmContext.getScheduler()) - .getCurrentAttemptForContainer(container.getId()); - RMContainer rmContainer = new RMContainerImpl(container, - appAttempt.getApplicationAttemptId(), container.getNodeId(), - appAttempt.getUser(), rmContext, isRemotelyAllocated); - appAttempt.addRMContainer(container.getId(), rmContainer); - ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( - container.getNodeId()).allocateContainer(rmContainer); + RMContainer rmContainer = + createRmContainer(container, isRemotelyAllocated); rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.ACQUIRED)); } } + private RMContainer createRmContainer( + Container container, boolean isRemotelyAllocated) { + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getCurrentAttemptForContainer(container.getId()); + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, isRemotelyAllocated); + appAttempt.addRMContainer(container.getId(), rmContainer); + ((AbstractYarnScheduler) rmContext.getScheduler()).getNode( + container.getNodeId()).allocateContainer(rmContainer); + return rmContainer; + } + @Override public void handle(SchedulerEvent event) { switch (event.getType()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 74898ca..8e91cba 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; @@ -80,7 +82,7 @@ */ public class RMServerUtils { - private static final String UPDATE_OUTSTANDING_ERROR = + public static final String UPDATE_OUTSTANDING_ERROR = "UPDATE_OUTSTANDING_ERROR"; private static final String INCORRECT_CONTAINER_VERSION_ERROR = "INCORRECT_CONTAINER_VERSION_ERROR"; @@ -123,6 +125,52 @@ } /** + * + * @param rmContext RM context + * @param request Allocate Request + * @param promoteExecTypeReqs Promotion requests + * @param demoteExecTypeReqs Demotion requests + * @return List of container update Errors + */ + public static List + validateAndSplitUpdateExecutionTypeRequests(RMContext rmContext, + AllocateRequest request, List promoteExecTypeReqs, + List demoteExecTypeReqs) { + List errors = new ArrayList<>(); + Set outstandingUpdate = new HashSet<>(); + for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { + if (updateReq.getContainerUpdateType() == + ContainerUpdateType.UPDATE_EXECUTION_TYPE) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer( + updateReq.getContainerId()); + String msg = validateContainerIdAndVersion(outstandingUpdate, + updateReq, rmContainer); + if (msg == null) { + 
ExecutionType original = rmContainer.getExecutionType(); + ExecutionType target = updateReq.getExecutionType(); + if (target != original) { + if (target == ExecutionType.GUARANTEED && + original == ExecutionType.OPPORTUNISTIC) { + updateReq.setPriority(rmContainer.getAllocatedPriority()); + updateReq.setAllocationRequestId( + rmContainer.getAllocatedSchedulerKey() + .getAllocationRequestId()); + promoteExecTypeReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else if (target == ExecutionType.OPPORTUNISTIC && + original == ExecutionType.GUARANTEED) { + demoteExecTypeReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } + } + } + checkAndcreateUpdateError(errors, updateReq, msg); + } + } + return errors; + } + + /** * Check if we have: * - Request for same containerId and different target resource * - If targetResources violates maximum/minimumAllocation @@ -131,7 +179,7 @@ * @param maximumAllocation Maximum Allocation * @param increaseResourceReqs Increase Resource Request * @param decreaseResourceReqs Decrease Resource Request - * @return List of container Errors + * @return List of container update Errors */ public static List validateAndSplitUpdateResourceRequests(RMContext rmContext, @@ -141,59 +189,76 @@ List errors = new ArrayList<>(); Set outstandingUpdate = new HashSet<>(); for (UpdateContainerRequest updateReq : request.getUpdateRequests()) { - RMContainer rmContainer = rmContext.getScheduler().getRMContainer( - updateReq.getContainerId()); - String msg = null; - if (rmContainer == null) { - msg = INVALID_CONTAINER_ID; - } - // Only allow updates if the requested version matches the current - // version - if (msg == null && updateReq.getContainerVersion() != - rmContainer.getContainer().getVersion()) { - msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" - + updateReq.getContainerVersion() + "|" - + rmContainer.getContainer().getVersion(); - } - // No more than 1 container update per request. 
- if (msg == null && - outstandingUpdate.contains(updateReq.getContainerId())) { - msg = UPDATE_OUTSTANDING_ERROR; - } - if (msg == null) { - Resource original = rmContainer.getContainer().getResource(); - Resource target = updateReq.getCapability(); - if (Resources.fitsIn(target, original)) { - // This is a decrease request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, false)) { - decreaseResourceReqs.add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); + if (updateReq.getContainerUpdateType() != + ContainerUpdateType.UPDATE_EXECUTION_TYPE) { + RMContainer rmContainer = rmContext.getScheduler().getRMContainer( + updateReq.getContainerId()); + String msg = validateContainerIdAndVersion(outstandingUpdate, + updateReq, rmContainer); + if (msg == null) { + Resource original = rmContainer.getContainer().getResource(); + Resource target = updateReq.getCapability(); + if (Resources.fitsIn(target, original)) { + // This is a decrease request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, false)) { + decreaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; - } - } else { - // This is an increase request - if (validateIncreaseDecreaseRequest(rmContext, updateReq, - maximumAllocation, true)) { - increaseResourceReqs.add(updateReq); - outstandingUpdate.add(updateReq.getContainerId()); - } else { - msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + // This is an increase request + if (validateIncreaseDecreaseRequest(rmContext, updateReq, + maximumAllocation, true)) { + increaseResourceReqs.add(updateReq); + outstandingUpdate.add(updateReq.getContainerId()); + } else { + msg = RESOURCE_OUTSIDE_ALLOWED_RANGE; + } } } - } - if (msg != null) { - UpdateContainerError updateError = RECORD_FACTORY - .newRecordInstance(UpdateContainerError.class); - updateError.setReason(msg); 
- updateError.setUpdateContainerRequest(updateReq); - errors.add(updateError); + checkAndcreateUpdateError(errors, updateReq, msg); } } return errors; } + private static void checkAndcreateUpdateError( + List errors, UpdateContainerRequest updateReq, + String msg) { + if (msg != null) { + UpdateContainerError updateError = RECORD_FACTORY + .newRecordInstance(UpdateContainerError.class); + updateError.setReason(msg); + updateError.setUpdateContainerRequest(updateReq); + errors.add(updateError); + } + } + + private static String validateContainerIdAndVersion( + Set outstandingUpdate, UpdateContainerRequest updateReq, + RMContainer rmContainer) { + String msg = null; + if (rmContainer == null) { + msg = INVALID_CONTAINER_ID; + } + // Only allow updates if the requested version matches the current + // version + if (msg == null && updateReq.getContainerVersion() != + rmContainer.getContainer().getVersion()) { + msg = INCORRECT_CONTAINER_VERSION_ERROR + "|" + + updateReq.getContainerVersion() + "|" + + rmContainer.getContainer().getVersion(); + } + // No more than 1 container update per request. 
+ if (msg == null && + outstandingUpdate.contains(updateReq.getContainerId())) { + msg = UPDATE_OUTSTANDING_ERROR; + } + return msg; + } + /** * Utility method to validate a list resource requests, by insuring that the * requested memory/vcore is non-negative and not greater than max diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index dbc6169..c0e55a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -108,6 +108,8 @@ // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, RMContainerEventType.LAUNCHED) + .addTransition(RMContainerState.ACQUIRED, RMContainerState.ACQUIRED, + RMContainerEventType.ACQUIRED) .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, RMContainerEventType.FINISHED, new FinishedTransition()) .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, @@ -125,6 +127,8 @@ .addTransition(RMContainerState.RUNNING, RMContainerState.RELEASED, RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.ACQUIRED) + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.RESERVED, new ContainerReservedTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.CHANGE_RESOURCE, new 
ChangeResourceTransition()) @@ -163,13 +167,13 @@ private final WriteLock writeLock; private final ApplicationAttemptId appAttemptId; private final NodeId nodeId; - private final Container container; private final RMContext rmContext; private final EventHandler eventHandler; private final ContainerAllocationExpirer containerAllocationExpirer; private final String user; private final String nodeLabelExpression; + private Container container; private Resource reservedResource; private NodeId reservedNode; private SchedulerRequestKey reservedSchedulerKey; @@ -188,44 +192,44 @@ private boolean isExternallyAllocated; private SchedulerRequestKey allocatedSchedulerKey; - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext) { - this(container, appAttemptId, nodeId, user, rmContext, System + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System .currentTimeMillis(), ""); } - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, boolean isExternallyAllocated) { - this(container, appAttemptId, nodeId, user, rmContext, System + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System .currentTimeMillis(), "", isExternallyAllocated); } private boolean saveNonAMContainerMetaInfo; - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, String nodeLabelExpression) { - this(container, appAttemptId, nodeId, user, rmContext, System + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, System .currentTimeMillis(), nodeLabelExpression); } - public RMContainerImpl(Container 
container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, long creationTime, String nodeLabelExpression) { - this(container, appAttemptId, nodeId, user, rmContext, creationTime, - nodeLabelExpression, false); + this(container, schedulerKey, appAttemptId, nodeId, user, rmContext, + creationTime, nodeLabelExpression, false); } - public RMContainerImpl(Container container, + public RMContainerImpl(Container container, SchedulerRequestKey schedulerKey, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, RMContext rmContext, long creationTime, String nodeLabelExpression, boolean isExternallyAllocated) { this.stateMachine = stateMachineFactory.make(this); this.nodeId = nodeId; this.container = container; - this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container); + this.allocatedSchedulerKey = schedulerKey; this.appAttemptId = appAttemptId; this.user = user; this.creationTime = creationTime; @@ -276,6 +280,10 @@ public Container getContainer() { return this.container; } + public synchronized void setContainer(Container container) { + this.container = container; + } + @Override public RMContainerState getState() { this.readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index c0cc6b0..c889931 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; -import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulerResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -81,6 +81,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -503,9 +505,11 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status, ApplicationAttemptId attemptId = container.getId().getApplicationAttemptId(); RMContainer rmContainer = - new RMContainerImpl(container, attemptId, node.getNodeID(), - applications.get(attemptId.getApplicationId()).getUser(), rmContext, - status.getCreationTime(), status.getNodeLabelExpression()); + new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), attemptId, + node.getNodeID(), applications.get( + attemptId.getApplicationId()).getUser(), rmContext, + status.getCreationTime(), status.getNodeLabelExpression()); return rmContainer; } @@ -1034,7 +1038,7 @@ protected synchronized void nodeUpdate(RMNode nm) { } @Override - public void normalizeRequest(AbstractResourceRequest ask) { + 
public void normalizeRequest(SchedulerResourceRequest ask) { SchedulerUtils.normalizeRequest(ask, getResourceCalculator(), getMinimumResourceCapability(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 30f7ef9..650ee91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -973,6 +973,7 @@ public ResourceRequest cloneResourceRequest(ResourceRequest request) { .numContainers(1) .relaxLocality(request.getRelaxLocality()) .nodeLabelExpression(request.getNodeLabelExpression()).build(); + newRequest.setContainerToUpdate(request.getContainerToUpdate()); return newRequest; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index e94d800..1d06478 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; @@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -133,6 +136,9 @@ private AtomicLong firstContainerAllocatedTime = new AtomicLong(0); protected List newlyAllocatedContainers = new ArrayList<>(); + protected Map newlyPromotedContainers = new HashMap<>(); + protected Map newlyDemotedContainers = new HashMap<>(); + protected List tempContainerToKill = new ArrayList<>(); protected Map newlyDecreasedContainers = new HashMap<>(); protected Map newlyIncreasedContainers = new HashMap<>(); protected Set updatedNMTokens = new HashSet<>(); @@ -537,8 +543,9 @@ public RMContainer reserve(SchedulerNode node, writeLock.lock(); // Create RMContainer if necessary if (rmContainer == null) { - rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), - 
node.getNodeID(), appSchedulingInfo.getUser(), rmContext); + rmContainer = new RMContainerImpl(container, schedulerKey, + getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), rmContext); } if (rmContainer.getState() == RMContainerState.NEW) { attemptResourceUsage.incReserved(node.getPartition(), @@ -635,10 +642,10 @@ public Resource getCurrentConsumption() { } private Container updateContainerAndNMToken(RMContainer rmContainer, - boolean newContainer, boolean increasedContainer) { + ContainerUpdateType updateType) { Container container = rmContainer.getContainer(); ContainerType containerType = ContainerType.TASK; - if (!newContainer) { + if (updateType != null) { container.setVersion(container.getVersion() + 1); } // The working knowledge is that masterContainer for AM is null as it @@ -662,12 +669,14 @@ private Container updateContainerAndNMToken(RMContainer rmContainer, return null; } - if (newContainer) { + if (updateType == null || + ContainerUpdateType.UPDATE_EXECUTION_TYPE == updateType) { rmContainer.handle(new RMContainerEvent( rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); } else { rmContainer.handle(new RMContainerUpdatesAcquiredEvent( - rmContainer.getContainerId(), increasedContainer)); + rmContainer.getContainerId(), + ContainerUpdateType.INCREASE_RESOURCE == updateType)); } return container; } @@ -699,8 +708,8 @@ private void updateNMToken(Container container) { Iterator i = newlyAllocatedContainers.iterator(); while (i.hasNext()) { RMContainer rmContainer = i.next(); - Container updatedContainer = updateContainerAndNMToken(rmContainer, - true, false); + Container updatedContainer = + updateContainerAndNMToken(rmContainer, null); // Only add container to return list when it's not null. // updatedContainer could be null when generate token failed, it can be // caused by DNS resolving failed. 
@@ -713,9 +722,121 @@ private void updateNMToken(Container container) { } finally { writeLock.unlock(); } + } + public void addToNewlyDemotedContainers(ContainerId containerId, + RMContainer rmContainer) { + newlyDemotedContainers.put(containerId, rmContainer); } - + + protected synchronized void addToNewlyAllocatedContainers( + RMContainer rmContainer) { + if (oppContainerContext == null) { + newlyAllocatedContainers.add(rmContainer); + return; + } + ContainerId matchedContainerId = + oppContainerContext.matchContainerToOutstandingPromotionReq( + rmContainer.getAllocatedSchedulerKey(), rmContainer.getContainer()); + if (matchedContainerId != null) { + if (OpportunisticContainerContext.UNDEFINED == matchedContainerId) { + // This is a spurious allocation (relaxLocality = false + // resulted in the Container being allocated on an NM on the same host + // but not on the NM running the container to be updated. Can + // happen if more than one NM exists on the same host.. usually + // occurs when using MiniYARNCluster to test). + tempContainerToKill.add(rmContainer); + } else { + newlyPromotedContainers.put(matchedContainerId, rmContainer); + } + } else { + newlyAllocatedContainers.add(rmContainer); + } + } + + /** + * A container is promoted if its executionType is changed from + * OPPORTUNISTIC to GUARANTEED. It is demoted if the change is from + * GUARANTEED to OPPORTUNISTIC. 
+ * @return Newly Promoted and Demoted containers + */ + public List pullContainersWithUpdatedExecType() { + List updatedContainers = new ArrayList<>(); + if (oppContainerContext == null) { + return updatedContainers; + } + try { + writeLock.lock(); + for (Map newlyUpdatedContainers : + Arrays.asList(newlyPromotedContainers, newlyDemotedContainers)) { + Iterator> i = + newlyUpdatedContainers.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = i.next(); + ContainerId matchedContainerId = entry.getKey(); + RMContainer rmContainer = entry.getValue(); + + // swap containers + RMContainer existingRMContainer = swapContainer( + rmContainer, matchedContainerId); + oppContainerContext.completeContainerExecTypeUpdate( + rmContainer.getAllocatedSchedulerKey(), + existingRMContainer.getContainer()); + Container updatedContainer = updateContainerAndNMToken( + existingRMContainer, ContainerUpdateType.UPDATE_EXECUTION_TYPE); + updatedContainers.add(updatedContainer); + + // Mark container for release (set RRs to null, so RM does not think + // it is a recoverable container) + ((RMContainerImpl) rmContainer).setResourceRequests(null); + tempContainerToKill.add(rmContainer); + i.remove(); + } + } + // Release all temporary containers + Iterator tempIter = tempContainerToKill.iterator(); + while (tempIter.hasNext()) { + RMContainer c = tempIter.next(); + ((AbstractYarnScheduler) rmContext.getScheduler()).completedContainer(c, + SchedulerUtils.createAbnormalContainerStatus(c.getContainerId(), + SchedulerUtils.UPDATED_CONTAINER), + RMContainerEventType.KILL); + tempIter.remove(); + } + return updatedContainers; + } finally { + writeLock.unlock(); + } + } + + private RMContainer swapContainer(RMContainer rmContainer, ContainerId + matchedContainerId) { + RMContainer existingRMContainer = + getRMContainer(matchedContainerId); + if (existingRMContainer != null) { + // Swap updated container with the existing container + Container updatedContainer = 
rmContainer.getContainer(); + + Container newContainer = Container.newInstance(matchedContainerId, + existingRMContainer.getContainer().getNodeId(), + existingRMContainer.getContainer().getNodeHttpAddress(), + updatedContainer.getResource(), + existingRMContainer.getContainer().getPriority(), null, + updatedContainer.getExecutionType()); + newContainer.setAllocationRequestId( + existingRMContainer.getContainer().getAllocationRequestId()); + newContainer.setVersion(existingRMContainer.getContainer().getVersion()); + + rmContainer.getContainer().setResource( + existingRMContainer.getContainer().getResource()); + rmContainer.getContainer().setExecutionType( + existingRMContainer.getContainer().getExecutionType()); + + ((RMContainerImpl)existingRMContainer).setContainer(newContainer); + } + return existingRMContainer; + } + private List pullNewlyUpdatedContainers( Map updatedContainerMap, boolean increase) { try { @@ -728,7 +849,8 @@ private void updateNMToken(Container container) { while (i.hasNext()) { RMContainer rmContainer = i.next().getValue(); Container updatedContainer = updateContainerAndNMToken(rmContainer, - false, increase); + increase ? 
ContainerUpdateType.INCREASE_RESOURCE : + ContainerUpdateType.DECREASE_RESOURCE); if (updatedContainer != null) { returnContainerList.add(updatedContainer); i.remove(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 6f905b9..28c1841 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulerResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; @@ -57,6 +57,9 @@ public static final String RELEASED_CONTAINER = "Container released by application"; + + public static final String UPDATED_CONTAINER = + "Temporary container killed by application for ExeutionType update"; public static final String LOST_CONTAINER = "Container released on a *lost* node"; @@ -141,7 +144,7 @@ public static void normalizeRequest( * requested memory is a multiple of increment resource and is not zero. 
*/ public static void normalizeRequest( - AbstractResourceRequest ask, + SchedulerResourceRequest ask, ResourceCalculator resourceCalculator, Resource minimumResource, Resource maximumResource, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index ea1ae60..823bd1c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; -import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulerResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -386,5 +386,5 @@ public Priority updateApplicationPriority(Priority newPriority, * * @param request the resource request to be normalized */ - void normalizeRequest(AbstractResourceRequest request); + void normalizeRequest(SchedulerResourceRequest request); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index c12bc6a..65bd435 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -746,7 +746,7 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult, // When reserving container RMContainer updatedContainer = reservedContainer; if (updatedContainer == null) { - updatedContainer = new RMContainerImpl(container, + updatedContainer = new RMContainerImpl(container, schedulerKey, application.getApplicationAttemptId(), node.getNodeID(), application.getAppSchedulingInfo().getUser(), rmContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index b14bc20..7d4dc4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -222,7 +222,7 @@ public RMContainer allocate(FiCaSchedulerNode node, } // Create 
RMContainer - RMContainer rmContainer = new RMContainerImpl(container, + RMContainer rmContainer = new RMContainerImpl(container, schedulerKey, this.getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, request.getNodeLabelExpression()); @@ -554,7 +554,7 @@ public void apply(Resource cluster, // Update this application for the allocated container if (!allocation.isIncreasedAllocation()) { // Allocate a new container - newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers(rmContainer); liveContainers.put(containerId, rmContainer); // Deduct pending resource requests diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index a9591a5..7c69249 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -448,13 +448,13 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node, } // Create RMContainer - rmContainer = new RMContainerImpl(container, + rmContainer = new RMContainerImpl(container, schedulerKey, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); // Add it to allContainers list. 
- newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers(rmContainer); liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index e790bc2..b34b565 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; -import org.apache.hadoop.yarn.api.records.AbstractResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulerResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -800,7 +800,7 @@ private void removeNode(RMNode rmNode) { } @Override - public void normalizeRequest(AbstractResourceRequest ask) { + public void normalizeRequest(SchedulerResourceRequest ask) { SchedulerUtils.normalizeRequest(ask, DOMINANT_RESOURCE_CALCULATOR, minimumAllocation, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index e60f70e..af5efc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -68,7 +68,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, // Create RMContainer RMContainer rmContainer = new RMContainerImpl(container, - this.getApplicationAttemptId(), node.getNodeID(), + schedulerKey, this.getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext, request.getNodeLabelExpression()); ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); @@ -76,7 +76,7 @@ public RMContainer allocate(NodeType type, FiCaSchedulerNode node, updateAMContainerDiagnostics(AMState.ASSIGNED, null); // Add it to allContainers list. 
- newlyAllocatedContainers.add(rmContainer); + addToNewlyAllocatedContainers(rmContainer); ContainerId containerId = container.getId(); liveContainers.put(containerId, rmContainer); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 593de08..fbeca7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -251,6 +251,13 @@ public AllocateResponse sendContainerResizingRequest( return allocate(req); } + public AllocateResponse sendContainerUpdateRequest( + List updateRequests) throws Exception { + final AllocateRequest req = AllocateRequest.newInstance(0, 0F, null, null, + null, updateRequests); + return allocate(req); + } + public AllocateResponse allocate(AllocateRequest allocateRequest) throws Exception { UserGroupInformation ugi = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 32cdb1b..2d76127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java 
@@ -195,6 +195,12 @@ public NodeHeartbeatResponse nodeHeartbeat(Map updatedStats, boolean isHealthy) throws Exception { + return nodeHeartbeat(updatedStats, Collections.emptyList(), + isHealthy, ++responseId); + } + public NodeHeartbeatResponse nodeHeartbeat(List updatedStats, List increasedConts, boolean isHealthy, int resId) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index cb57f39..c135384 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -142,6 +142,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; @@ -1072,7 +1074,8 @@ public ApplicationReport createAndGetApplicationReport( Container container = Container.newInstance( ContainerId.newContainerId(attemptId, 1), null, "", null, null, null); RMContainerImpl containerimpl = spy(new RMContainerImpl(container, - attemptId, null, "", rmContext)); + SchedulerRequestKey.extractFrom(container), attemptId, null, "", + 
rmContext)); Map attempts = new HashMap(); attempts.put(attemptId, rmAppAttemptImpl); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 73d9e5c..1fcfdee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -34,11 +34,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ContainerUpdateType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -64,8 +68,11 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; @@ -75,13 +82,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; /** @@ -91,8 +102,10 @@ private static final int GB = 1024; - @Test(timeout = 60000) - public void testNodeRemovalDuringAllocate() throws Exception { + private MockRM rm; + + @Before + public void createAndStartRM() { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); YarnConfiguration conf = new YarnConfiguration(csConf); @@ -102,8 +115,445 @@ public void testNodeRemovalDuringAllocate() throws Exception { YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); conf.setInt( YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100); - MockRM rm = new MockRM(conf); + rm = new MockRM(conf); 
rm.start(); + } + + @After + public void stopRM() { + if (rm != null) { + rm.stop(); + } + } + + @Test(timeout = 600000) + public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h1:4321", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h2:4321", 4096, rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId()); + RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode3) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode4) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext 
ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode3)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode4)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode3)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode4)); + // All nodes 1 - 4 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + + Thread.sleep(1000); + + QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() + .getMetrics(); + + // Verify Metrics + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + + AllocateResponse allocateResponse = am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + MockNM sameHostDiffNode = null; + for (NodeId n : nodes.keySet()) { + if (n.getHost().equals(allocNode.getNodeId().getHost()) && + n.getPort() != allocNode.getNodeId().getPort()) { + sameHostDiffNode = nodes.get(n); + } + } + + // Verify Metrics After OPP allocation (Nothing should change) + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + + am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, 
ExecutionType.GUARANTEED))); + // Node on same host should not result in allocation + sameHostDiffNode.nodeHeartbeat(true); + Thread.sleep(200); + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + + // Verify Metrics After OPP allocation (Nothing should change again) + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + + // Send Promotion req again... this should result in update error + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Send Promotion req again with incorrect version... 
+ // this should also result in update error + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(1, + container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR|1|0", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Ensure after correct node heartbeats, we should get the allocation + allocNode.nodeHeartbeat(true); + Thread.sleep(200); + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + Container uc = + allocateResponse.getUpdatedContainers().get(0).getContainer(); + Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType()); + Assert.assertEquals(uc.getId(), container.getId()); + Assert.assertEquals(uc.getVersion(), container.getVersion() + 1); + + // Verify Metrics After OPP allocation : + // Allocated cores+mem should have increased, available should decrease + verifyMetrics(metrics, 14336, 14, 2048, 2, 2); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + Thread.sleep(200); + + // Verify that the container is still in ACQUIRED state wrt the RM. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId()); + Assert.assertEquals(RMContainerState.ACQUIRED, rmContainer.getState()); + + // Now demote the container back.. 
+ allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(uc.getVersion(), + uc.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, ExecutionType.OPPORTUNISTIC))); + // This should happen in the same heartbeat.. + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + uc = allocateResponse.getUpdatedContainers().get(0).getContainer(); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, uc.getExecutionType()); + Assert.assertEquals(uc.getId(), container.getId()); + Assert.assertEquals(uc.getVersion(), container.getVersion() + 2); + + // Verify Metrics After OPP allocation : + // Everything should have reverted to what it was + verifyMetrics(metrics, 15360, 15, 1024, 1, 1); + } + + @Test(timeout = 60000) + public void testContainerPromoteAfterContainerStart() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt 
= ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. + amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + Thread.sleep(1000); + + QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() + .getMetrics(); + + // Verify Metrics + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + AllocateResponse allocateResponse = am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + Thread.sleep(200); + + // Verify that container is actually running wrt the RM.. 
+ RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Verify Metrics After OPP allocation (Nothing should change) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + + // Verify Metrics After OPP allocation (Nothing should change again) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + // Send Promotion req again... this should result in update error + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + Thread.sleep(200); + + allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); + Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); + Container uc = + allocateResponse.getUpdatedContainers().get(0).getContainer(); + Assert.assertEquals(ExecutionType.GUARANTEED, uc.getExecutionType()); + Assert.assertEquals(uc.getId(), container.getId()); + Assert.assertEquals(uc.getVersion(), container.getVersion() + 1); + + // Verify that the 
Container is still in RUNNING state wrt RM.. + rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + uc.getId().getApplicationAttemptId()).getRMContainer(uc.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Verify Metrics After OPP allocation : + // Allocated cores+mem should have increased, available should decrease + verifyMetrics(metrics, 6144, 6, 2048, 2, 2); + } + + @Test(timeout = 600000) + public void testContainerPromoteAfterContainerComplete() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + nm1.registerNode(); + nm2.registerNode(); + + OpportunisticContainerAllocatorAMService amservice = + (OpportunisticContainerAllocatorAMService) rm + .getApplicationMasterService(); + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + ApplicationAttemptId attemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + ResourceScheduler scheduler = rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + ((RMNodeImpl) rmNode1) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + ((RMNodeImpl) rmNode2) + .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100)); + + OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler) + .getApplicationAttempt(attemptId).getOpportunisticContainerContext(); + // Send add and update node events to AM Service. 
+ amservice.handle(new NodeAddedSchedulerEvent(rmNode1)); + amservice.handle(new NodeAddedSchedulerEvent(rmNode2)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode1)); + amservice.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // All nodes 1 to 2 will be applicable for scheduling. + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + + Thread.sleep(1000); + + QueueMetrics metrics = ((CapacityScheduler) scheduler).getRootQueue() + .getMetrics(); + + // Verify Metrics + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + AllocateResponse allocateResponse = am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))), + null); + List allocatedContainers = allocateResponse + .getAllocatedContainers(); + Assert.assertEquals(2, allocatedContainers.size()); + Container container = allocatedContainers.get(0); + MockNM allocNode = nodes.get(container.getNodeId()); + + // Start Container in NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), + true); + Thread.sleep(200); + + // Verify that container is actually running wrt the RM.. + RMContainer rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState()); + + // Container Completed in the NM + allocNode.nodeHeartbeat(Arrays.asList( + ContainerStatus.newInstance(container.getId(), + ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), + true); + Thread.sleep(200); + + // Verify that container has been removed.. 
+ rmContainer = ((CapacityScheduler) scheduler) + .getApplicationAttempt( + container.getId().getApplicationAttemptId()).getRMContainer( + container.getId()); + Assert.assertNull(rmContainer); + + // Verify Metrics After OPP allocation (Nothing should change) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + + // Send Promotion req... this should result in update error + // Since the container doesn't exist anymore.. + allocateResponse = am1.sendContainerUpdateRequest( + Arrays.asList(UpdateContainerRequest.newInstance(0, + container.getId(), ContainerUpdateType.UPDATE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED))); + + Assert.assertEquals(1, + allocateResponse.getCompletedContainersStatuses().size()); + Assert.assertEquals(container.getId(), + allocateResponse.getCompletedContainersStatuses().get(0) + .getContainerId()); + Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); + Assert.assertEquals(1, allocateResponse.getUpdateErrors().size()); + Assert.assertEquals("INVALID_CONTAINER_ID", + allocateResponse.getUpdateErrors().get(0).getReason()); + Assert.assertEquals(container.getId(), + allocateResponse.getUpdateErrors().get(0) + .getUpdateContainerRequest().getContainerId()); + + // Verify Metrics After OPP allocation (Nothing should change again) + verifyMetrics(metrics, 7168, 7, 1024, 1, 1); + } + + private void verifyMetrics(QueueMetrics metrics, long availableMB, + int availableVirtualCores, long allocatedMB, + int allocatedVirtualCores, int allocatedContainers) { + Assert.assertEquals(availableMB, metrics.getAvailableMB()); + Assert.assertEquals(availableVirtualCores, metrics.getAvailableVirtualCores()); + Assert.assertEquals(allocatedMB, metrics.getAllocatedMB()); + Assert.assertEquals(allocatedVirtualCores, metrics.getAllocatedVirtualCores()); + Assert.assertEquals(allocatedContainers, metrics.getAllocatedContainers()); + } + + @Test(timeout = 60000) + public void testNodeRemovalDuringAllocate() throws Exception { MockNM nm1 = new 
MockNM("h1:1234", 4096, rm.getResourceTrackerService()); MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); nm1.registerNode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index e737a84..893f802 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -114,7 +115,8 @@ public void testReleaseWhileRunning() { YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO, true); when(rmContext.getYarnConfiguration()).thenReturn(conf); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, nodeId, "user", rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); @@ -216,7 +218,8 @@ public void testExpireWhileRunning() { when(rmContext.getYarnConfiguration()).thenReturn(conf); 
when(rmContext.getRMApps()).thenReturn(appMap); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, nodeId, "user", rmContext); assertEquals(RMContainerState.NEW, rmContainer.getState()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index a6ae0c2..cf91841 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; + +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -280,7 +282,8 @@ public void testSortedQueues() throws Exception { ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); Container container=TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priority); - RMContainer rmContainer = new 
RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, node_0.getNodeID(), "user", rmContext); // Assign {1,2,3,4} 1GB containers respectively to queues diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index c9eb8b3..f9bf89d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -923,13 +923,15 @@ public void testGetAppToUnreserve() throws Exception { Container container = TestUtils.getMockContainer(containerId, node_1.getNodeID(), Resources.createResource(2*GB), priorityMap.getPriority()); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, node_1.getNodeID(), "user", rmContext); Container container_1 = TestUtils.getMockContainer(containerId, node_0.getNodeID(), Resources.createResource(1*GB), priorityMap.getPriority()); - RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId, + RMContainer rmContainer_1 = new RMContainerImpl(container_1, + SchedulerRequestKey.extractFrom(container_1), appAttemptId, node_0.getNodeID(), "user", rmContext); // no reserved containers @@ -996,7 +998,8 @@ public void testFindNodeToUnreserve() throws 
Exception { Container container = TestUtils.getMockContainer(containerId, node_1.getNodeID(), Resources.createResource(2*GB), priorityMap.getPriority()); - RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), appAttemptId, node_1.getNodeID(), "user", rmContext); // nothing reserved