diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index df58182..2697462 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -40,7 +40,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -99,6 +102,7 @@ private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; @@ -230,6 +234,9 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { rcs.status = RecoveredContainerStatus.COMPLETED; rcs.exitCode = Integer.parseInt(asString(entry.getValue())); + } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { + rcs.capability = new ResourcePBImpl( + ResourceProto.parseFrom(entry.getValue())); } else { throw new IOException("Unexpected container state key: " + key); } @@ -275,6 +282,20 @@ public void storeContainerLaunched(ContainerId containerId) } @Override + public void storeContainerResourceChanged(ContainerId containerId, + Resource capability) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX; + try { + // New value will overwrite old values for the same key + db.put(bytes(key), + ((ResourcePBImpl) capability).getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void storeContainerKilled(ContainerId containerId) throws IOException { String key = CONTAINERS_KEY_PREFIX + containerId.toString() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index ab49543..d5dce9b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -88,6 +89,11 @@ public void storeContainerLaunched(ContainerId containerId) } @Override + public void storeContainerResourceChanged(ContainerId containerId, + Resource capability) throws IOException { + } + + @Override public void storeContainerKilled(ContainerId containerId) throws IOException { } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index fa66349..e8ccf54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -74,6 +75,7 @@ public NMStateStoreService(String name) { boolean killed = false; String diagnostics = ""; StartContainerRequest startRequest; + Resource capability; public RecoveredContainerStatus getStatus() { return status; @@ -94,6 +96,10 @@ public String getDiagnostics() { public StartContainerRequest getStartRequest() { return startRequest; } + + public Resource getCapability() { + return capability; + } } public static class LocalResourceTrackerState { @@ -284,6 +290,15 @@ public abstract void storeContainerLaunched(ContainerId containerId) throws IOException; /** + * Record that a container resource has been changed + * @param containerId the container ID + * @param capability the container resource capability + * @throws IOException + */ + public abstract void storeContainerResourceChanged(ContainerId containerId, + Resource capability) throws IOException; + + /** * Record that a container has completed * @param containerId the container ID * @param exitCode the exit code from the container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 781950e..40d3402 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -28,18 +28,30 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -48,9 +60,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -58,6 +78,9 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -65,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -77,18 +101,50 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Before; import org.junit.Test; -public class TestContainerManagerRecovery { +public class TestContainerManagerRecovery extends BaseContainerManagerTest { - private NodeManagerMetrics metrics = NodeManagerMetrics.create(); + public TestContainerManagerRecovery() throws UnsupportedFileSystemException { + super(); + } + + @Override + @Before + public void setup() throws IOException { + localFS.delete(new Path(localDir.getAbsolutePath()), true); + localFS.delete(new Path(tmpDir.getAbsolutePath()), true); + localFS.delete(new Path(localLogDir.getAbsolutePath()), true); + localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true); + localDir.mkdir(); + tmpDir.mkdir(); + localLogDir.mkdir(); + remoteLogDir.mkdir(); + LOG.info("Created localDir in " + localDir.getAbsolutePath()); + LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath()); + + String bindAddress = "0.0.0.0:12345"; + conf.set(YarnConfiguration.NM_ADDRESS, bindAddress); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath()); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + // Default delSrvc + delSrvc = createDeletionService(); + delSrvc.init(conf); + exec = createContainerExecutor(); + dirsHandler = new LocalDirsHandlerService(); + nodeHealthChecker = new NodeHealthCheckerService( + NodeManager.getNodeHealthScriptRunner(conf), dirsHandler); + nodeHealthChecker.init(conf); + } @Test public void testApplicationRecovery() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); - conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); NMStateStoreService stateStore = new NMMemoryStateStoreService(); @@ -234,6 +290,97 @@ public void testApplicationRecovery() throws Exception { } @Test + public void testContainerResizeRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map containerEnv = Collections.emptyMap(); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map acls = Collections.emptyMap(); + File tmpDir = new File("target", + this.getClass().getSimpleName() + "-tmpDir"); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + FileContext localFS = FileContext.getLocalFSFileContext(); + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = new HashMap<>(); + localResources.put(destinationFile, rsrc_alpha); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + containerTokens, acls); + StartContainersResponse startResponse = startContainer( + context, cm, cid, clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + Resource targetResource = Resource.newInstance(2048, 2); + IncreaseContainersResourceResponse increaseResponse = + increaseContainersResource(context, cm, cid, targetResource); + assertTrue(increaseResponse.getFailedRequests().isEmpty()); + // check status + ContainerStatus containerStatus = getContainerStatus(context, cm, cid); + int retry = 0; + while (!targetResource.equals(containerStatus.getCapability()) && + (retry++ < 5)) { + Thread.sleep(200); + containerStatus = getContainerStatus(context, cm, cid); + } + assertEquals(targetResource, containerStatus.getCapability()); + // restart and verify container is running and recovered + // to the correct size + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + containerStatus = getContainerStatus(context, cm, cid); + assertEquals(targetResource, containerStatus.getCapability()); + } + + @Test public void testContainerCleanupOnShutdown() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); ApplicationAttemptId attemptId = @@ -257,10 +404,8 @@ public void testContainerCleanupOnShutdown() throws Exception { LogAggregationContext.newInstance("includePattern", "excludePattern"); // verify containers are stopped on shutdown without recovery - YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false); conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false); - conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); Context context = createContext(conf, new NMNullStateStoreService()); ContainerManagerImpl cm = spy(createContainerManager(context)); cm.init(conf); @@ -306,12 +451,37 @@ public void testContainerCleanupOnShutdown() throws Exception { verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class)); } - private NMContext createContext(YarnConfiguration conf, + private ContainerManagerImpl createContainerManager(Context context, + DeletionService delSrvc) { + return new ContainerManagerImpl(context, exec, delSrvc, + mock(NodeStatusUpdater.class), metrics, + new ApplicationACLsManager(conf), dirsHandler) { + @Override + public void + setBlockNewContainerRequests(boolean blockNewContainerRequests) { + // do nothing + } + @Override + protected void authorizeGetAndStopContainerRequest( + ContainerId containerId, Container container, + boolean stopRequest, NMTokenIdentifier identifier) + throws YarnException { + if(container == null || container.getUser().equals("Fail")){ + throw new YarnException("Reject this container"); + } + } + }; + } + + private NMContext createContext(Configuration conf, NMStateStoreService stateStore) { NMContext context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore); - + new ApplicationACLsManager(conf), stateStore){ + public int getHttpPort() { + return HTTP_PORT; + } + }; // simulate registration with RM MasterKey masterKey = new MasterKeyPBImpl(); masterKey.setKeyId(123); @@ -349,6 +519,58 @@ public StartContainersResponse run() throws Exception { }); } + private IncreaseContainersResourceResponse increaseContainersResource( + Context context, final ContainerManagerImpl cm, ContainerId cid, + Resource capability) throws Exception { + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + // construct container resource increase request + final List increaseTokens = new ArrayList(); + // add increase request + Token containerToken = TestContainerManager.createContainerToken( + cid, 0, context.getNodeId(), user.getShortUserName(), + capability, context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + final IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest.newInstance(increaseTokens); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public IncreaseContainersResourceResponse run() throws Exception { + return cm.increaseContainersResource(increaseRequest); + } + }); + } + + private ContainerStatus getContainerStatus( + Context context, final ContainerManagerImpl cm, ContainerId cid) + throws Exception { + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + List containerIds = new ArrayList<>(); + containerIds.add(cid); + final GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public ContainerStatus run() throws Exception { + return cm.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + } + }); + } + private void waitForAppState(Application app, ApplicationState state) throws Exception { final int msecPerSleep = 10; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index e0487e7..a1c95ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -122,9 +123,10 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.killed = rcs.killed; rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; + rcsCopy.capability = rcs.capability; result.add(rcsCopy); } - return new ArrayList(); + return result; } @Override @@ -153,6 +155,13 @@ public synchronized void storeContainerLaunched(ContainerId containerId) } @Override + public synchronized void storeContainerResourceChanged( + ContainerId containerId, Resource capability) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.capability = capability; + } + + @Override public synchronized void storeContainerKilled(ContainerId containerId) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId);