diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index f070f28..e0c7abf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionContainer; import org.apache.hadoop.yarn.api.records.PreemptionContract; @@ -391,7 +392,9 @@ public AllocateResponse allocate(AllocateRequest request) blacklistRequest.getBlacklistAdditions() : null; List<String> blacklistRemovals = (blacklistRequest != null) ? - blacklistRequest.getBlacklistRemovals() : null; + blacklistRequest.getBlacklistRemovals() : null; + List<ContainerResourceIncreaseRequest> increaseRequests = + request.getIncreaseRequests(); // sanity check try { @@ -419,7 +422,7 @@ public AllocateResponse allocate(AllocateRequest request) // Send new requests to appAttempt. Allocation allocation = this.rScheduler.allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals); + blacklistAdditions, blacklistRemovals, increaseRequests); RMApp app = this.rmContext.getRMApps().get( appAttemptId.getApplicationId()); @@ -458,7 +461,7 @@ public AllocateResponse allocate(AllocateRequest request) .pullJustFinishedContainers()); allocateResponse.setResponseId(lastResponse.getResponseId() + 1); allocateResponse.setAvailableResources(allocation.getResourceLimit()); - + allocateResponse.setIncreasedContainers(allocation.getIncreasedContainers()); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); // add preemption to the allocateResponse message (if any) @@ -578,4 +581,4 @@ public synchronized void setAllocateResponse(AllocateResponse response) { this.response = response; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f80ce85..22083c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -371,9 +371,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + new RMNodeStatusEvent(nodeId, remoteNodeStatus + .getNodeHealthStatus(), remoteNodeStatus + .getContainersStatuses(), remoteNodeStatus + .getKeepAliveApplications(), nodeHeartBeatResponse, + remoteNodeStatus.getDecreasedContainers())); return nodeHeartBeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 67a2e41..bfadfe6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -802,7 +802,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // AM resource has been checked when submission Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, - Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null); + Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null, null); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); @@ -824,7 +824,7 @@ public void transition(RMAppAttemptImpl appAttempt, // Acquire the AM container from the scheduler. Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST, - EMPTY_CONTAINER_RELEASE_LIST, null, null); + EMPTY_CONTAINER_RELEASE_LIST, null, null, null); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, // and is put in SchedulerApplication#newlyAllocatedContainers. 
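For context before the RM-internal changes continue: the increase requests handled above originate from the ApplicationMaster's allocate call. Below is a minimal AM-side sketch, hedged — it assumes an AllocateRequest#setIncreaseRequests setter mirroring the request.getIncreaseRequests() getter used in this patch, plus the ContainerResourceIncreaseRequest.newInstance(ContainerId, Resource) factory; runningContainer, lastResponseId, progress, and amRmProtocol are hypothetical AM-side variables, not part of this patch.

    // Ask the RM to grow an existing container to 4 GB / 4 vcores.
    // Note: the request carries the TARGET capability, not the delta.
    ContainerResourceIncreaseRequest incReq =
        ContainerResourceIncreaseRequest.newInstance(
            runningContainer.getId(),
            Resource.newInstance(4096, 4));
    AllocateRequest allocateRequest = AllocateRequest.newInstance(
        lastResponseId, progress,
        new ArrayList<ResourceRequest>(),  // no new container asks
        new ArrayList<ContainerId>(),      // nothing to release
        null);                             // no blacklist changes
    allocateRequest.setIncreaseRequests(Collections.singletonList(incReq));
    AllocateResponse response = amRmProtocol.allocate(allocateRequest);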
Then, the RMNode interface and its implementation are extended to collect the container decreases reported on NM heartbeats: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 24793e8..51df0cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -24,6 +24,7 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -147,4 +148,10 @@ * @return containerUpdates accumulated across NM heartbeats. */ public List<UpdatedContainerInfo> pullContainerUpdates(); + + /** + * Get and clear the list of decreased containers accumulated across NM heartbeats. + * @return decreased containers accumulated across NM heartbeats + */ + public List<ContainerResourceDecrease> pullDecreasedContainers(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 52bc285..57c6612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -84,6 +85,8 @@ private final WriteLock writeLock; private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue; + private final ConcurrentLinkedQueue<ContainerResourceDecrease> + decreasedContainerQueue; private volatile boolean nextHeartBeat = true; private final NodeId nodeId; @@ -196,7 +199,9 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.stateMachine = stateMachineFactory.make(this); - this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>(); + this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>(); + this.decreasedContainerQueue = + new ConcurrentLinkedQueue<ContainerResourceDecrease>(); } @Override @@ -591,6 +596,14 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { NodeState.UNHEALTHY); return NodeState.UNHEALTHY; } + + // add decreased containers + if (null != statusEvent.getDecreasedContainers()) { + for (ContainerResourceDecrease c : + statusEvent.getDecreasedContainers()) { + rmNode.decreasedContainerQueue.add(c); + } + } // Filter the map to only obtain
just launched containers and finished // containers. @@ -693,6 +706,17 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { this.nextHeartBeat = true; return latestContainerInfoList; } + + @Override + public List<ContainerResourceDecrease> pullDecreasedContainers() { + List<ContainerResourceDecrease> latestDecreasedContainers = + new ArrayList<ContainerResourceDecrease>(); + while (decreasedContainerQueue.peek() != null) { + latestDecreasedContainers.add(decreasedContainerQueue.poll()); + } + this.nextHeartBeat = true; + return latestDecreasedContainers; + } @VisibleForTesting public void setNextHeartBeat(boolean nextHeartBeat) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index abfacbb..b16bf6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -32,15 +33,25 @@ private final List<ContainerStatus> containersCollection; private final NodeHeartbeatResponse latestResponse; private final List<ApplicationId> keepAliveAppIds; + private final List<ContainerResourceDecrease> decreasedContainers; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, NodeHeartbeatResponse latestResponse) { + this(nodeId, nodeHealthStatus, collection, keepAliveAppIds, latestResponse, + null); + } + + public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, + List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, + NodeHeartbeatResponse latestResponse, + List<ContainerResourceDecrease> decreasedContainers) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; this.containersCollection = collection; this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; + this.decreasedContainers = decreasedContainers; } public NodeHealthStatus getNodeHealthStatus() { @@ -58,4 +69,8 @@ public NodeHeartbeatResponse getLatestResponse() { public List<ApplicationId> getKeepAliveAppIds() { return this.keepAliveAppIds; } + + public List<ContainerResourceDecrease> getDecreasedContainers() { + return this.decreasedContainers; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index c03e31d..146e675 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -22,10 +22,9 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; public class Allocation { @@ -34,24 +33,34 @@ final Set<ContainerId> strictContainers; final Set<ContainerId> fungibleContainers; final List<ResourceRequest> fungibleResources; + final List<ContainerResourceIncrease> increasedContainers; public Allocation(List<Container> containers, Resource resourceLimit) { - this(containers, resourceLimit, null, null, null); + this(containers, resourceLimit, null, null, null, null); } public Allocation(List<Container> containers, Resource resourceLimit, Set<ContainerId> strictContainers) { - this(containers, resourceLimit, strictContainers, null, null); + this(containers, resourceLimit, strictContainers, null, null, null); } public Allocation(List<Container> containers, Resource resourceLimit, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, List<ResourceRequest> fungibleResources) { + this(containers, resourceLimit, strictContainers, fungibleContainers, + fungibleResources, null); + } + + public Allocation(List<Container> containers, Resource resourceLimit, + Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, + List<ResourceRequest> fungibleResources, + List<ContainerResourceIncrease> increasedContainers) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; this.fungibleContainers = fungibleContainers; this.fungibleResources = fungibleResources; + this.increasedContainers = increasedContainers; } public List<Container> getContainers() { @@ -73,5 +82,8 @@ public Resource getResourceLimit() { public List<ResourceRequest> getResourcePreemptions() { return fungibleResources; } - + + public List<ContainerResourceIncrease> getIncreasedContainers() { + return increasedContainers; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index b5b22b6..e0b81a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; @@ -35,6 +36,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import
org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -62,6 +66,9 @@ final Map<Priority, Map<String, ResourceRequest>> requests = new HashMap<Priority, Map<String, ResourceRequest>>(); final Set<String> blacklist = new HashSet<String>(); + final Map<NodeId, Map<ContainerId, ContainerResourceIncreaseRequest>> + increaseRequestMap = + new HashMap<NodeId, Map<ContainerId, ContainerResourceIncreaseRequest>>(); //private final ApplicationStore store; private final ActiveUsersManager activeUsersManager; @@ -111,6 +118,36 @@ private synchronized void clearRequests() { public int getNewContainerId() { return this.containerIdCounter.incrementAndGet(); } + + synchronized public void addIncreaseRequests(NodeId nodeId, + ContainerResourceIncreaseRequest increaseRequest, Resource required) { + ContainerId cid = increaseRequest.getContainerId(); + if (!increaseRequestMap.containsKey(nodeId)) { + increaseRequestMap.put(nodeId, + new HashMap<ContainerId, ContainerResourceIncreaseRequest>()); + } + + Map<ContainerId, ContainerResourceIncreaseRequest> cidToReq = + increaseRequestMap.get(nodeId); + + if (cidToReq.get(cid) != null) { + throw new IllegalStateException( + "Illegal state: trying to add an increase request, " + + "but an increase request for the same container-id is already in the map, " + + "containerId=" + cid.toString()); + } + cidToReq.put(cid, increaseRequest); + + // update queue metrics + QueueMetrics metrics = queue.getMetrics(); + metrics.incrPendingResources(user, required); + } + + synchronized public void decreaseContainerResource(Resource released) { + // update queue metrics + QueueMetrics metrics = queue.getMetrics(); + metrics.releaseResources(user, released); + } /** * The ApplicationMaster is updating resource requirements for the @@ -221,6 +258,28 @@ synchronized public ResourceRequest getResourceRequest(Priority priority, Map<String, ResourceRequest> nodeRequests = requests.get(priority); return (nodeRequests == null) ? null : nodeRequests.get(resourceName); } + + synchronized public List<ContainerResourceIncreaseRequest> + getResourceIncreaseRequests(NodeId nodeId) { + if (increaseRequestMap.get(nodeId) == null) { + return new ArrayList<ContainerResourceIncreaseRequest>(); + } + List<ContainerResourceIncreaseRequest> requests = + new ArrayList<ContainerResourceIncreaseRequest>(); + for (Entry<ContainerId, ContainerResourceIncreaseRequest> e : + increaseRequestMap.get(nodeId).entrySet()) { + requests.add(e.getValue()); + } + return requests; + } + + synchronized public ContainerResourceIncreaseRequest + getResourceIncreaseRequests(NodeId nodeId, ContainerId containerId) { + if (increaseRequestMap.get(nodeId) == null) { + return null; + } + return increaseRequestMap.get(nodeId).get(containerId); + } public synchronized Resource getResource(Priority priority) { ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY); @@ -266,6 +325,23 @@ synchronized public void allocate(NodeType type, SchedulerNode node, + request.getCapability()); metrics.allocateResources(user, 1, request.getCapability()); } + + synchronized public void allocateIncreaseRequest(Resource required) { + QueueMetrics metrics = queue.getMetrics(); + LOG.debug("allocate: user: " + user + ", resource: " + required); + metrics.allocateResources(user, required); + } + + synchronized public void removeIncreaseRequest(NodeId nodeId, + ContainerId containerId) { + if (increaseRequestMap.get(nodeId) != null) { + Map<ContainerId, ContainerResourceIncreaseRequest> reqMap = + increaseRequestMap.get(nodeId); + if (reqMap.containsKey(containerId)) { + reqMap.remove(containerId); + } + } + } /** * The {@link ResourceScheduler} is allocating data-local resources to the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 8a03095..dbe9ab9 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -305,6 +305,22 @@ public void incrPendingResources(String user, int containers, Resource res) { parent.incrPendingResources(user, containers, res); } } + + public void incrPendingResources(String user, Resource res) { + _incrPendingResources(res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrPendingResources(user, res); + } + if (parent != null) { + parent.incrPendingResources(user, res); + } + } + + private void _incrPendingResources(Resource res) { + pendingMB.incr(res.getMemory()); + pendingVCores.incr(res.getVirtualCores()); + } private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); @@ -328,6 +344,11 @@ private void _decrPendingResources(int containers, Resource res) { pendingMB.decr(res.getMemory()); pendingVCores.decr(res.getVirtualCores()); } + + private void _decrPendingResources(Resource res) { + pendingMB.decr(res.getMemory()); + pendingVCores.decr(res.getVirtualCores()); + } public void allocateResources(String user, int containers, Resource res) { allocatedContainers.incr(containers); @@ -343,6 +364,19 @@ public void allocateResources(String user, int containers, Resource res) { parent.allocateResources(user, containers, res); } } + + public void allocateResources(String user, Resource res) { + allocatedMB.incr(res.getMemory()); + allocatedVCores.incr(res.getVirtualCores()); + _decrPendingResources(res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(user, res); + } + if (parent != null) { + parent.allocateResources(user, res); + } + } public void releaseResources(String user, int containers, Resource res) { allocatedContainers.decr(containers); @@ -357,6 +391,18 @@ public void releaseResources(String user, int containers, Resource res) { parent.releaseResources(user, containers, res); } } + + public void releaseResources(String user, Resource res) { + allocatedMB.decr(res.getMemory()); + allocatedVCores.decr(res.getVirtualCores()); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.releaseResources(user, res); + } + if (parent != null) { + parent.releaseResources(user, res); + } + } public void reserveResource(String user, Resource res) { reservedContainers.incr(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index b0a56a4..9ce8892 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; 
-import java.util.Collection; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -30,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -110,7 +110,8 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, - List<String> blacklistRemovals); + List<String> blacklistRemovals, + List<ContainerResourceIncreaseRequest> increaseRequests); /** * Get node resource usage report. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index c317df5..b34be7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; @@ -35,7 +36,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -237,4 +237,11 @@ public void recoverContainer(Resource clusterResource, FiCaSchedulerApp applicat * @param apps the collection to add the applications to */ public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps); + + + public void cancelIncreaseRequestReservation(Resource clusterResource, + ContainerResourceIncreaseRequest changeRequest, Resource required); + + public void decreaseResource(FiCaSchedulerApp application, + Resource clusterResource, Resource released); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 3a2b353..dceb9ad 100644 ---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -523,7 +526,8 @@ private synchronized void doneApplication( @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, - List<String> blacklistAdditions, List<String> blacklistRemovals) { + List<String> blacklistAdditions, List<String> blacklistRemovals, + List<ContainerResourceIncreaseRequest> increaseRequests) { FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { @@ -555,6 +559,55 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } synchronized (application) { + // check increaseRequests + if (increaseRequests != null) { + for (ContainerResourceIncreaseRequest incReq : increaseRequests) { + RMContainer rmContainer = + getRMContainer(incReq.getContainerId()); + // check the state of rmContainer + if (rmContainer.getState() != RMContainerState.ACQUIRED + && rmContainer.getState() != RMContainerState.RUNNING) { + LOG.info("trying to increase a container not in a correct state," + + " ignoring, containerId=" + + rmContainer.getContainerId().toString()); + continue; + } + + // check if we already have an increase request; if so, we need to + // replace the original one + ContainerResourceIncreaseRequest originalRequest = + application.getResourceIncreaseRequest(rmContainer.getContainer() + .getNodeId(), rmContainer.getContainerId()); + if (originalRequest != null) { + if (originalRequest.getCapability().
+ equals(incReq.getCapability())) { + // the user asked for the same increase before, so we just ignore + // this new request + continue; + } + // remove the old increase request before adding the new one + removeIncreaseRequest(incReq.getContainerId()); + } + + // check the target size + if (!Resources.greaterThan(getResourceCalculator(), + clusterResource, + incReq.getCapability(), + rmContainer.getContainer().getResource())) { + LOG.info("the target size of the increase request is less than or equal" + + " to the existing container size, containerId=" + + rmContainer.getContainerId().toString()); + continue; + } + + // get required resource + Resource required = + Resources.subtract(incReq.getCapability(), rmContainer + .getContainer().getResource()); + application.addIncreaseRequests(rmContainer.getContainer() + .getNodeId(), incReq, required); + } + } // make sure we aren't stopping/removing the application // when the allocate comes in @@ -637,6 +690,8 @@ private synchronized void nodeUpdate(RMNode nm) { List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); + List<ContainerResourceDecrease> decreasedContainers = + nm.pullDecreasedContainers(); for(UpdatedContainerInfo containerInfo : containerInfoList) { newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); completedContainers.addAll(containerInfo.getCompletedContainers()); @@ -654,6 +709,27 @@ private synchronized void nodeUpdate(RMNode nm) { completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } + + // Process decreased containers + for (ContainerResourceDecrease c : decreasedContainers) { + ContainerId containerId = c.getContainerId(); + Resource resource = c.getCapability(); + RMContainer rmContainer = getRMContainer(containerId); + + // Check the state of this container, we only handle containers in RUNNING state + if (RMContainerState.RUNNING != rmContainer.getState()) { + LOG.info("Received decreased container=" + containerId.toString() + + ", but its state is not RUNNING, ignoring."); + continue; + } + + // Remove increase request on this container if it exists + removeIncreaseRequest(containerId); + + Resource releasedResource = Resources.subtract( + rmContainer.getContainer().getResource(), resource); + decreaseContainerResource(rmContainer, releasedResource); + } // Now node data structures are up to date and ready for scheduling. if(LOG.isDebugEnabled()) { @@ -662,43 +738,70 @@ } // Assign new containers... - // 1. Check for reserved applications + // 1. Check for reserved applications (include increase request) // 2.
Schedule if there are no reservations - - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp reservedApplication = - getApplication(reservedContainer.getApplicationAttemptId()); - - // Try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " + - reservedApplication.getApplicationId() + " on node: " + nm); - - LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - CSAssignment assignment = queue.assignContainers(clusterResource, node); - - RMContainer excessReservation = assignment.getExcessReservation(); - if (excessReservation != null) { - Container container = excessReservation.getContainer(); - queue.completedContainer( - clusterResource, assignment.getApplication(), node, - excessReservation, - SchedulerUtils.createAbnormalContainerStatus( - container.getId(), - SchedulerUtils.UNRESERVED_CONTAINER), - RMContainerEventType.RELEASED, null); + if (node.isReserved()) { + // we will either reserve resource for increasing or new container + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + // this is reserved for new container + FiCaSchedulerApp reservedApplication = + getApplication(reservedContainer.getApplicationAttemptId()); + + // Try to fulfill the reservation + LOG.info("Trying to fulfill reservation for application " + + reservedApplication.getApplicationId() + " on node: " + nm); + + LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); + CSAssignment assignment = queue.assignContainers(clusterResource, node); + + RMContainer excessReservation = assignment.getExcessReservation(); + if (excessReservation != null) { + Container container = excessReservation.getContainer(); + queue.completedContainer( + clusterResource, assignment.getApplication(), node, + excessReservation, + SchedulerUtils.createAbnormalContainerStatus( + container.getId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED, null); + } + } else { + // this is reserved for increasing + ContainerResourceIncreaseRequest increaseRequest = + node.getReservedIncreaseRequest(); + if (increaseRequest != null) { + // get reserved application + FiCaSchedulerApp reservedApplication = getApplication(increaseRequest + .getContainerId().getApplicationAttemptId()); + + // try to fulfill the reservation + LOG.info("Trying to fulfill increase reservation for application " + + reservedApplication.getApplicationId() + " on node: " + nm); + + LeafQueue queue = (LeafQueue)reservedApplication.getQueue(); + queue.assignReservedIncreaseRequest(node, clusterResource, increaseRequest); + } } - } // Try to schedule more if there are no reservations to fulfill - if (node.getReservedContainer() == null) { + if (!node.isReserved()) { root.assignContainers(clusterResource, node); } else { - LOG.info("Skipping scheduling since node " + nm + - " is reserved by application " + - node.getReservedContainer().getContainerId().getApplicationAttemptId() - ); + if (node.getReservedContainer() != null) { + LOG.info("Skipping scheduling since node " + + nm + + " is reserved by application " + + node.getReservedContainer().getContainerId() + .getApplicationAttemptId()); + } else { + LOG.info("Skipping scheduling since node " + + nm + + " is reserved by application " + + node.getReservedIncreaseRequest().getContainerId() + .getApplicationAttemptId()); + } } } @@ -780,6 +883,56 @@ private synchronized void addNode(RMNode nodeManager) { LOG.info("Added node " + 
nodeManager.getNodeAddress() + " clusterResource: " + clusterResource); } + + private synchronized void decreaseContainerResource(RMContainer rmContainer, + Resource releasedResource) { + FiCaSchedulerApp application = + getApplication(rmContainer.getApplicationAttemptId()); + FiCaSchedulerNode node = getNode(rmContainer.getContainer().getNodeId()); + application.decreaseContainerResource(releasedResource); + node.decreaseContainerResource(releasedResource); + CSQueue leafQueue = (CSQueue) application.getQueue(); + leafQueue.decreaseResource(application, clusterResource, releasedResource); + } + + private synchronized void removeIncreaseRequest( + ContainerId containerId) { + // we need to do the following clean-ups: + // 1) remove this request in queues + // 2) remove this request in app/node, etc. + RMContainer rmContainer = getRMContainer(containerId); + if (null == rmContainer) { + LOG.warn("rmContainer is null, ignore, containerId=" + containerId.toString()); + return; + } + + Container container = rmContainer.getContainer(); + NodeId nodeId = container.getNodeId(); + + // remove in app/node + boolean reserved = false; + FiCaSchedulerApp application = + getApplication(rmContainer.getApplicationAttemptId()); + FiCaSchedulerNode node = getNode(rmContainer.getContainer().getNodeId()); + ContainerResourceIncreaseRequest increaseRequestSave = application + .getResourceIncreaseRequest(nodeId, containerId); + application.removeIncreaseRequest(nodeId, containerId); + if (node.isReserved() + && node.getReservedIncreaseRequest().getContainerId() + .equals(containerId)) { + node.unreserveIncreaseResource(containerId); + reserved = true; + } + + if (reserved) { + Resource required = Resources.subtract(increaseRequestSave .getCapability(), rmContainer.getContainer().getResource()); + CSQueue leafQueue = (CSQueue) application.getQueue(); + leafQueue.cancelIncreaseRequestReservation(clusterResource, + increaseRequestSave, required); + } + } private synchronized void removeNode(RMNode nodeInfo) { FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); @@ -825,6 +978,9 @@ private synchronized void completedContainer(RMContainer rmContainer, Container container = rmContainer.getContainer(); + // remove the increase request if one exists + removeIncreaseRequest(rmContainer.getContainerId()); + // Get the application for the finished container ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index db7db60..df71ef4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; +import
org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -59,7 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -794,6 +795,21 @@ private synchronized FiCaSchedulerApp getApplication( private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + public synchronized CSAssignment assignReservedIncreaseRequest( + FiCaSchedulerNode node, Resource clusterResource, + ContainerResourceIncreaseRequest increaseRequest) { + ContainerId containerId = increaseRequest.getContainerId(); + FiCaSchedulerApp application = getApplication(containerId + .getApplicationAttemptId()); + RMContainer container = application.getRMContainer(containerId); + // get required resource + Resource required = getRequiredResourceForIncreaseRequest(increaseRequest, + container.getContainer()); + Resource allocatedResource = assignIncreaseResourceOnNode(application, + node, clusterResource, container, increaseRequest, true, required); + return new CSAssignment(allocatedResource, NodeType.NODE_LOCAL); + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node) { @@ -829,6 +845,14 @@ private synchronized FiCaSchedulerApp getApplication( continue; } + // Schedule increase container first + Resource increaseAssigned = assignIncreaseResourcesOnNode(application, node, clusterResource); + if (Resources.greaterThan( + resourceCalculator, clusterResource, increaseAssigned, Resources.none())) { + allocateResource(clusterResource, application, increaseAssigned, false); + return new CSAssignment(increaseAssigned, NodeType.NODE_LOCAL); + } + // Schedule in priority order for (Priority priority : application.getPriorities()) { // Required resource @@ -883,7 +907,7 @@ private synchronized FiCaSchedulerApp getApplication( // Book-keeping // Note: Update headroom to account for current allocation too... - allocateResource(clusterResource, application, assigned); + allocateResource(clusterResource, application, assigned, true); // Don't reset scheduling opportunities for non-local assignments // otherwise the app will be delayed for each non-local assignment. 
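The next hunk carries the core increase-allocation logic. The arithmetic to keep in mind: an increase request names the target capability, so the scheduler charges (and, on a busy node, reserves) only the delta between the target and the container's current size. A small illustration with made-up numbers, using the real Resource and Resources utilities:

    Resource target = Resource.newInstance(4096, 4);   // requested capability
    Resource current = Resource.newInstance(2048, 2);  // container's current size
    Resource required = Resources.subtract(target, current);  // 2048 MB, 2 vcores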
@@ -911,6 +935,133 @@ private synchronized FiCaSchedulerApp getApplication( return NULL_ASSIGNMENT; } + + private Resource getRequiredResourceForIncreaseRequest( + ContainerResourceIncreaseRequest increaseRequest, Container container) { + Resource required = Resources.subtract( + increaseRequest.getCapability(), container.getResource()); + return required; + } + + private synchronized Resource assignIncreaseResourceOnNode( + FiCaSchedulerApp application, FiCaSchedulerNode node, + Resource clusterResource, RMContainer container, + ContainerResourceIncreaseRequest increaseRequest, boolean isReserved, + Resource required) { + // check if the required resource can be allocated on this node + Resource available = node.getAvailableResource(); + Resource totalResource = node.getTotalResource(); + + if (!Resources.fitsIn(required, totalResource)) { + LOG.warn("Node : " + node.getNodeID() + + " does not have sufficient resource for request : " + required + + " node total capability : " + node.getTotalResource()); + return Resources.none(); + } + + // available should be at least larger than none, otherwise there is + // something wrong + assert Resources.greaterThan(resourceCalculator, clusterResource, + available, Resources.none()); + + // check if we can allocate the required resource + boolean canBeAllocated = Resources.greaterThanOrEqual(resourceCalculator, + clusterResource, available, required); + if (canBeAllocated) { + // un-reserve this allocation if this is an allocation for reservation + if (isReserved) { + node.unreserveIncreaseResource(container.getContainerId()); + application.unreserveIncreaseRequest(container.getContainerId()); + // Update reserved metrics + getMetrics().unreserveResource( + application.getUser(), required); + } + + // set the container resource size and add it to the newly increased containers + container.getContainer().setResource(increaseRequest.getCapability()); + + // build the increased-container context + Token newToken = createContainerToken(application, container.getContainer()); + if (newToken == null) { + // something went wrong + throw new IllegalStateException( + "something went wrong when creating the token for containerId=" + + container.getContainerId().toString()); + } + container.getContainer().setContainerToken(newToken); + + // add new assignment to application + ContainerResourceIncrease newAssignment = ContainerResourceIncrease.newInstance(increaseRequest.getContainerId(), increaseRequest.getCapability(), newToken); + + application.allocateIncreaseResource(newAssignment, required); + node.increaseResource(required); + + // clean-up potential reservations + application.removeIncreaseRequest(node.getNodeID(), + increaseRequest.getContainerId()); + + return required; + } + + return Resources.none(); + } + + // allocate increase resources on a specified node/app; this method tries + // to allocate increase requests sorted by the time they were added. + // It stops at the first increase request that cannot be allocated on this + // node, or when all increase requests of this app/node have been allocated.
+ // A request that cannot be satisfied is marked as reserved in both the app and the node + private synchronized Resource assignIncreaseResourcesOnNode( + FiCaSchedulerApp application, FiCaSchedulerNode node, + Resource clusterResource) { + List<ContainerResourceIncreaseRequest> increaseRequests = application + .getResourceIncreaseRequest(node.getNodeID()); + if (increaseRequests == null) { + return Resources.none(); + } + Resource totalAllocated = Resources.none(); + for (ContainerResourceIncreaseRequest increaseRequest : increaseRequests) { + ContainerId containerId = increaseRequest.getContainerId(); + RMContainer container = application.getRMContainer(containerId); + // check null + if (null == container) { + // this is a finished container; we will handle this case in + // the containerComplete event + continue; + } + + // get required resource + Resource required = getRequiredResourceForIncreaseRequest(increaseRequest, + container.getContainer()); + + Resource userLimit = computeUserLimitAndSetHeadroom(application, + clusterResource, required); + + // Check queue max-capacity limit + if (!assignToQueue(clusterResource, required)) { + return Resources.none(); + } + + // Check user limit + if (!assignToUser(clusterResource, application.getUser(), userLimit)) { + return Resources.none(); + } + + Resource assigned = assignIncreaseResourceOnNode(application, node, + clusterResource, container, increaseRequest, false, required); + if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, + Resources.none())) { + totalAllocated = Resources.add(totalAllocated, assigned); + } else { + // reserve this allocation + application.reserveIncreaseContainer(increaseRequest); + node.reserveIncreaseResource(increaseRequest); + totalAllocated = Resources.add(totalAllocated, required); + return totalAllocated; + } + } + return totalAllocated; + } private synchronized CSAssignment assignReservedContainer(FiCaSchedulerApp application, @@ -1372,6 +1523,15 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod return container.getResource(); } else { + // Check if a reserved increase request exists + // why not directly return if isReserved is false? because + // FiCaSchedulerApp maintains the re-reservation count + if (node.isReserved()) { + if (node.getReservedIncreaseRequest() != null) { + return Resources.none(); + } + } + + // Reserve by 'charging' in advance...
reserve(application, priority, node, rmContainer, container); @@ -1444,7 +1604,7 @@ public void completedContainer(Resource clusterResource, // Book-keeping if (removed) { releaseResource(clusterResource, - application, container.getResource()); + application, container.getResource(), true); LOG.info("completedContainer" + " container=" + container + " resource=" + container.getResource() + @@ -1464,12 +1624,15 @@ public void completedContainer(Resource clusterResource, } synchronized void allocateResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + FiCaSchedulerApp application, Resource resource, boolean newContainer) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( resourceCalculator, this, getParent(), clusterResource, minimumAllocation); - ++numContainers; + + if (newContainer) { + ++numContainers; + } // Update user metrics String userName = application.getUser(); @@ -1489,13 +1652,15 @@ synchronized void allocateResource(Resource clusterResource, } synchronized void releaseResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + FiCaSchedulerApp application, Resource resource, boolean completeContainer) { // Update queue metrics Resources.subtractFrom(usedResources, resource); CSQueueUtils.updateQueueStatistics( resourceCalculator, this, getParent(), clusterResource, minimumAllocation); - --numContainers; + if (completeContainer) { + --numContainers; + } // Update user metrics String userName = application.getUser(); @@ -1601,7 +1766,7 @@ public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, Container container) { // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, application, container.getResource()); + allocateResource(clusterResource, application, container.getResource(), true); } getParent().recoverContainer(clusterResource, application, container); @@ -1631,5 +1796,22 @@ public void collectSchedulerApplications( apps.add(app.getApplicationAttemptId()); } } - + + @Override + public void cancelIncreaseRequestReservation(Resource clusterResource, + ContainerResourceIncreaseRequest changeRequest, Resource required) { + FiCaSchedulerApp application = + getApplication(changeRequest.getContainerId().getApplicationAttemptId()); + getMetrics().unreserveResource(application.getUser(), required); + releaseResource(clusterResource, application, required, false); + getParent().cancelIncreaseRequestReservation(clusterResource, + changeRequest, required); + } + + @Override + public void decreaseResource(FiCaSchedulerApp application, + Resource clusterResource, Resource released) { + releaseResource(clusterResource, application, released, false); + getParent().decreaseResource(application, clusterResource, released); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index b22b24e..a2a43a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -38,6 +38,7 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -51,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -562,6 +562,8 @@ public synchronized CSAssignment assignContainers( resourceCalculator, clusterResource, assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue + // TODO, in our case, we cannot simply ++numContainer because our + // reservation will not create new container allocateResource(clusterResource, assignedToChild.getResource()); // Track resource utilization in this pass of the scheduler @@ -620,7 +622,7 @@ private synchronized boolean assignToQueue(Resource clusterResource) { } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { - return (node.getReservedContainer() == null) && + return (!node.isReserved()) && Resources.greaterThanOrEqual(resourceCalculator, clusterResource, node.getAvailableResource(), minimumAllocation); } @@ -721,6 +723,23 @@ public void completedContainer(Resource clusterResource, } } + @Override + public synchronized void cancelIncreaseRequestReservation( + Resource clusterResource, ContainerResourceIncreaseRequest changeRequest, + Resource required) { + Resources.subtractFrom(usedResources, required); + CSQueueUtils.updateQueueStatistics(resourceCalculator, this, parent, + clusterResource, minimumAllocation); + } + + @Override + public synchronized void decreaseResource(FiCaSchedulerApp application, + Resource clusterResource, Resource released) { + Resources.subtractFrom(usedResources, released); + CSQueueUtils.updateQueueStatistics(resourceCalculator, this, parent, + clusterResource, minimumAllocation); + } + synchronized void allocateResource(Resource clusterResource, Resource resource) { Resources.addTo(usedResources, resource); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 7f51126..60c4200 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 7f51126..60c4200 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;

+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;

@@ -30,6 +33,8 @@
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -48,8 +53,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;

 /**
  * Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -61,14 +66,41 @@
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);

+  private Map<ContainerId, ContainerResourceIncreaseRequest> reservedIncreaseRequests =
+      new HashMap<ContainerId, ContainerResourceIncreaseRequest>();
   private final Set<ContainerId> containersToPreempt =
       new HashSet<ContainerId>();
+  private List<ContainerResourceIncrease> newlyIncreasedContainers =
+      new ArrayList<ContainerResourceIncrease>();

   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
     super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
   }
+
+  synchronized public List<ContainerResourceIncreaseRequest>
+      getResourceIncreaseRequest(NodeId nodeId) {
+    return this.appSchedulingInfo.getResourceIncreaseRequests(nodeId);
+  }
+
+  synchronized public ContainerResourceIncreaseRequest
+      getResourceIncreaseRequest(NodeId nodeId, ContainerId containerId) {
+    return this.appSchedulingInfo.getResourceIncreaseRequests(nodeId,
+        containerId);
+  }
+
+  synchronized public void removeIncreaseRequest(NodeId nodeId,
+      ContainerId containerId) {
+    this.appSchedulingInfo.removeIncreaseRequest(nodeId, containerId);
+    if (reservedIncreaseRequests.containsKey(containerId)) {
+      unreserveIncreaseRequest(containerId);
+    }
+  }
+
+  synchronized public void decreaseContainerResource(Resource released) {
+    this.appSchedulingInfo.decreaseContainerResource(released);
+  }

   synchronized public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
@@ -104,6 +136,12 @@ synchronized public boolean containerCompleted(RMContainer rmContainer,

     return true;
   }
+
+  synchronized public void addIncreaseRequests(NodeId nodeId,
+      ContainerResourceIncreaseRequest increaseRequest, Resource required) {
+    this.appSchedulingInfo.addIncreaseRequests(nodeId, increaseRequest,
+        required);
+  }

   synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
       Priority priority, ResourceRequest request,
@@ -149,7 +187,62 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,

     return rmContainer;
   }
+
+  synchronized public List<ContainerResourceIncrease>
+      pullNewlyIncreasedContainers() {
+    List<ContainerResourceIncrease> list =
+        new ArrayList<ContainerResourceIncrease>(
+            newlyIncreasedContainers.size());
+    for (ContainerResourceIncrease ctx : newlyIncreasedContainers) {
+      list.add(ctx);
+    }
+    newlyIncreasedContainers.clear();
+    return list;
+  }
+
+  public synchronized void reserveIncreaseContainer(
+      ContainerResourceIncreaseRequest increaseRequest) {
+    if (reservedIncreaseRequests.containsKey(increaseRequest.getContainerId())) {
+      throw new IllegalStateException(
+          "failed to reserve this request because a request"
+              + " with the same container-id is already reserved, container-id:"
+              + increaseRequest.getContainerId().toString());
+    }
+    reservedIncreaseRequests.put(increaseRequest.getContainerId(),
+        increaseRequest);
+  }
+
+  public synchronized void allocateIncreaseResource(
+      ContainerResourceIncrease newAssignment, Resource requiredResource) {
+    Resources.addTo(currentConsumption, requiredResource);
+    newlyIncreasedContainers.add(newAssignment);
+  }
+
+  public synchronized void unreserveIncreaseRequest(ContainerId containerId) {
+    if (!reservedIncreaseRequests.containsKey(containerId)) {
+      throw new IllegalStateException("unable to unreserve increase request, "
+          + "request does not exist in reserved pool, containerId="
+          + containerId.toString());
+    }
+    reservedIncreaseRequests.remove(containerId);
+  }
+
+  /**
+   * Has the application reserved the given node at the
+   * given priority?
+   * @param node node to be checked
+   * @param priority priority of reserved container
+   * @return true if reserved, false otherwise
+   */
+  public synchronized boolean isReserved(FiCaSchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers =
+        this.reservedContainers.get(priority);
+    if (reservedContainers != null) {
+      return reservedContainers.containsKey(node.getNodeID());
+    }
+    return false;
+  }

   public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
     Map<NodeId, RMContainer> reservedContainers =
         this.reservedContainers.get(priority);
@@ -165,7 +258,7 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority)

       if (reservedContainers.isEmpty()) {
         this.reservedContainers.remove(priority);
-      }
+      }
       // Reset the re-reservation count
       resetReReservations(priority);
@@ -177,8 +270,8 @@ public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority)
           + " at priority " + priority + "; currentReservation "
           + currentReservation);
       return true;
-    }
-  }
+    }
+  }
   return false;
 }
@@ -238,9 +331,9 @@ public synchronized Allocation getAllocation(ResourceCalculator rc,
       ResourceRequest rr = ResourceRequest.newInstance(
           Priority.UNDEFINED, ResourceRequest.ANY,
           minimumAllocation, numCont);
-      return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(),
-          null, currentContPreemption,
-          Collections.singletonList(rr));
+      return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(), null,
+          currentContPreemption, Collections.singletonList(rr),
+          pullNewlyIncreasedContainers());
     }
   }
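Per application, the patch allows at most one outstanding increase reservation per container: reserveIncreaseContainer registers it, and it leaves the pool either through allocateIncreaseResource (satisfied) or unreserveIncreaseRequest (cancelled). A stand-alone sketch of that lifecycle, not part of the patch, with String standing in for ContainerId:

// IncreaseLifecycleSketch.java -- hypothetical illustration of the reservation pool
import java.util.HashMap;
import java.util.Map;

public class IncreaseLifecycleSketch {
  private final Map<String, Integer> reserved = new HashMap<>(); // containerId -> delta MB

  void reserve(String containerId, int deltaMb) {
    if (reserved.containsKey(containerId)) {
      // mirrors the IllegalStateException in FiCaSchedulerApp: at most one
      // outstanding increase reservation per container
      throw new IllegalStateException("already reserved: " + containerId);
    }
    reserved.put(containerId, deltaMb);
  }

  void unreserve(String containerId) {
    if (reserved.remove(containerId) == null) {
      throw new IllegalStateException("not reserved: " + containerId);
    }
  }

  public static void main(String[] args) {
    IncreaseLifecycleSketch s = new IncreaseLifecycleSketch();
    s.reserve("container_1_0001_01_000002", 1024);
    s.unreserve("container_1_0001_01_000002"); // reservation satisfied or cancelled
  }
}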
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 23068fe..c8012f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -54,6 +55,7 @@
   private volatile int numContainers;

   private RMContainer reservedContainer;
+  private ContainerResourceIncreaseRequest reservedIncreaseRequest;

   /* set of containers that are allocated containers */
   private final Map<ContainerId, RMContainer> launchedContainers =
@@ -119,6 +121,10 @@ public synchronized void allocateContainer(ApplicationId applicationId,
         getUsedResource() + " used and " + getAvailableResource()
         + " available");
   }
+
+  public synchronized void increaseResource(Resource resource) {
+    deductAvailableResource(resource);
+  }

   @Override
   public synchronized Resource getAvailableResource() {
@@ -146,6 +152,10 @@ private synchronized void updateResource(Container container) {
     --numContainers;
   }

+  public synchronized void decreaseContainerResource(Resource resource) {
+    deductAvailableResource(resource);
+  }
+
   /**
    * Release an allocated container on this node.
    * @param container container to be released
@@ -239,10 +249,28 @@ public synchronized void reserveResource(
     }
     this.reservedContainer = reservedContainer;
   }
+
+  public synchronized void reserveIncreaseResource(
+      ContainerResourceIncreaseRequest increaseRequest) {
+    if (isReserved()) {
+      throw new IllegalStateException(
+          "failed to reserve increase resource, this node is already reserved");
+    }
+    reservedIncreaseRequest = increaseRequest;
+  }
+
+  public synchronized void unreserveIncreaseResource(ContainerId containerId) {
+    if (reservedIncreaseRequest != null) {
+      if (reservedIncreaseRequest.getContainerId().equals(containerId)) {
+        reservedIncreaseRequest = null;
+        return;
+      }
+    }
+    throw new IllegalStateException("failed to remove reserved increase resource");
+  }

   public synchronized void unreserveResource(
-      SchedulerApplication application) {
-
+      SchedulerApplication application) {
     // adding NP checks as this can now be called for preemption
     if (reservedContainer != null
         && reservedContainer.getContainer() != null
@@ -267,6 +295,11 @@ public synchronized void unreserveResource(
   public synchronized RMContainer getReservedContainer() {
     return reservedContainer;
   }
+
+  public synchronized ContainerResourceIncreaseRequest
+      getReservedIncreaseRequest() {
+    return reservedIncreaseRequest;
+  }

   @Override
   public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
@@ -274,4 +307,7 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
     Resources.addTo(this.availableResource, deltaResource);
   }

+  public synchronized boolean isReserved() {
+    return reservedContainer != null || reservedIncreaseRequest != null;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 861fad8..0f4f55b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -808,7 +809,9 @@ private synchronized void removeNode(RMNode rmNode) {

   @Override
   public Allocation allocate(ApplicationAttemptId appAttemptId,
-      List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      List<ResourceRequest> ask, List<ContainerId> release,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      List<ContainerResourceIncreaseRequest> increaseRequests) {

     // Make sure this application exists
     FSSchedulerApp application = applications.get(appAttemptId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 956cb49..de9f323 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -248,9 +249,10 @@ public Resource getMaximumResourceCapability() {
   private static final Allocation EMPTY_ALLOCATION =
       new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
   @Override
-  public Allocation allocate(
-      ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
-      List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
+  public Allocation allocate(ApplicationAttemptId applicationAttemptId,
+      List<ResourceRequest> ask, List<ContainerId> release,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      List<ContainerResourceIncreaseRequest> increaseRequests) {
     FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.error("Calling allocate on removed " +
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index cbce6e3..66b6ca6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -302,7 +302,7 @@ private synchronized void addResourceRequest(
     // Get resources from the ResourceManager
     Allocation allocation = resourceManager.getResourceScheduler().allocate(
         applicationAttemptId, new ArrayList<ResourceRequest>(ask),
-        new ArrayList<ContainerId>(), null, null);
+        new ArrayList<ContainerId>(), null, null, null);
     System.out.println("-=======" + applicationAttemptId);
     System.out.println("----------" + resourceManager.getRMContext().getRMApps()
         .get(applicationId).getRMAppAttempt(applicationAttemptId));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 2e65b0c..6357346 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -54,6 +55,8 @@
   private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
   private final List<ContainerId> releases = new ArrayList<ContainerId>();
+  private final List<ContainerResourceIncreaseRequest> increaseRequests =
+      new ArrayList<ContainerResourceIncreaseRequest>();

   public MockAM(RMContext context, ApplicationMasterProtocol amRMProtocol,
       ApplicationAttemptId attemptId) {
@@ -122,11 +125,17 @@ public void addRequests(String[] hosts, int memory, int priority,
       int containers) throws Exception {
     requests.addAll(createReq(hosts, memory, priority, containers));
   }
+
+  public void addIncreaseRequest(ContainerId containerId, Resource targetCapability) {
+    increaseRequests.add(ContainerResourceIncreaseRequest.newInstance(
+        containerId, targetCapability));
+  }

   public AllocateResponse schedule() throws Exception {
-    AllocateResponse response = allocate(requests, releases);
+    AllocateResponse response = allocate(requests, releases, increaseRequests);
     requests.clear();
     releases.clear();
+    increaseRequests.clear();
     return response;
   }

@@ -137,7 +146,7 @@ public AllocateResponse allocate(
       String host, int memory, int numContainers,
       List<ContainerId> releases) throws Exception {
     List<ResourceRequest> reqs = createReq(new String[]{host}, memory, 1, numContainers);
-    return allocate(reqs, releases);
+    return allocate(reqs, releases, null);
   }

   public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
@@ -172,13 +181,18 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori
     req.setCapability(capability);
     return req;
   }
+
+  public AllocateResponse allocate(List<ResourceRequest> resourceRequest,
+      List<ContainerId> releases) throws Exception {
+    return allocate(resourceRequest, releases, null);
+  }

-  public AllocateResponse allocate(
-      List<ResourceRequest> resourceRequest, List<ContainerId> releases)
-      throws Exception {
+  public AllocateResponse allocate(List<ResourceRequest> resourceRequest,
+      List<ContainerId> releases,
+      List<ContainerResourceIncreaseRequest> increaseRequests) throws Exception {
     final AllocateRequest req =
         AllocateRequest.newInstance(++responseId, 0F, resourceRequest,
-          releases, null);
+          releases, null, increaseRequests);
     UserGroupInformation ugi =
         UserGroupInformation.createRemoteUser(attemptId.toString());
     Token token =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 8ef01d9..2ce2f59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -213,7 +214,11 @@ public void setResourceOption(ResourceOption resourceOption) {
     public ResourceOption getResourceOption(){
       return this.perNode;
     }
-
+
+    @Override
+    public List<ContainerResourceDecrease> pullDecreasedContainers() {
+      return null;
+    }
   };

   private static RMNode buildRMNode(int rack, final Resource perNode,
       NodeState state, String httpAddr) {
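The new MockAM methods give the tests a small protocol: addIncreaseRequest queues a ContainerResourceIncreaseRequest, the next schedule() ships it to the RM, and a later allocate response carries the grants. A hypothetical helper, not part of the patch, showing that flow end to end using only the test utilities changed above:

// IncreaseHelper.java -- hypothetical test helper built on the patched MockAM/MockNM
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;

public class IncreaseHelper {
  public static List<ContainerResourceIncrease> requestIncrease(
      MockAM am, MockNM nm, ContainerId containerId, Resource target)
      throws Exception {
    am.addIncreaseRequest(containerId, target); // queued until the next schedule()
    am.schedule();                              // ships increaseRequests to the RM
    nm.nodeHeartbeat(true);                     // node update lets the scheduler act
    AllocateResponse response = am.schedule();  // pick up results on the next allocate
    return response.getIncreasedContainers();
  }
}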
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
index c57a752..3515313 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
@@ -313,19 +313,19 @@ public void testBlackListNodes() throws Exception {
         "rack1", BuilderUtils.newResource(GB, 1), 1));
     ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
-    fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null);
+    fs.allocate(appAttemptId1, ask1, emptyId, Collections.singletonList(host_1_0), null, null);

     // Trigger container assignment
     fs.handle(new NodeUpdateSchedulerEvent(n3));

     // Get the allocation for the application and verify no allocation on blacklist node
-    Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("allocation1", 0, allocation1.getContainers().size());

     // verify host_1_1 can get allocated as not in blacklist
     fs.handle(new NodeUpdateSchedulerEvent(n4));
-    Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation2 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("allocation2", 1, allocation2.getContainers().size());
     List<Container> containerList = allocation2.getContainers();
     for (Container container : containerList) {
@@ -339,25 +339,25 @@ public void testBlackListNodes() throws Exception {
     // be assigned
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
-    fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null);
+    fs.allocate(appAttemptId1, ask2, emptyId, Collections.singletonList("rack0"), null, null);

     // verify n1 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n1));
-    Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation3 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("allocation3", 0, allocation3.getContainers().size());

     // verify n2 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n2));
-    Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation4 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("allocation4", 0, allocation4.getContainers().size());

     // verify n3 is not qualified to be allocated
     fs.handle(new NodeUpdateSchedulerEvent(n3));
-    Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation5 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("allocation5", 0, allocation5.getContainers().size());

     fs.handle(new NodeUpdateSchedulerEvent(n4));
-    Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation6 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("allocation6", 1, allocation6.getContainers().size());

     containerList = allocation6.getContainers();
@@ -408,23 +408,23 @@ public void testHeadroom() throws Exception {
     List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
     ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
-    fs.allocate(appAttemptId1, ask1, emptyId, null, null);
+    fs.allocate(appAttemptId1, ask1, emptyId, null, null, null);

     // Ask for a 2 GB container for app 2
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
     ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
         ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
-    fs.allocate(appAttemptId2, ask2, emptyId, null, null);
+    fs.allocate(appAttemptId2, ask2, emptyId, null, null, null);

     // Trigger container assignment
     fs.handle(new NodeUpdateSchedulerEvent(n1));

     // Get the allocation for the applications and verify headroom
-    Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null);
+    Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("Allocation headroom", 1 * GB,
         allocation1.getResourceLimit().getMemory());

-    Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null);
+    Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId, null, null, null);
     Assert.assertEquals("Allocation headroom", 1 * GB,
         allocation2.getResourceLimit().getMemory());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 140b53e..f9a7dc8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -401,7 +401,7 @@ private void testAppAttemptScheduledState() {
         applicationAttempt.getAppAttemptState());
     verify(scheduler, times(expectedAllocateCount)).
         allocate(any(ApplicationAttemptId.class),
-            any(List.class), any(List.class), any(List.class), any(List.class));
+            any(List.class), any(List.class), any(List.class), any(List.class), any(List.class));

     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
     assertNull(applicationAttempt.getMasterContainer());
@@ -428,7 +428,7 @@ private void testAppAttemptAllocatedState(Container amContainer) {
     allocate(
         any(
             ApplicationAttemptId.class), any(List.class), any(List.class),
-            any(List.class), any(List.class));
+            any(List.class), any(List.class), any(List.class));
   }

   /**
@@ -576,6 +576,7 @@ private Container allocateApplicationAttempt() {
             any(List.class),
             any(List.class),
             any(List.class),
+            any(List.class),
             any(List.class))).
         thenReturn(allocation);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index a804138..847acc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -562,11 +562,11 @@ public void testBlackListNodes() throws Exception {
     // Verify the blacklist can be updated independent of requesting containers
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null);
+        Collections.singletonList(host), null, null);
     Assert.assertTrue(cs.getApplication(appAttemptId).isBlacklisted(host));
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host));
+        Collections.singletonList(host), null);
     Assert.assertFalse(cs.getApplication(appAttemptId).isBlacklisted(host));
     rm.stop();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 3c55b42..64e6250 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -134,7 +134,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
       } else {
         FiCaSchedulerApp app1 = getMockApplication(0, "");
         ((LeafQueue)queue).allocateResource(clusterResource, app1,
-            allocatedResource);
+            allocatedResource, true);
       }

       // Next call - nothing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseContainerResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseContainerResource.java
new file mode 100644
index 0000000..00bb304
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseContainerResource.java
@@ -0,0 +1,615 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Test;
+
+public class TestIncreaseContainerResource {
+  private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
+
+  private final int GB = 1024;
+
+  @Test(timeout = 3000000)
+  public void testIncreaseContainerResource() throws Exception {
+    /*
+     * This test case is pretty straightforward: we allocate a container, then
+     * increase its resources.
+     */
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 3 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+
+    // wait..
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 1
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // get existing container id
+    ContainerId existingContainerId = checkAllocateResource(am1, nm1, "*", GB);
+    Resource targetCapability = Resource.newInstance(2 * GB, 1);
+    checkIncreaseResource(am1, nm1, existingContainerId, targetCapability);
+
+    // check resource in node
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(3 * GB, 2));
+
+    rm.stop();
+  }
+
+  @Test(timeout = 3000000)
+  public void testReserveIncreaseContainerResource() throws Exception {
+    /*
+     * This test case allocates two containers, c_1 and c_2, on a node, leaving
+     * no more space on that node. Then we try to increase the capability of
+     * c_1; this increase request will be reserved and satisfied after c_2 is
+     * released. We check the queue's usedResource after each operation.
+     */
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+
+    // wait..
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 1
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // get existing container id
+    ContainerId containerId1 = checkAllocateResource(am1, nm1, "*", GB);
+
+    // do allocation for a 2G container
+    ContainerId containerId2 = checkAllocateResource(am1, nm1, "*", 2 * GB);
+
+    // here we register another NM to grow the cluster resource, so that the
+    // increase can be reserved but not allocated (we need this because
+    // LeafQueue invokes "assignToQueue" to enforce the queue limit)
+    MockNM nm2 = rm.registerNode("127.0.0.1:3399", 2 * GB, 1);
+    nm2.nodeHeartbeat(true);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(4 * GB, 3));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(4 * GB, 3));
+
+    // try to increase; this cannot be satisfied yet
+    Resource targetCapability = Resource.newInstance(3 * GB, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+
+    // in this case the increase will be reserved; check this
+    FiCaSchedulerNode node = cs.getNode(nm1.getNodeId());
+    Assert.assertTrue(node.isReserved());
+    Assert.assertTrue(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+
+    // a reserved increase does not deduct node resources
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(4 * GB, 3));
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(6 * GB, 3));
+
+    // release containerId2; the increased resource should now be allocated
+    am1.addContainerToBeReleased(containerId2);
+    int waitCounter = 20;
+    AllocateResponse alloc1Response = am1.schedule();
+    LOG.info("heartbeating nm1");
+    while (alloc1Response.getIncreasedContainers().size() < 1
+        && waitCounter-- > 0) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(500);
+      alloc1Response = am1.schedule();
+      nm1.nodeHeartbeat(true);
+    }
+    Assert.assertEquals(1, alloc1Response.getIncreasedContainers().size());
+    ContainerResourceIncrease increasedContext =
+        alloc1Response.getIncreasedContainers().get(0);
+    checkIncreaseContext(increasedContext, targetCapability);
+    checkQueueResource(cs, Resource.newInstance(4 * GB, 2));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(4 * GB, 2));
+
+    // release containerId1; this should free the remaining resources
+    am1.addContainerToBeReleased(containerId1);
+    alloc1Response = am1.schedule();
+    checkQueueResource(cs, Resource.newInstance(1 * GB, 1));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(1 * GB, 1));
+
+    rm.stop();
+  }
+
+  @Test(timeout = 3000000)
+  public void testReleaseContainerWithReservedIncreaseRequest()
+      throws Exception {
+    /*
+     * Like the test above, we create two containers, c_1 and c_2, and submit
+     * an increase request for c_1, which gets reserved as above. Then we
+     * release c_1 and check that the reserved space is freed.
+     */
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+
+    // wait..
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 1
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // get existing container id
+    ContainerId containerId1 = checkAllocateResource(am1, nm1, "*", GB);
+
+    // do allocation for a 2G container
+    ContainerId containerId2 = checkAllocateResource(am1, nm1, "*", 2 * GB);
+
+    // here we register another NM to grow the cluster resource, so that the
+    // increase can be reserved but not allocated (we need this because
+    // LeafQueue invokes "assignToQueue" to enforce the queue limit)
+    MockNM nm2 = rm.registerNode("127.0.0.1:3399", 2 * GB, 1);
+    nm2.nodeHeartbeat(true);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(4 * GB, 3));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(4 * GB, 3));
+
+    // try to increase; this cannot be satisfied yet
+    Resource targetCapability = Resource.newInstance(3 * GB, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+
+    // in this case the increase will be reserved; check this
+    FiCaSchedulerNode node = cs.getNode(nm1.getNodeId());
+    Assert.assertTrue(node.isReserved());
+    Assert.assertTrue(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(6 * GB, 3));
+    // the reserved resource is not reflected on the node
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(4 * GB, 3));
+
+    // release containerId1; this should also free the reserved space
+    am1.addContainerToBeReleased(containerId1);
+    am1.schedule();
+    checkQueueResource(cs, Resource.newInstance(3 * GB, 2));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(3 * GB, 2));
+
+    // finally free c_2
+    am1.addContainerToBeReleased(containerId2);
+    am1.schedule();
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(1 * GB, 1));
+    checkQueueResource(cs, Resource.newInstance(1 * GB, 1));
+
+    rm.stop();
+  }
+
+  @Test(timeout = 3000000)
+  public void testUpdateResourceIncreaseRequestForReserved() throws Exception {
+    /*
+     * Test that updating a resource increase request works:
+     * 1) create an increase request on a node that cannot be satisfied, so it
+     * is reserved; 2) submit an updated increase request that can be
+     * satisfied, and verify the container is successfully increased.
+     */
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+
+    // wait..
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 1
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // get existing container id
+    ContainerId containerId1 = checkAllocateResource(am1, nm1, "*", GB);
+
+    // here we register another NM to grow the cluster resource, so that the
+    // increase can be reserved but not allocated (we need this because
+    // LeafQueue invokes "assignToQueue" to enforce the queue limit)
+    MockNM nm2 = rm.registerNode("127.0.0.1:3399", 2 * GB, 1);
+    nm2.nodeHeartbeat(true);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(2 * GB, 2));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(2 * GB, 2));
+
+    // try to increase; this cannot be satisfied yet
+    Resource targetCapability = Resource.newInstance(5 * GB, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+
+    // in this case the increase will be reserved; check this
+    FiCaSchedulerNode node = cs.getNode(nm1.getNodeId());
+    Assert.assertTrue(node.isReserved());
+    Assert.assertTrue(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(6 * GB, 2));
+    // not reflected on the node
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(2 * GB, 2));
+
+    // try to increase again; this should succeed
+    targetCapability = Resource.newInstance(4 * GB, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, true);
+    checkQueueResource(cs, Resource.newInstance(5 * GB, 2));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(5 * GB, 2));
+
+    rm.stop();
+  }
+
+  @Test(timeout = 3000000)
+  public void testUpdateResourceToCancelReservedResource() throws Exception {
+    /*
+     * Test that updating a resource increase request can cancel a reservation:
+     * 1) create an increase request that cannot be satisfied, so it is
+     * reserved; 2) submit an updated request whose target equals the current
+     * capability, and verify the reservation and the pending request are
+     * removed.
+     */
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+
+    // wait..
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 1
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // get existing container id
+    ContainerId containerId1 = checkAllocateResource(am1, nm1, "*", GB);
+
+    // here we register another NM to grow the cluster resource, so that the
+    // increase can be reserved but not allocated (we need this because
+    // LeafQueue invokes "assignToQueue" to enforce the queue limit)
+    MockNM nm2 = rm.registerNode("127.0.0.1:3399", 2 * GB, 1);
+    nm2.nodeHeartbeat(true);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(2 * GB, 2));
+
+    // try to increase; this cannot be satisfied yet
+    Resource targetCapability = Resource.newInstance(5 * GB, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+
+    // in this case the increase will be reserved; check this
+    FiCaSchedulerApp application =
+        cs.getApplication(containerId1.getApplicationAttemptId());
+    FiCaSchedulerNode node = cs.getNode(nm1.getNodeId());
+    Assert.assertTrue(node.isReserved());
+    Assert.assertTrue(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+    Assert.assertNotNull(application.getResourceIncreaseRequest(
+        node.getNodeID(), containerId1));
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(6 * GB, 2));
+
+    // update the request to the current capability; this should cancel the
+    // previous reservation
+    targetCapability = Resource.newInstance(1 * GB, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+    checkQueueResource(cs, Resource.newInstance(2 * GB, 2));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(2 * GB, 2));
+
+    // and check that this request is removed from the app/node
+    Assert.assertFalse(node.isReserved());
+    Assert.assertFalse(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+    Assert.assertNull(application.getResourceIncreaseRequest(node.getNodeID(),
+        containerId1));
+
+    rm.stop();
+  }
+
+  @Test(timeout = 3000000)
+  public void testSubmitInvalidIncreaseRequest() throws Exception {
+    /*
+     * Test that invalid increase requests are rejected: a target capability
+     * that is not strictly larger than the current one (for example zero
+     * vcores, or less memory) should be ignored and must not leave any
+     * reservation or pending request behind.
+     */
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5 * GB, 4);
+
+    nm1.nodeHeartbeat(true);
+
+    // wait..
+    int waitCount = 20;
+    int size = rm.getRMContext().getRMNodes().size();
+    while ((size = rm.getRMContext().getRMNodes().size()) != 1
+        && waitCount-- > 0) {
+      LOG.info("Waiting for node managers to register : " + size);
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, rm.getRMContext().getRMNodes().size());
+    // Submit an application
+    RMApp app1 = rm.submitApp(128);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    // get existing container id
+    ContainerId containerId1 = checkAllocateResource(am1, nm1, "*", GB);
+
+    // here we register another NM to grow the cluster resource, so that the
+    // increase can be reserved but not allocated (we need this because
+    // LeafQueue invokes "assignToQueue" to enforce the queue limit)
+    MockNM nm2 = rm.registerNode("127.0.0.1:3399", 2 * GB, 1);
+    nm2.nodeHeartbeat(true);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(2 * GB, 2));
+
+    FiCaSchedulerApp application =
+        cs.getApplication(containerId1.getApplicationAttemptId());
+    FiCaSchedulerNode node = cs.getNode(nm1.getNodeId());
+
+    // verify there is no reservation or pending request yet
+    Assert.assertFalse(node.isReserved());
+    Assert.assertFalse(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+    Assert.assertNull(application.getResourceIncreaseRequest(node.getNodeID(),
+        containerId1));
+
+    // try to increase with zero vcores; this should fail
+    Resource targetCapability = Resource.newInstance(1 * GB, 0);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(2 * GB, 2));
+
+    // and check that this request is removed from the app/node
+    Assert.assertFalse(node.isReserved());
+    Assert.assertFalse(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+    Assert.assertNull(application.getResourceIncreaseRequest(node.getNodeID(),
+        containerId1));
+
+    // try to increase with less memory; this should also fail
+    targetCapability = Resource.newInstance(GB / 2, 1);
+    checkIncreaseResource(am1, nm1, containerId1, targetCapability, false);
+
+    // check the used resource of the queue
+    checkQueueResource(cs, Resource.newInstance(2 * GB, 2));
+    checkNodeUsedResource(cs, nm1.getNodeId(), Resource.newInstance(2 * GB, 2));
+
+    // and check that this request is removed from the app/node
+    Assert.assertFalse(node.isReserved());
+    Assert.assertFalse(node.getReservedIncreaseRequest() != null
+        && node.getReservedIncreaseRequest().getContainerId()
+            .equals(containerId1));
+    Assert.assertNull(application.getResourceIncreaseRequest(node.getNodeID(),
+        containerId1));
+
+    rm.stop();
+  }
+
+  private ContainerId checkAllocateResource(MockAM am, MockNM nm,
+      String resourceName, int memory) throws Exception {
+    return checkAllocateResource(am, nm, resourceName, memory, true);
+  }
+
+  private void checkNodeUsedResource(CapacityScheduler scheduler,
+      NodeId nodeId, Resource resource) {
+    FiCaSchedulerNode node = scheduler.getNode(nodeId);
+    Assert.assertEquals(resource, node.getUsedResource());
+  }
+
+  private void checkQueueResource(CapacityScheduler scheduler,
+      Resource usedResource) {
+    CSQueue root = scheduler.getRootQueue();
+    Assert.assertTrue("not equal, expected=" + usedResource.toString()
+        + " actual=" + root.getUsedResources().toString(),
+        Resources.equals(usedResource, root.getUsedResources()));
+    // assert child queues as well
+    for (CSQueue child : root.getChildQueues()) {
+      Assert.assertTrue("not equal, expected=" + usedResource.toString()
+          + " actual=" + child.getUsedResources().toString(),
+          Resources.equals(usedResource, child.getUsedResources()));
+    }
+  }
+
+  private ContainerId checkAllocateResource(MockAM am, MockNM nm,
+      String resourceName, int memory, boolean expectAllocated)
+      throws Exception {
+    LOG.info("sending container requests ");
+    am.addRequests(new String[] { "*" }, memory, 1, 1);
+    AllocateResponse alloc1Response = am.schedule(); // send the request
+
+    // kick the scheduler
+    nm.nodeHeartbeat(true);
+    int waitCounter = 20;
+    LOG.info("heartbeating nm1");
+    while (alloc1Response.getAllocatedContainers().size() < 1
+        && waitCounter-- > 0) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(500);
+      alloc1Response = am.schedule();
+    }
+    LOG.info("received container : "
+        + alloc1Response.getAllocatedContainers().size());
+
+    // There should be one container allocated; internally it should not
+    // have been reserved.
+    if (expectAllocated) {
+      Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 1);
+
+      // get existing container id
+      ContainerId containerId =
+          alloc1Response.getAllocatedContainers().get(0).getId();
+
+      return containerId;
+    } else {
+      Assert.assertTrue(alloc1Response.getAllocatedContainers().size() == 0);
+      return null;
+    }
+  }
+
+  private void checkIncreaseResource(MockAM am, MockNM nm,
+      ContainerId containerId, Resource targetResource) throws Exception {
+    checkIncreaseResource(am, nm, containerId, targetResource, true);
+  }
+
+  private void
+      checkIncreaseResource(MockAM am, MockNM nm, ContainerId containerId,
+          Resource targetResource, boolean expectIncreased) throws Exception {
+    // add increase request
+    am.addIncreaseRequest(containerId, targetResource);
+    AllocateResponse alloc1Response = am.schedule();
+
+    // kick the scheduling again
+    nm.nodeHeartbeat(true);
+    int waitCounter = 20;
+    LOG.info("heartbeating nm1");
+    while (alloc1Response.getIncreasedContainers().size() < 1
+        && waitCounter-- > 0) {
+      LOG.info("Waiting for containers to be increased for app 1...");
+      Thread.sleep(500);
+      alloc1Response = am.schedule();
+    }
+    LOG.info("received increased container:"
+        + alloc1Response.getIncreasedContainers().size());
+
+    if (expectIncreased) {
+      Assert.assertEquals(1, alloc1Response.getIncreasedContainers().size());
+      ContainerResourceIncrease increasedContainer =
+          alloc1Response.getIncreasedContainers().get(0);
+      checkIncreaseContext(increasedContainer, targetResource);
+    } else {
+      Assert.assertTrue(0 == alloc1Response.getIncreasedContainers().size());
+    }
+  }
+
+  private void checkIncreaseContext(ContainerResourceIncrease increaseContext,
+      Resource expectedResource) throws Exception {
+    Assert.assertEquals(expectedResource, increaseContext.getCapability());
+
+    // check token received
+    ContainerTokenIdentifier ti =
+        BuilderUtils.newContainerTokenIdentifier(increaseContext
+            .getContainerToken());
+    Assert.assertEquals(expectedResource, ti.getResource());
+  }
+}
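checkIncreaseContext above verifies that each grant carries a container token whose embedded resource matches the granted capability. A hedged sketch of that same validation as an AM-side check, assuming only the records and the BuilderUtils helper used in the test (whether a real AM would use this server-side helper is an assumption, not something the patch states):

// IncreaseGrantCheck.java -- sketch only; mirrors checkIncreaseContext above
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;

public class IncreaseGrantCheck {
  /** Returns true when the token embedded in the grant matches the granted capability. */
  public static boolean tokenMatchesGrant(ContainerResourceIncrease grant)
      throws IOException {
    Resource granted = grant.getCapability();
    ContainerTokenIdentifier id =
        BuilderUtils.newContainerTokenIdentifier(grant.getContainerToken());
    // The RM signs the new capability into the container token, so the two
    // should agree (see checkIncreaseContext in the test above).
    return granted.equals(id.getResource());
  }
}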
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 0348081..05df0ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -147,7 +147,7 @@ public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
       } else {
         FiCaSchedulerApp app1 = getMockApplication(0, "");
         ((LeafQueue)queue).allocateResource(clusterResource, app1,
-            allocatedResource);
+            allocatedResource, true);
       }

       // Next call - nothing
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 84ffbc1..39ea692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -261,7 +261,7 @@ private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
     ResourceRequest request = createResourceRequest(memory, vcores,
         ResourceRequest.ANY, priority, numContainers, true);
     ask.add(request);
-    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null, null);
     return id;
   }

@@ -283,7 +283,7 @@ private void createSchedulingRequestExistingApplication(ResourceRequest request,
       ApplicationAttemptId attId) {
     List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
     ask.add(request);
-    scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null, null);
   }

   // TESTS
@@ -801,7 +801,7 @@ public void testQueueDemandCalculation() throws Exception {
     ResourceRequest request1 = createResourceRequest(minReqSize * 2,
         ResourceRequest.ANY, 1, 1, true);
     ask1.add(request1);
-    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null, null);

     // Second ask, queue2 requests 1 large + (2 * minReqSize)
     List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
@@ -811,14 +811,14 @@ public void testQueueDemandCalculation() throws Exception {
         false);
     ask2.add(request2);
     ask2.add(request3);
-    scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(id21, ask2, new ArrayList<ContainerId>(), null, null, null);

     // Third ask, queue2 requests 1 large
     List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
     ResourceRequest request4 = createResourceRequest(2 * minReqSize,
         ResourceRequest.ANY, 1, 1, true);
     ask3.add(request4);
-    scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(id22, ask3, new ArrayList<ContainerId>(), null, null, null);

     scheduler.update();
@@ -1457,7 +1457,7 @@ public void testReservationWhileMultiplePriorities() throws IOException {
     // Complete container
     scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
-        Arrays.asList(containerId), null, null);
+        Arrays.asList(containerId), null, null, null);
     assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB());
     assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
@@ -1538,7 +1538,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
     asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true));
     asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true));

-    scheduler.allocate(appId, asks, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(appId, asks, new ArrayList<ContainerId>(), null, null, null);

     // node 1 checks in
     scheduler.update();
@@ -1911,7 +1911,7 @@ public void testCancelStrictLocality() throws IOException {
         createResourceRequest(1024, node1.getHostName(), 1, 0, true),
         createResourceRequest(1024, "rack1", 1, 0, true),
         createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true));
-    scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(attId1, update, new ArrayList<ContainerId>(), null, null, null);

     // then node2 should get the container
     scheduler.handle(node2UpdateEvent);
@@ -1956,7 +1956,7 @@ public void testReservationsStrictLocality() throws IOException {
     anyRequest = createResourceRequest(1024, ResourceRequest.ANY,
         1, 1, false);
     scheduler.allocate(attId, Arrays.asList(rackRequest, anyRequest),
-        new ArrayList<ContainerId>(), null, null);
+        new ArrayList<ContainerId>(), null, null, null);

     scheduler.handle(nodeUpdateEvent);
     assertEquals(0, app.getReservedContainers().size());
@@ -2361,7 +2361,7 @@ public void testContinuousScheduling() throws Exception {
     ResourceRequest request =
         createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
     ask.add(request);
-    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null);

     // waiting for continuous_scheduler_sleep_time
     // at least one pass
@@ -2380,7 +2380,7 @@ public void testContinuousScheduling() throws Exception {
         createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
     ask.clear();
     ask.add(request);
-    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null);

     // Wait until app gets resources
     while (app.getCurrentConsumption()
@@ -2460,11 +2460,11 @@ public void testBlacklistNodes() throws Exception {
     // Verify the blacklist can be updated independent of requesting containers
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null);
+        Collections.singletonList(host), null, null);
     assertTrue(app.isBlacklisted(host));
     scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host));
+        Collections.singletonList(host), null);
     assertFalse(scheduler.applications.get(appAttemptId).isBlacklisted(host));

     List<ResourceRequest> update = Arrays.asList(
@@ -2473,7 +2473,7 @@ public void testBlacklistNodes() throws Exception {
     // Verify a container does not actually get placed on the blacklisted host
     scheduler.allocate(appAttemptId, update,
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null);
+        Collections.singletonList(host), null, null);
     assertTrue(app.isBlacklisted(host));
     scheduler.update();
     scheduler.handle(updateEvent);
@@ -2483,7 +2483,7 @@ public void testBlacklistNodes() throws Exception {
     // Verify a container gets placed on the empty blacklist
     scheduler.allocate(appAttemptId, update,
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host));
+        Collections.singletonList(host), null);
     assertFalse(app.isBlacklisted(host));
     createSchedulingRequest(GB, "root.default", "user", 1);
     scheduler.update();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index e700704..9909c3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -206,7 +206,7 @@ public void testNodeLocalAssignment() throws Exception {
     ask.add(nodeLocal);
     ask.add(rackLocal);
     ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null);

     NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
@@ -291,7 +291,7 @@ public void testUpdateResourceOnNode() throws Exception {
     ask.add(nodeLocal);
     ask.add(rackLocal);
     ask.add(any);
-    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+    scheduler.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null, null);

     // Before the node update event, there is one local request
     Assert.assertEquals(1, nodeLocal.getNumContainers());
@@ -548,11 +548,11 @@ public void testBlackListNodes() throws Exception {
     // Verify the blacklist can be updated independent of requesting containers
     fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null);
+        Collections.singletonList(host), null, null);
     Assert.assertTrue(fs.getApplication(appAttemptId).isBlacklisted(host));
     fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host));
+        Collections.singletonList(host), null);
     Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host));
     rm.stop();
   }
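Taken together, the AM-facing surface of this patch is the extra increaseRequests argument on AllocateRequest.newInstance, as exercised by MockAM above. A hedged sketch of how a real AM heartbeat might carry an increase request through that API (the responseId and progress values are placeholders, and the empty ask/release lists are illustrative assumptions):

// AllocateWithIncrease.java -- sketch built on the extended AllocateRequest API
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class AllocateWithIncrease {
  public static AllocateRequest build(int responseId, ContainerId running,
      Resource target) {
    // ask the RM to grow one running container to the target capability
    List<ContainerResourceIncreaseRequest> increases = Collections.singletonList(
        ContainerResourceIncreaseRequest.newInstance(running, target));
    List<ResourceRequest> asks = Collections.emptyList(); // no new containers
    List<ContainerId> releases = Collections.emptyList(); // nothing to release
    return AllocateRequest.newInstance(responseId, 0f, asks, releases,
        null /* blacklist */, increases);
  }
}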