diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 742a43a1207..90e1fb17240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import java.util.HashMap; import java.util.HashSet; @@ -51,6 +52,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -96,6 +99,7 @@ @RunWith(value = Parameterized.class) public class TestAMRMClient { private String schedulerName = null; + private boolean autoUpdate = false; private Configuration conf = null; private MiniYARNCluster yarnCluster = null; private YarnClient yarnClient = null; @@ -115,16 +119,19 
@@ private String[] racks; private final static int DEFAULT_ITERATION = 3; - public TestAMRMClient(String schedulerName) { + public TestAMRMClient(String schedulerName, boolean autoUpdate) { this.schedulerName = schedulerName; + this.autoUpdate = autoUpdate; } @Parameterized.Parameters public static Collection data() { - List list = new ArrayList(2); - list.add(new Object[] {CapacityScheduler.class.getName()}); - list.add(new Object[] {FairScheduler.class.getName()}); - return list; + // Currently only capacity scheduler supports auto update, so it is the only scheduler also run with autoUpdate=true. + return Arrays.asList(new Object[][] { + { CapacityScheduler.class.getName(), true }, + { CapacityScheduler.class.getName(), false }, + { FairScheduler.class.getName(), false } + }); } @Before @@ -137,6 +144,9 @@ private void createClusterAndStartApplication(Configuration conf) throws Exception { // start minicluster this.conf = conf; + if (autoUpdate) { + conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true); + } conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName); conf.setLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, @@ -1157,6 +1167,118 @@ private void doContainerResourceChange( assertEquals(1, updatedContainers.size()); } + @Test + public void testNMContextUpdatedWithContainerPromotion() throws Exception { + AMRMClientImpl amClient = + (AMRMClientImpl) AMRMClient + .createAMRMClient(); + amClient.init(conf); + amClient.start(); + + // Create and start the NM client, checking that it reaches STARTED + NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient(); + Assert.assertNotNull(nmClient); + nmClient.init(conf); + nmClient.start(); + assertEquals(STATE.STARTED, nmClient.getServiceState()); + + amClient.registerApplicationMaster("Host", 10000, ""); + + // No asks or releases should be pending before the first request + assertEquals(0, amClient.ask.size()); + assertEquals(0, amClient.release.size()); + + // Request an OPPORTUNISTIC container with Resource.newInstance(512, 1) + Resource reqResource = Resource.newInstance(512, 1); + amClient.addContainerRequest( 
+ new AMRMClient.ContainerRequest(reqResource, null, null, priority2, 0, + true, null, ExecutionTypeRequest + .newInstance(ExecutionType.OPPORTUNISTIC, true))); + + // Poll allocate() until the RM reports exactly 1 allocated and 0 updated containers + AllocateResponse allocResponse = waitForAllocation(amClient, 1, 0); + + assertEquals(1, allocResponse.getAllocatedContainers().size()); + startContainer(allocResponse, nmClient); + // Ask the RM to promote the started container to GUARANTEED. + Container c = allocResponse.getAllocatedContainers().get(0); + amClient.requestContainerUpdate(c, + UpdateContainerRequest.newInstance(c.getVersion(), + c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, + null, ExecutionType.GUARANTEED)); + + allocResponse = waitForAllocation(amClient, 0, 1); + + // Take the first updated container. NOTE(review): get(0) fails if waitForAllocation gave up without seeing an update. + UpdatedContainer updatedContainer = allocResponse + .getUpdatedContainers().get(0); + + // If container auto update is not enabled, we need to notify + // NM about this update. + if (!autoUpdate) { + nmClient.updateContainerResource(updatedContainer.getContainer()); + } + + // Wait until NM context updated, or fail on timeout. 
+ waitForNMContextUpdate(updatedContainer, ExecutionType.GUARANTEED); + /* NOTE(review): nmClient is never stopped, and close() is skipped if an assertion above fails. */ + amClient.close(); + } + /** Calls allocate(0.1f) up to 100 times, sleeping 100 ms between calls, until the counts match; returns the last response either way. */ + private AllocateResponse waitForAllocation(AMRMClient amrmClient, + int expectedAllocatedContainerNum, int expectedUpdatedContainerNum) + throws Exception { + AllocateResponse allocResponse = null; + int iteration = 100; + while(iteration>0) { + allocResponse = amrmClient.allocate(0.1f); + int actualAllocated = allocResponse.getAllocatedContainers().size(); + int actualUpdated = allocResponse.getUpdatedContainers().size(); + if (expectedAllocatedContainerNum == actualAllocated && + expectedUpdatedContainerNum == actualUpdated) { + break; + } + Thread.sleep(100); + iteration--; + } + return allocResponse; + } + /** Waits until the container's token identifier in the NM context reports expectedType; calls fail() on timeout or when the last node is reached without a match. */ + private void waitForNMContextUpdate(UpdatedContainer updatedContainer, + ExecutionType expectedType) { + for (int i=0; i { + org.apache.hadoop.yarn.server.nodemanager.containermanager + .container.Container nmContainer = + nm.getNMContext().getContainers() + .get(updatedContainer.getContainer().getId()); + if (nmContainer != null) { + ExecutionType actual = nmContainer.getContainerTokenIdentifier() + .getExecutionType(); + return actual.equals(expectedType); + } + return false; + },1000, 30000); + } catch (TimeoutException e) { + fail("Times out waiting for container state in" + + " NM context to be updated"); + } catch (InterruptedException e) { + // Ignorable. NOTE(review): the interrupt status is dropped here; consider Thread.currentThread().interrupt(). 
+ } + break; + } + + // Last node reached without a match (a match breaks out of the loop above) + if (i == nodeCount -1) { + fail("Container doesn't exist in NM context."); + } + } + } + @Test(timeout=60000) public void testAMRMClientWithContainerPromotion() throws YarnException, IOException { @@ -1446,7 +1568,9 @@ private void updateContainerExecType(AllocateResponse allocResponse, for (UpdatedContainer updatedContainer : allocResponse .getUpdatedContainers()) { Container container = updatedContainer.getContainer(); - nmClient.increaseContainerResource(container); + if (!autoUpdate) { + nmClient.increaseContainerResource(container); + } // NodeManager may still need some time to get the stable // container status while (true) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 9e7d132d9b0..499423f3b1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -354,4 +354,10 @@ private ExecutionType convertFromProtoFormat( ExecutionTypeProto executionType) { return ProtoUtils.convertFromProtoFormat(executionType); } + /** Rebuilds proto with its execution type replaced by the given type. */ + public void setExecutionType(ExecutionType type) { + proto = proto.toBuilder() + .setExecutionType(convertToProtoFormat(type)) + .build(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 
d9b713f6d4a..3d14f626fe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -178,6 +178,22 @@ public void handle(ContainerSchedulerEvent event) { } } + /** + * Updates the execution type of a container in NM context. + * This should be called after the container is promoted/demoted. + * Does nothing if the container is not found in the NM context. + * @param containerId container ID. + * @param currentType the execution type the container has been updated to. + */ + private void updateContainerExecType(ContainerId containerId, + ExecutionType currentType) { + Container nmContainer = context.getContainers().get(containerId); + if (nmContainer != null) { + nmContainer.getContainerTokenIdentifier() + .setExecutionType(currentType); + } + } + /** * We assume that the ContainerManager has already figured out what kind * of update this is. @@ -205,6 +221,8 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { if (queuedOpportunisticContainers.remove(containerId) != null) { queuedGuaranteedContainers.put(containerId, updateEvent.getContainer()); + // Promotion: record GUARANTEED as the execution type in NM context. + updateContainerExecType(containerId, ExecutionType.GUARANTEED); //Kill/pause opportunistic containers if any to make room for // promotion request reclaimOpportunisticContainerResources(updateEvent.getContainer()); @@ -216,6 +234,8 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { if (queuedGuaranteedContainers.remove(containerId) != null) { queuedOpportunisticContainers.put(containerId, updateEvent.getContainer()); + // Demotion: record OPPORTUNISTIC as the execution type in NM context. 
+ updateContainerExecType(containerId, ExecutionType.OPPORTUNISTIC); } } startPendingContainers(maxOppQueueLength <= 0);