diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java index e9de05227ec..add44a39103 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java @@ -55,12 +55,16 @@ public static ResourceOption newInstance(Resource resource, * Get timeout for tolerant of resource over-commitment * Note: negative value means no timeout so that allocated containers will * keep running until the end even under resource over-commitment cases. - * @return overCommitTimeout of the ResourceOption + * @return overCommitTimeout of the ResourceOption in milliseconds */ @Private @Evolving public abstract int getOverCommitTimeout(); - + + /** + * Set the over commit timeout. + * @param overCommitTimeout Timeout in ms. Negative means no timeout. 
+ */ @Private @Evolving protected abstract void setOverCommitTimeout(int overCommitTimeout); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index c50950bbd9a..e82cd3cff15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -677,6 +677,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) if (capability != null) { nodeHeartBeatResponse.setResource(capability); } + // Check if we got an event (AdminService) that updated the resources + if (rmNode.isUpdatedCapability()) { + nodeHeartBeatResponse.setResource(rmNode.getTotalCapability()); + rmNode.resetUpdatedCapability(); + } // 7. Send Container Queuing Limits back to the Node. This will be used by // the node to truncate the number of Containers queued for execution. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index c77d29c89ae..d3b515e8241 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -104,6 +104,17 @@ */ public Resource getTotalCapability(); + /** + * Check if the total available resources have been updated. + * @return If the capability has been updated. + */ + boolean isUpdatedCapability(); + + /** + * Mark that the capability update has been processed. + */ + void resetUpdatedCapability(); + /** * the aggregated resource utilization of the containers. * @return the aggregated resource utilization of the containers. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index d33ee44de4d..3b4ccc60524 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -125,6 +125,7 @@ /* Snapshot of total resources before receiving decommissioning command */ private volatile Resource originalTotalCapability; private volatile Resource totalCapability; + private volatile boolean updatedCapability = false; private final Node node; private String healthReport; @@ -455,6 +456,16 @@ public Resource getTotalCapability() { return this.totalCapability; } + @Override + public boolean isUpdatedCapability() { + return this.updatedCapability; + } + + @Override + public void resetUpdatedCapability() { + this.updatedCapability = false; + } + @Override public String getRackName() { return node.getNetworkLocation(); @@ -813,11 +824,12 @@ private static void handleRunningAppOnNode(RMNodeImpl rmNode, .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); } - private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, - RMNodeResourceUpdateEvent event){ - ResourceOption resourceOption = event.getResourceOption(); - // Set resource on RMNode - rmNode.totalCapability = resourceOption.getResource(); + private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, + RMNodeResourceUpdateEvent event){ + ResourceOption resourceOption = event.getResourceOption(); + // Set resource on RMNode + rmNode.totalCapability = 
resourceOption.getResource(); + rmNode.updatedCapability = true; } private static NodeHealthStatus updateRMNodeFromStatusEvents( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index a798b97af5f..55df40e879e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -92,16 +92,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; - - - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import 
org.apache.hadoop.yarn.util.resource.Resources; @@ -118,6 +118,8 @@ private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); + private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); + protected final ClusterNodeTracker nodeTracker = new ClusterNodeTracker<>(); @@ -830,6 +832,7 @@ public void updateNodeResource(RMNode nm, writeLock.lock(); SchedulerNode node = getSchedulerNode(nm.getNodeID()); Resource newResource = resourceOption.getResource(); + final int timeout = resourceOption.getOverCommitTimeout(); Resource oldResource = node.getTotalResource(); if (!oldResource.equals(newResource)) { // Notify NodeLabelsManager about this change @@ -838,14 +841,17 @@ public void updateNodeResource(RMNode nm, // Log resource change LOG.info("Update resource on node: " + node.getNodeName() + " from: " - + oldResource + ", to: " + newResource); + + oldResource + ", to: " + newResource + " in " + timeout + " ms"); nodeTracker.removeNode(nm.getNodeID()); // update resource to node node.updateTotalResource(newResource); + node.setOvercommitTimeOut(timeout); + markContainersIfOvercommitted(node); nodeTracker.addNode((N) node); + } else{ // Log resource change LOG.warn("Update resource on node: " + node.getNodeName() @@ -1188,6 +1194,10 @@ protected void nodeUpdate(RMNode nm) { updateNodeResourceUtilization(nm, schedulerNode); } + if (schedulerNode != null) { + markContainersIfOvercommitted(schedulerNode); + } + // Now node data structures are up-to-date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug( @@ -1197,6 +1207,52 @@ protected void nodeUpdate(RMNode nm) { } } + /** + * Check if the node is over committed and needs to remove containers. It + * allows marking them for PREEMPT (notify the AM) or KILL. + * @param schedulerNode The node to check whether it is over committed; + * KILL is only used once the node's over commit timeout has passed. 
+ */ + private void markContainersIfOvercommitted(SchedulerNode schedulerNode) { + + // If there is no time out, we don't mark anything + if (!schedulerNode.isOvercommitTimeOutSet()) { + return; + } + + // Check if the node is over committed (negative resources) + ResourceCalculator rc = getResourceCalculator(); + Resource unallocated = Resource.newInstance( + schedulerNode.getUnallocatedResource()); + if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) { + return; + } + + // Only send KILL requests if we passed the time out + SchedulerEventType eventType = + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; + if (schedulerNode.isOvercommitTimedOut()) { + eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; + } + + LOG.debug(schedulerNode.getNodeID() + " is over committed (" + + schedulerNode.getUnallocatedResource() + "), free up resources"); + for (RMContainer container : schedulerNode.getContainersToKill()) { + LOG.info("Send " + eventType + " to " + container.getContainerId() + + " to free up " + container.getAllocatedResource()); + ApplicationAttemptId appId = container.getApplicationAttemptId(); + ContainerPreemptEvent event = + new ContainerPreemptEvent(appId, container, eventType); + this.rmContext.getDispatcher().getEventHandler().handle(event); + Resources.addTo(unallocated, container.getAllocatedResource()); + + if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) { + LOG.debug("Enough free resources " + unallocated); + break; + } + } + } + @Override public Resource getNormalizedResource(Resource requestedResource, Resource maxResourceCapability) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index b35aeba83b3..4db9c45d1d4 100644 
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.HashMap; import java.util.List; @@ -26,6 +27,8 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.lang3.builder.CompareToBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -69,6 +72,8 @@ ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); + /** Time stamp for over committed resources to time out (-1 if unset). */ + private long overcommitTimeout = -1; /* set of containers that are allocated containers */ private final Map launchedContainers = @@ -118,6 +123,38 @@ public synchronized void updateTotalResource(Resource resource){ this.allocatedResource); } + /** + * Set the timeout for the node to stop over committing the resources. After + * this time the scheduler will start killing containers until the resources + * are not over committed anymore. This may reset a previous timeout. + * @param timeOut Time out in milliseconds; negative values are ignored. + */ + public synchronized void setOvercommitTimeOut(long timeOut) { + if (timeOut >= 0) { + if (this.overcommitTimeout != -1) { + LOG.debug("The over commit timeout for " + getNodeID() + + " was already set to " + this.overcommitTimeout); + } + this.overcommitTimeout = Time.now() + timeOut; + } + } + + /** + * Check if the time out has passed. 
+ * @return If the over commit time out has passed. + */ + public synchronized boolean isOvercommitTimedOut() { + return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout; + } + + /** + * Check if the node has a time out for over commit resources. + * @return If the node has a time out for over commit resources. + */ + public synchronized boolean isOvercommitTimeOutSet() { + return this.overcommitTimeout >= 0; + } + /** * Get the ID of the node which contains both its hostname and port. * @return The ID of the node. @@ -371,6 +408,36 @@ public int getNumContainers() { return result; } + /** + * Get the containers running on the node ordered by which to kill first. It + * tries to kill AMs last, then GUARANTEED containers, and it kills + * OPPORTUNISTIC first. If there is a tie, the newest container goes first. + * @return A copy of the running containers ordered by which to kill first. + */ + public List getContainersToKill() { + List result = getLaunchedContainers(); + Collections.sort(result, (c1, c2) -> { + return new CompareToBuilder() + .append(c1.isAMContainer(), c2.isAMContainer()) + .append(c2.getExecutionType(), c1.getExecutionType()) // reversed + .append(c2.getCreationTime(), c1.getCreationTime()) // reversed + .toComparison(); + }); + return result; + } + + /** + * Get the containers launched in the node. + * @return List of containers launched. + */ + protected synchronized List getLaunchedContainers() { + List result = new ArrayList<>(); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } + return result; + } + /** * Get the container for the specified container ID. 
* @param containerId The container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index c0af0413a0f..3b72ca1c0e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -190,6 +190,15 @@ public Resource getTotalCapability() { return this.perNode; } + @Override + public boolean isUpdatedCapability() { + return false; + } + + @Override + public void resetUpdatedCapability() { + } + @Override public String getRackName() { return this.rackName; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index ba409b1386b..7e1680e6c98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.junit.Assert.assertEquals; import static 
org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.*; @@ -1018,4 +1021,94 @@ public void testContainerRecoveredByNode() throws Exception { System.out.println("Stopping testContainerRecoveredByNode"); } } + + /** + * Test the order we get the containers to kill. It should respect the order + * described in {@link SchedulerNode#getContainersToKill()}. + */ + @Test + public void testGetRunningContainersToKill() { + final SchedulerNode node = new MockSchedulerNode(); + assertEquals(Collections.emptyList(), node.getContainersToKill()); + + // AM + RMContainer am0 = newMockRMContainer( + true, ExecutionType.GUARANTEED, "AM0"); + node.allocateContainer(am0); + assertEquals(Arrays.asList(am0), node.getContainersToKill()); + + // OPPORTUNISTIC, AM0 + RMContainer opp0 = newMockRMContainer( + false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0"); + node.allocateContainer(opp0); + assertEquals(Arrays.asList(opp0, am0), node.getContainersToKill()); + + // OPPORTUNISTIC, GUARANTEED, AM0 + RMContainer regular0 = newMockRMContainer( + false, ExecutionType.GUARANTEED, "GUARANTEED0"); + node.allocateContainer(regular0); + assertEquals(Arrays.asList(opp0, regular0, am0), + node.getContainersToKill()); + + // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0 + RMContainer opp1 = newMockRMContainer( + false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1"); + node.allocateContainer(opp1); + assertEquals(Arrays.asList(opp1, opp0, 
regular0, am0), + node.getContainersToKill()); + + // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0 + RMContainer am1 = newMockRMContainer( + true, ExecutionType.GUARANTEED, "AM1"); + node.allocateContainer(am1); + assertEquals(Arrays.asList(opp1, opp0, regular0, am1, am0), + node.getContainersToKill()); + + // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0 + RMContainer regular1 = newMockRMContainer( + false, ExecutionType.GUARANTEED, "GUARANTEED1"); + node.allocateContainer(regular1); + assertEquals(Arrays.asList(opp1, opp0, regular1, regular0, am1, am0), + node.getContainersToKill()); + } + + private static RMContainer newMockRMContainer(boolean isAMContainer, + ExecutionType executionType, String name) { + RMContainer container = mock(RMContainer.class); + when(container.isAMContainer()).thenReturn(isAMContainer); + when(container.getExecutionType()).thenReturn(executionType); + when(container.getCreationTime()).thenReturn(Time.now()); + when(container.toString()).thenReturn(name); + return container; + } + + /** + * SchedulerNode mock to test launching containers. 
+ */ + class MockSchedulerNode extends SchedulerNode { + private final List containers = new ArrayList<>(); + + MockSchedulerNode() { + super(MockNodes.newNodeInfo(0, Resource.newInstance(1, 1)), false); + } + + @Override + protected List getLaunchedContainers() { + return containers; + } + + @Override + public void allocateContainer(RMContainer rmContainer) { + containers.add(rmContainer); + // Shuffle for testing + Collections.shuffle(containers); + } + + @Override + public void reserveResource(SchedulerApplicationAttempt attempt, + SchedulerRequestKey schedulerKey, RMContainer container) {} + + @Override + public void unreserveResource(SchedulerApplicationAttempt attempt) {} + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java index 60e25ed83ac..92d866ed722 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java @@ -27,7 +27,7 @@ import java.util.Set; public class CapacitySchedulerTestBase { - protected final int GB = 1024; + protected final static int GB = 1024; protected static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; protected static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index aac7f15a5a5..228f9a380c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,12 +42,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.net.NetworkTopology; @@ -57,6 +57,7 @@ import org.apache.hadoop.security.token.Token; import 
org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -76,12 +78,13 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -94,8 +97,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; -import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import 
org.apache.hadoop.yarn.server.resourcemanager.Application; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -178,9 +179,12 @@ import com.google.common.collect.ImmutableSet; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestCapacityScheduler extends CapacitySchedulerTestBase { - private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); + private static final Logger LOG = + LoggerFactory.getLogger(TestCapacityScheduler.class); private final static ContainerUpdates NULL_UPDATE_REQUESTS = new ContainerUpdates(); private ResourceManager resourceManager = null; @@ -1309,110 +1313,152 @@ public void testAllocateReorder() throws Exception { @Test public void testResourceOverCommit() throws Exception { - int waitCount; Configuration conf = new Configuration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); MockRM rm = new MockRM(conf); rm.start(); + ResourceScheduler scheduler = rm.getResourceScheduler(); - MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB); - RMApp app1 = rm.submitApp(2048); - // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1 - nm1.nodeHeartbeat(true); - RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); - MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); - am1.registerAppAttempt(); - SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport( - nm1.getNodeId()); - // check node report, 2 GB used and 2 GB available - Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize()); - Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize()); + MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB); + NodeId nmId = nm.getNodeId(); + RMApp app = rm.submitApp(2048); + // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm + 
nm.nodeHeartbeat(true); + RMAppAttempt attempt1 = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am.registerAppAttempt(); + assertMemory(scheduler, nmId, 2 * GB, 2 * GB); - // add request for containers - am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1); - AllocateResponse alloc1Response = am1.schedule(); // send the request + // add request for 1 container of 2 GB + am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1); + AllocateResponse alloc1Response = am.schedule(); // send the request // kick the scheduler, 2 GB given to AM1, resource remaining 0 - nm1.nodeHeartbeat(true); - while (alloc1Response.getAllocatedContainers().size() < 1) { + nm.nodeHeartbeat(true); + while (alloc1Response.getAllocatedContainers().isEmpty()) { LOG.info("Waiting for containers to be created for app 1..."); Thread.sleep(100); - alloc1Response = am1.schedule(); + alloc1Response = am.schedule(); } List allocated1 = alloc1Response.getAllocatedContainers(); - Assert.assertEquals(1, allocated1.size()); - Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize()); - Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); - - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - // check node report, 4 GB used and 0 GB available - Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize()); - Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize()); - - // check container is assigned with 2 GB. + assertEquals(1, allocated1.size()); Container c1 = allocated1.get(0); - Assert.assertEquals(2 * GB, c1.getResource().getMemorySize()); - - // update node resource to 2 GB, so resource is over-consumed. 
- Map nodeResourceMap = - new HashMap(); - nodeResourceMap.put(nm1.getNodeId(), - ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1)); - UpdateNodeResourceRequest request = - UpdateNodeResourceRequest.newInstance(nodeResourceMap); - AdminService as = ((MockRM)rm).getAdminService(); - as.updateNodeResource(request); - - waitCount = 0; - while (waitCount++ != 20) { - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - if (report_nm1.getAvailableResource().getMemorySize() != 0) { - break; - } - LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried " - + waitCount + " times already.."); - Thread.sleep(1000); - } - // Now, the used resource is still 4 GB, and available resource is minus value. - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize()); - Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize()); + assertEquals(2 * GB, c1.getResource().getMemorySize()); + assertEquals(nmId, c1.getNodeId()); - // Check container can complete successfully in case of resource over-commitment. 
+ // check node report, 4 GB used and 0 GB available + assertMemory(scheduler, nmId, 4 * GB, 0); + nm.nodeHeartbeat(true); + assertEquals(4 * GB, nm.getCapability().getMemorySize()); + + // update node resource to 2 GB, so resource is over-consumed + updateNodeResource(rm, nmId, 2 * GB, 2, -1); + // the used resource should still 4 GB and negative available resource + waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000); + // check that we did not get a preemption requests + assertNull(am.schedule().getPreemptionMessage()); + + // check that the NM got the updated resources + nm.nodeHeartbeat(true); + assertEquals(2 * GB, nm.getCapability().getMemorySize()); + + // check container can complete successfully with resource over-commitment ContainerStatus containerStatus = BuilderUtils.newContainerStatus( c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource()); - nm1.containerStatus(containerStatus); - waitCount = 0; - while (attempt1.getJustFinishedContainers().size() < 1 - && waitCount++ != 20) { - LOG.info("Waiting for containers to be finished for app 1... 
Tried " - + waitCount + " times already.."); + nm.containerStatus(containerStatus); + + LOG.info("Waiting for containers to be finished for app 1..."); + GenericTestUtils.waitFor( + () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000); + assertEquals(1, am.schedule().getCompletedContainersStatuses().size()); + assertMemory(scheduler, nmId, 2 * GB, 0); + + // verify no NPE is trigger in schedule after resource is updated + am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1); + AllocateResponse allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); + // try 10 times as scheduling is an async process + for (int i = 0; i < 10; i++) { Thread.sleep(100); + allocResponse2 = am.schedule(); + assertTrue("Shouldn't have enough resource to allocate containers", + allocResponse2.getAllocatedContainers().isEmpty()); } - Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); - Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); - report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId()); - Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize()); - // As container return 2 GB back, the available resource becomes 0 again. - Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize()); - - // Verify no NPE is trigger in schedule after resource is updated. - am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1); - alloc1Response = am1.schedule(); - Assert.assertEquals("Shouldn't have enough resource to allocate containers", - 0, alloc1Response.getAllocatedContainers().size()); - int times = 0; - // try 10 times as scheduling is async process. - while (alloc1Response.getAllocatedContainers().size() < 1 - && times++ < 10) { - LOG.info("Waiting for containers to be allocated for app 1... 
Tried " - + times + " times already.."); + + // increase the resources again to 5 GB to schedule the 3GB container + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000); + + // kick the scheduling and check it took effect + nm.nodeHeartbeat(true); + while (allocResponse2.getAllocatedContainers().isEmpty()) { + LOG.info("Waiting for containers to be created for app 1..."); Thread.sleep(100); + allocResponse2 = am.schedule(); } - Assert.assertEquals("Shouldn't have enough resource to allocate containers", - 0, alloc1Response.getAllocatedContainers().size()); + assertEquals(1, allocResponse2.getAllocatedContainers().size()); + Container c2 = allocResponse2.getAllocatedContainers().get(0); + assertEquals(3 * GB, c2.getResource().getMemorySize()); + assertEquals(nmId, c2.getNodeId()); + assertMemory(scheduler, nmId, 5 * GB, 0); + + // reduce the resources and trigger a preempt request to the AM for c2 + updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); + waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); + + PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage(); + assertNotNull(preemptMsg); + Set preemptContainers = + preemptMsg.getContract().getContainers(); + assertEquals(1, preemptContainers.size()); + PreemptionContainer preemptContainer = preemptContainers.iterator().next(); + assertEquals(c2.getId(), preemptContainer.getId()); + + // increasing the resources again, should stop killing the containers + updateNodeResource(rm, nmId, 5 * GB, 2, -1); + waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000); + Thread.sleep(3 * 1000); + assertMemory(scheduler, nmId, 5 * GB, 0); + + // reduce the resources again to trigger a preempt request to the AM for c2 + long t0 = Time.now(); + updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000); + waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000); + + preemptMsg = am.schedule().getPreemptionMessage(); + assertNotNull(preemptMsg); + 
preemptContainers = preemptMsg.getContract().getContainers(); + assertEquals(1, preemptContainers.size()); + preemptContainer = preemptContainers.iterator().next(); + assertEquals(c2.getId(), preemptContainer.getId()); + + // wait until the scheduler kills the container + GenericTestUtils.waitFor(() -> { + try { + nm.nodeHeartbeat(true); // trigger preemption in the NM + } catch (Exception e) { + LOG.error("Cannot heartbeat", e); + } + SchedulerNodeReport report = scheduler.getNodeReport(nmId); + return report.getAvailableResource().getMemorySize() > 0; + }, 200, 5 * 1000); + assertMemory(scheduler, nmId, 2 * GB, 1 * GB); + + List completedContainers = + am.schedule().getCompletedContainersStatuses(); + assertEquals(1, completedContainers.size()); + ContainerStatus c2status = completedContainers.get(0); + assertContainerKilled(c2.getId(), c2status); + + long timeToKill = Time.now() - t0; + assertTrue("Took too short to kill: " + timeToKill + "ms", + timeToKill > 2000); + assertTrue("Took too long to kill: " + timeToKill + "ms", + timeToKill < 2500); + rm.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java new file mode 100644 index 00000000000..f61a93e513d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java @@ -0,0 +1,438 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import 
org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test changing the resources of a node at runtime and the resulting
+ * over-commit handling (wait, preempt or kill) in the Capacity Scheduler.
+ */
+public class TestCapacitySchedulerOvercommit
+    extends CapacitySchedulerTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCapacitySchedulerOvercommit.class);
+
+  /** Maximum time to wait for a requested container to be allocated. */
+  private static final int ALLOCATION_TIMEOUT_MS = 10 * 1000;
+
+  /** Mock Resource Manager. */
+  private MockRM rm;
+  /** Scheduler for the Mock Resource Manager. */
+  private ResourceScheduler scheduler;
+
+  /** Node Manager running containers. */
+  private MockNM nm;
+  private NodeId nmId;
+
+  /** Application to allocate containers. */
+  private RMAppAttempt attempt;
+  private MockAM am;
+
+  /**
+   * Start a Resource Manager with one 4 GB node and one registered
+   * application whose AM uses 2 GB.
+   * @throws Exception If the cluster cannot be started.
+   */
+  @Before
+  public void setUp() throws Exception {
+
+    // Start the Resource Manager
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+    rm = new MockRM(conf);
+    rm.start();
+    scheduler = rm.getResourceScheduler();
+
+    // Add a Node Manager with 4GB
+    nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    nmId = nm.getNodeId();
+
+    // Start an AM with 2GB
+    RMApp app = rm.submitApp(2048);
+    nm.nodeHeartbeat(true);
+    attempt = app.getCurrentAppAttempt();
+    am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+
+    // After allocation, used 2GB and remaining 2GB on the NM
+    assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
+    nm.nodeHeartbeat(true);
+  }
+
+  /**
+   * Unregister the application and stop the Resource Manager.
+   * @throws Exception If the application cannot be unregistered.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (am != null) {
+        am.unregisterAppAttempt();
+      }
+    } finally {
+      // always stop the RM, even if unregistering the AM failed
+      am = null;
+      if (rm != null) {
+        rm.stop();
+        rm = null;
+      }
+    }
+  }
+
+  /**
+   * Changing the node resources with no timeout only changes the available
+   * memory; the used memory stays the same.
+   */
+  @Test
+  public void testChangeResourcesNoTimeout() throws Exception {
+    waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 0 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, -2 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 4 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+  }
+
+  /**
+   * Create a container with a particular size and make sure it succeeds.
+   * @param memory Memory of the container.
+   * @return Newly created container.
+   * @throws Exception If the container cannot be allocated.
+   */
+  private Container createContainer(int memory) throws Exception {
+
+    // add a request for 1 container of the requested size
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, memory, 1, 1);
+    AllocateResponse allocResponse = am.schedule(); // send the request
+    nm.nodeHeartbeat(true);
+    allocResponse = waitForAllocation(allocResponse);
+
+    List<Container> allocated = allocResponse.getAllocatedContainers();
+    assertEquals(1, allocated.size());
+    final Container c = allocated.get(0);
+    assertEquals(memory, c.getResource().getMemorySize());
+    assertEquals(nmId, c.getNodeId());
+    return c;
+  }
+
+  /**
+   * Keep polling the scheduler until a response carries at least one
+   * allocated container, failing instead of hanging if none shows up.
+   * @param response Response of the latest schedule call.
+   * @return First response with allocated containers.
+   * @throws Exception If the AM cannot contact the scheduler.
+   */
+  private AllocateResponse waitForAllocation(AllocateResponse response)
+      throws Exception {
+    AllocateResponse current = response;
+    final long start = Time.monotonicNow();
+    while (current.getAllocatedContainers().isEmpty()) {
+      if (Time.monotonicNow() - start > ALLOCATION_TIMEOUT_MS) {
+        throw new AssertionError("No container allocated after "
+            + ALLOCATION_TIMEOUT_MS + "ms");
+      }
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(100);
+      current = am.schedule();
+    }
+    return current;
+  }
+
+  /**
+   * Request a container and check that the scheduler does not allocate it.
+   * @param memory Memory of the requested container.
+   * @return Response of the last schedule call (with no allocations).
+   * @throws Exception If the AM cannot contact the scheduler.
+   */
+  private AllocateResponse requestAndAssertNotAllocated(int memory)
+      throws Exception {
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, memory, 1, 1);
+    AllocateResponse response = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        response.getAllocatedContainers().isEmpty());
+    // try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(100);
+      response = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          response.getAllocatedContainers().isEmpty());
+    }
+    return response;
+  }
+
+  /**
+   * Check that the next allocate response asks the AM to preempt exactly
+   * the given container.
+   * @param expected Identifier of the container expected to be preempted.
+   * @throws Exception If the AM cannot contact the scheduler.
+   */
+  private void assertPreemptionRequested(ContainerId expected)
+      throws Exception {
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertNotNull(preemptMsg);
+    Set<PreemptionContainer> preemptContainers =
+        preemptMsg.getContract().getContainers();
+    assertEquals(1, preemptContainers.size());
+    PreemptionContainer preemptContainer =
+        preemptContainers.iterator().next();
+    assertEquals(expected, preemptContainer.getId());
+  }
+
+  /**
+   * Reducing the resources with no timeout should prevent new containers
+   * but wait for the current ones without killing.
+   */
+  @Test
+  public void testReduceNoTimeout() throws Exception {
+
+    // new 2GB container should give 4 GB used (2+2) and 0 GB available
+    Container c1 = createContainer(2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still be 4 GB and the available negative
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption request
+    assertNull(am.schedule().getPreemptionMessage());
+
+    // check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // check container can complete successfully with resource over-commitment
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for container to be finished for app...");
+    GenericTestUtils.waitFor(
+        () -> attempt.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify no NPE is triggered in schedule after resource is updated
+    requestAndAssertNotAllocated(3 * GB);
+  }
+
+  /**
+   * Reducing the resources with 0 time out kills the container right away.
+   */
+  @Test
+  public void testReduceKill() throws Exception {
+
+    Container container = createContainer(2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // reducing to 2GB should kill the new container
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 2 * GB, 2, 0);
+    nm.nodeHeartbeat(true);
+    waitMemory(scheduler, nmId, 2 * GB, 0 * GB, 200, 5 * 1000);
+
+    // check that the new container was the one killed
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus containerStatus = completedContainers.get(0);
+    assertContainerKilled(container.getId(), containerStatus);
+
+    // check the time for killing
+    long timeToKill = Time.now() - t0;
+    assertTrue("Too long to kill: " + timeToKill + "ms", timeToKill < 500);
+  }
+
+  @Test
+  public void testReducePreemptAndKill() throws Exception {
+    // TODO
+  }
+
+  /**
+   * Walk through the full cycle: over-commit without timeout, complete a
+   * container, grow the node, then shrink it with a timeout to get a
+   * preemption request followed by a kill.
+   */
+  @Test
+  public void testEndToEnd() throws Exception {
+
+    Container c1 = createContainer(2 * GB);
+
+    // check node report, 4 GB used and 0 GB available
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    nm.nodeHeartbeat(true);
+    assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+    // update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still be 4 GB and the available negative
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption request
+    assertNull(am.schedule().getPreemptionMessage());
+
+    // check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // check container can complete successfully with resource over-commitment
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for containers to be finished for app 1...");
+    GenericTestUtils.waitFor(
+        () -> attempt.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify no NPE is triggered in schedule after resource is updated
+    AllocateResponse allocResponse2 = requestAndAssertNotAllocated(3 * GB);
+
+    // increase the resources again to 5 GB to schedule the 3GB container
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+    // kick the scheduling and check it took effect
+    nm.nodeHeartbeat(true);
+    allocResponse2 = waitForAllocation(allocResponse2);
+    assertEquals(1, allocResponse2.getAllocatedContainers().size());
+    Container c2 = allocResponse2.getAllocatedContainers().get(0);
+    assertEquals(3 * GB, c2.getResource().getMemorySize());
+    assertEquals(nmId, c2.getNodeId());
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources and trigger a preempt request to the AM for c2
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+    assertPreemptionRequested(c2.getId());
+
+    // increasing the resources again, should stop killing the containers
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+    // wait longer than the previous 2s timeout to check nothing is killed
+    Thread.sleep(3 * 1000);
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources again to trigger a preempt request to the AM for c2
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+    assertPreemptionRequested(c2.getId());
+
+    // wait until the scheduler kills the container
+    GenericTestUtils.waitFor(() -> {
+      try {
+        nm.nodeHeartbeat(true); // trigger preemption in the NM
+      } catch (Exception e) {
+        LOG.error("Cannot heartbeat", e);
+      }
+      SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+      return report.getAvailableResource().getMemorySize() > 0;
+    }, 200, 5 * 1000);
+    assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus c2status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), c2status);
+
+    // the kill must come after the 2s timeout, but not much later
+    long timeToKill = Time.now() - t0;
+    assertTrue("Took too short to kill: " + timeToKill + "ms",
+        timeToKill > 2000);
+    assertTrue("Took too long to kill: " + timeToKill + "ms",
+        timeToKill < 2500);
+  }
+
+  /**
+   * Update the resources on a Node Manager.
+   * @param rm Resource Manager to contact.
+   * @param nmId Identifier of the Node Manager.
+   * @param memory Memory in MB.
+   * @param vCores Number of virtual cores.
+   * @param overcommitTimeout Timeout for over commit.
+   * @throws Exception If the update cannot be completed.
+   */
+  public static void updateNodeResource(MockRM rm, NodeId nmId,
+      int memory, int vCores, int overcommitTimeout) throws Exception {
+    AdminService admin = rm.getAdminService();
+    ResourceOption resourceOption = ResourceOption.newInstance(
+        Resource.newInstance(memory, vCores), overcommitTimeout);
+    UpdateNodeResourceRequest req = UpdateNodeResourceRequest.newInstance(
+        Collections.singletonMap(nmId, resourceOption));
+    admin.updateNodeResource(req);
+  }
+
+  /**
+   * Make sure that the container was killed.
+   * @param containerId Expected container identifier.
+   * @param status Container status to check.
+   */
+  public static void assertContainerKilled(
+      ContainerId containerId, ContainerStatus status) {
+    assertEquals(containerId, status.getContainerId());
+    assertEquals(ContainerState.COMPLETE, status.getState());
+    assertEquals(ContainerExitStatus.PREEMPTED, status.getExitStatus());
+    assertEquals("Container preempted by scheduler", status.getDiagnostics());
+  }
+
+  /**
+   * Check if a node report has the expected memory values.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   */
+  public static void assertMemory(ResourceScheduler scheduler, NodeId nmId,
+      long expectedUsed, long expectedAvailable) {
+    SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+    assertNotNull(nmReport);
+    Resource used = nmReport.getUsedResource();
+    assertEquals("Used memory", expectedUsed, used.getMemorySize());
+    Resource available = nmReport.getAvailableResource();
+    assertEquals("Available memory",
+        expectedAvailable, available.getMemorySize());
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler, NodeId nmId,
+      int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+
+    long start = Time.monotonicNow();
+    while (Time.monotonicNow() - start < waitForMillis) {
+      try {
+        assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
+        return;
+      } catch (AssertionError e) {
+        // not there yet; retry until the deadline
+        Thread.sleep(checkEveryMillis);
+      }
+    }
+
+    // one last check whose assertion failure is reported to the caller
+    LOG.error("Took longer than expected ({}ms), checking once more",
+        Time.monotonicNow() - start);
+    assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
+  }
+}
\ No newline at end of file