diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
index 86261dea199..b28332642bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
@@ -37,7 +37,12 @@ public static ResourceOption newInstance(Resource resource,
   }
 
   /** Negative value means no timeout. */
-  public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT = -1;
+  public static final int OVER_COMMIT_NO_TIMEOUT = -1;
+  /** Zero value means immediate kill. */
+  public static final int OVER_COMMIT_IMMEDIATE = 0;
+  /** By default, no timeout. */
+  public static final int OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT =
+      OVER_COMMIT_NO_TIMEOUT;
 
   /**
    * Get the resource of the ResourceOption.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index a565fe75656..9574d30eebf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -963,8 +963,9 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
             .getDispatcher()
             .getEventHandler()
             .handle(
-                new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
-                    .newInstance(newNode.getTotalCapability(), -1)));
+                new NodeResourceUpdateSchedulerEvent(rmNode,
+                    ResourceOption.newInstance(newNode.getTotalCapability(),
+                        ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
       }
     }
 
@@ -1209,8 +1210,9 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
             .getDispatcher()
             .getEventHandler()
             .handle(
-                new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption
-                    .newInstance(rmNode.totalCapability, 0)));
+                new NodeResourceUpdateSchedulerEvent(rmNode,
+                    ResourceOption.newInstance(rmNode.totalCapability,
+                        ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
       }
     }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index d7426833ac8..8165c3c09a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -852,7 +852,8 @@ public void updateNodeResource(RMNode nm,
       // update resource to node
       node.updateTotalResource(newResource);
       node.setOvercommitTimeOut(timeout);
-      signalContainersIfOvercommitted(node, timeout == 0);
+      signalContainersIfOvercommitted(node,
+          timeout == ResourceOption.OVER_COMMIT_IMMEDIATE);
 
       nodeTracker.addNode((N) node);
     } else{
@@ -1186,8 +1187,10 @@ protected void nodeUpdate(RMNode nm) {
           .getDispatcher()
           .getEventHandler()
           .handle(
-              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                  .newInstance(schedulerNode.getAllocatedResource(), 0)));
+              new RMNodeResourceUpdateEvent(nm.getNodeID(),
+                  ResourceOption.newInstance(
+                      schedulerNode.getAllocatedResource(),
+                      ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
     }
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
index 36b51985060..525de808da6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
@@ -187,7 +187,8 @@ public void testReduceNoTimeout() throws Exception {
     assertMemory(scheduler, nmId, 4 * GB, 0);
 
     // Update node resource to 2 GB, so resource is over-consumed
-    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 2 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     // The used resource should still be 4 GB and negative available resource
     waitMemory(scheduler, nmId, 4 * GB, -2 * GB, INTERVAL, 2 * 1000);
     // Check that the NM got the updated resources
@@ -231,13 +232,16 @@ public void testChangeResourcesNoTimeout() throws Exception {
     waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
 
-    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 5 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 2 * 1000);
 
-    updateNodeResource(rm, nmId, 0 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 0 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     waitMemory(scheduler, nmId, 2 * GB, -2 * GB, 100, 2 * 1000);
 
-    updateNodeResource(rm, nmId, 4 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 4 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
 
     // The application should still be running without issues.
@@ -255,7 +259,8 @@ public void testReduceKill() throws Exception {
 
     // Reducing to 2GB should kill the container
     long t0 = Time.now();
-    updateNodeResource(rm, nmId, 2 * GB, 2, 0);
+    updateNodeResource(
+        rm, nmId, 2 * GB, 2, ResourceOption.OVER_COMMIT_IMMEDIATE);
     waitMemory(scheduler, nm, 2 * GB, 0 * GB, INTERVAL, 2 * INTERVAL);
 
     // Check that the new container was killed
@@ -358,7 +363,8 @@ public void testReducePreemptAndCancel() throws Exception {
 
   @Test
   public void testKillMultipleContainers() throws Exception {
-    updateNodeResource(rm, nmId, 8 * GB, 6, -1);
+    updateNodeResource(
+        rm, nmId, 8 * GB, 6, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     waitMemory(scheduler, nmId, 2 * GB, 6 * GB, 200, 5 * 1000);
 
     // Start 2 containers with 1 GB each
@@ -380,7 +386,8 @@ public void testKillMultipleContainers() throws Exception {
     assertEquals(5, scheduler.getNodeReport(nmId).getNumContainers());
 
     // Reduce the resources to kill C3 and C2 (not AM2)
-    updateNodeResource(rm, nmId, 5 * GB, 6, 0);
+    updateNodeResource(
+        rm, nmId, 5 * GB, 6, ResourceOption.OVER_COMMIT_IMMEDIATE);
     waitMemory(scheduler, nm, 5 * GB, 0 * GB, 200, 5 * 1000);
     assertEquals(3, scheduler.getNodeReport(nmId).getNumContainers());
 
@@ -398,7 +405,8 @@ public void testKillMultipleContainers() throws Exception {
     assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
 
     // Reduce the resources to kill C1 (not AM2)
-    updateNodeResource(rm, nmId, 4 * GB, 6, 0);
+    updateNodeResource(
+        rm, nmId, 4 * GB, 6, ResourceOption.OVER_COMMIT_IMMEDIATE);
     waitMemory(scheduler, nm, 4 * GB, 0 * GB, 200, 5 * 1000);
     assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
     completedContainers = am.schedule().getCompletedContainersStatuses();
@@ -409,7 +417,8 @@ public void testKillMultipleContainers() throws Exception {
     assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
 
     // Reduce the resources to kill AM2
-    updateNodeResource(rm, nmId, 2 * GB, 6, 0);
+    updateNodeResource(
+        rm, nmId, 2 * GB, 6, ResourceOption.OVER_COMMIT_IMMEDIATE);
     waitMemory(scheduler, nm, 2 * GB, 0 * GB, 200, 5 * 1000);
     assertEquals(1, scheduler.getNodeReport(nmId).getNumContainers());
     assertEquals(RMAppAttemptState.FAILED, attempt2.getState());
@@ -430,7 +439,8 @@ public void testEndToEnd() throws Exception {
     assertEquals(4 * GB, nm.getCapability().getMemorySize());
 
     // update node resource to 2 GB, so resource is over-consumed
-    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 2 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     // the used resource should still 4 GB and negative available resource
     waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
     // check that we did not get a preemption requests
@@ -465,7 +475,8 @@ public void testEndToEnd() throws Exception {
     }
 
     // increase the resources again to 5 GB to schedule the 3GB container
-    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 5 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
 
     // kick the scheduling and check it took effect
@@ -492,7 +503,8 @@ public void testEndToEnd() throws Exception {
     assertPreemption(c2.getId(), preemptMsg);
 
     // increasing the resources again, should stop killing the containers
-    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    updateNodeResource(
+        rm, nmId, 5 * GB, 2, ResourceOption.OVER_COMMIT_NO_TIMEOUT);
     waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
     Thread.sleep(3 * 1000);
     assertMemory(scheduler, nmId, 5 * GB, 0);