diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 5ec0ae64c3..8acd2de0a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.junit.Assert; import org.junit.Before; @@ -396,6 +398,79 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure() cm.stop(); } + @Test + public void testContainerExtendedResourceRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.dispatcher.disableExitOnDispatchException(); + cm.init(conf); + cm.start(); + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map containerEnv = new HashMap<>(); + setFlowContext(containerEnv, "app_name1", appId); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map acls = Collections.emptyMap(); + Map localResources = Collections.emptyMap(); + List commands = Arrays.asList("sleep 10"); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + containerTokens, acls); + Map resourceMap = new HashMap<>(); + resourceMap.put(ResourceInformation.MEMORY_MB.getName(), ResourceInformation + .newInstance(ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), 8096)); + resourceMap.put(ResourceInformation.VCORES.getName(), ResourceInformation + .newInstance(ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), 8)); + resourceMap + .put("extRes", ResourceInformation.newInstance("extRes", (long) 4)); + ResourceUtils.initializeResourcesFromResourceInformationMap(resourceMap); + Resource resource = Resource.newInstance(2048, 2); + resource.setResourceValue("extRes", 1); + StartContainersResponse startResponse = startContainerWithResource( + context, cm, cid, resource, clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + ContainerStatus containerStatus = getContainerStatus(context, cm, cid); + assertEquals( + containerStatus.getCapability().getResourceValue("extRes"), 1); + assertEquals(resource, containerStatus.getCapability()); + // restart and verify container is running and extended resources have + // been recovered + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + containerStatus = getContainerStatus(context, cm, cid); + assertEquals( + containerStatus.getCapability().getResourceValue("extRes"), 1); + assertEquals(resource, containerStatus.getCapability()); + } + @Test public void testContainerResizeRecovery() throws Exception { conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); @@ -705,6 +780,33 @@ public StartContainersResponse run() throws Exception { }); } + private StartContainersResponse startContainerWithResource(Context context, + final ContainerManagerImpl cm, ContainerId cid, Resource resource, + ContainerLaunchContext clc, LogAggregationContext logAggregationContext) + throws Exception { + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + StartContainerRequest scReq = StartContainerRequest.newInstance( + clc, TestContainerManager.createContainerToken(cid, 0, + context.getNodeId(), user.getShortUserName(), resource, + context.getContainerTokenSecretManager(), logAggregationContext)); + final List scReqList = + new ArrayList(); + scReqList.add(scReq); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + return user.doAs(new PrivilegedExceptionAction() { + @Override + public StartContainersResponse run() throws Exception { + return cm.startContainers( + StartContainersRequest.newInstance(scReqList)); + } + }); + } + private ContainerUpdateResponse updateContainers( Context context, final ContainerManagerImpl cm, ContainerId cid, Resource capability) throws Exception {