diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 34709104264..89d261b786a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -179,7 +179,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - +import java.util.Iterator; +import java.util.NoSuchElementException; import static org.apache.hadoop.service.Service.STATE.STARTED; public class ContainerManagerImpl extends CompositeService implements @@ -363,11 +364,18 @@ private void recover() throws IOException, URISyntaxException { recoverApplication(proto); } - for (RecoveredContainerState rcs : stateStore.loadContainersState()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering container with state: " + rcs); + Iterator it = stateStore.getRCSIterator(); + try { + while (it.hasNext()) { + RecoveredContainerState rcs = it.next(); + if(rcs.getStartRequest()!=null) { + recoverContainer(rcs); + } else { + stateStore.removeContainer(rcs.getContainerId()); + } } - recoverContainer(rcs); + } catch (NoSuchElementException e) { + LOG.error(e.getMessage()); } // Recovery AMRMProxy state after apps and containers are recovered diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6f643b04d5f..eda94c790e6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -67,6 +67,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -75,10 +76,10 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; - import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.bytes; - +import java.util.Iterator; +import java.util.NoSuchElementException; public class NMLeveldbStateStoreService extends NMStateStoreService { public static final org.slf4j.Logger LOG = @@ -224,63 +225,92 @@ boolean isHealthy() { return isHealthy; } + private LeveldbIterator getLevelDBIterator() { + LeveldbIterator iterator = new LeveldbIterator(db); + iterator.seek(bytes(CONTAINERS_KEY_PREFIX)); + return iterator; + } + + private class RCSIterator implements Iterator { + private LeveldbIterator it; + private boolean hasNext; + private RecoveredContainerState rcs; + + RCSIterator(LeveldbIterator it) { + this.it = it; + hasNext = false; + rcs = null; + } + + @Override + public boolean hasNext() { + if(rcs != null) { + hasNext = true; + } else { + try { + rcs = getNextRecoveredObject(it); + if(rcs == null) { + hasNext = false; + } else + hasNext = true; + } catch (IOException e) { + return hasNext = false; + } + } + return hasNext; + } + + @Override + public RecoveredContainerState next() { + RecoveredContainerState tmp = rcs; + + if(rcs != null) { + rcs = null; + return tmp; + } else { + try { + tmp = getNextRecoveredObject(it); + if(tmp == null) throw new NoSuchElementException(); + return tmp; + } catch (IOException e) { + throw new NoSuchElementException(e.getMessage()); + } + } + } + } + @Override - public List loadContainersState() - throws IOException { - ArrayList containers = - new ArrayList(); - ArrayList containersToRemove = - new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + public Iterator getRCSIterator() throws IOException{ - while (iter.hasNext()) { - Entry entry = iter.peekNext(); - String key = asString(entry.getKey()); + LeveldbIterator it = getLevelDBIterator(); + RCSIterator rcsit = new RCSIterator(it); + return rcsit; + } + + + private RecoveredContainerState getNextRecoveredObject(LeveldbIterator it) throws IOException{ + RecoveredContainerState rcs = null; + if(it.hasNext()) { + Entry entry = it.peekNext(); + String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { - break; + return null; } - int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); if (idEndPos < 0) { throw new IOException("Unable to determine container in key: " + key); } ContainerId containerId = ContainerId.fromString( - key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); + key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); String keyPrefix = key.substring(0, idEndPos+1); - RecoveredContainerState rcs = loadContainerState(containerId, - iter, keyPrefix); - // Don't load container without StartContainerRequest - if (rcs.startRequest != null) { - containers.add(rcs); - } else { - containersToRemove.add(containerId); - } - } - } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } + rcs = loadContainerState(containerId, + it, keyPrefix); + rcs.setContainerId(containerId); +// if(rcs.getStartRequest() != null) { +// break; +// } } - - // remove container without StartContainerRequest - for (ContainerId containerId : containersToRemove) { - LOG.warn("Remove container " + containerId + - " with incomplete records"); - try { - removeContainer(containerId); - // TODO: kill and cleanup the leaked container - } catch (IOException e) { - LOG.error("Unable to remove container " + containerId + - " in store", e); - } - } - - return containers; + return rcs; } private RecoveredContainerState loadContainerState(ContainerId containerId, @@ -739,6 +769,11 @@ public void removeApplication(ApplicationId appId) } } + @Override + public List loadContainersState() throws IOException { + return Collections.emptyList(); + } + @Override public RecoveredLocalizationState loadLocalizationState() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index f217f2f8605..b2fde4d2015 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.Serializable; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -291,4 +293,9 @@ protected void startStorage() throws IOException { @Override protected void closeStorage() throws IOException { } + + @Override + public Iterator getRCSIterator() throws IOException { + return Collections.emptyIterator(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 0ea0ef3b86c..57837a1a67b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -106,6 +107,15 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { RecoveredContainerType.RECOVER; private long startTime; private ResourceMappings resMappings = new ResourceMappings(); + private ContainerId containerId; + + public ContainerId getContainerId() { + return containerId; + } + + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } public RecoveredContainerStatus getStatus() { return status; @@ -788,4 +798,6 @@ protected void updateContainerResourceMapping(Container container, container.getResourceMappings().addAssignedResources(resourceType, newAssigned); } + + public abstract Iterator getRCSIterator() throws IOException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index b67d11fceb3..fde83b8990f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -22,7 +22,9 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -82,6 +84,10 @@ protected void startStorage() { protected void closeStorage() { } + @Override + public Iterator getRCSIterator() throws IOException { + return Collections.emptyIterator(); + } @Override public synchronized RecoveredApplicationsState loadApplicationsState() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 265b3e68833..29c2e8fa756 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; @@ -143,6 +144,18 @@ private void verifyEmptyState() throws IOException { assertTrue(state.getUserResources().isEmpty()); } + private RecoveredContainerState + getOne(Iterator it) throws IOException { + RecoveredContainerState rcs = null; + if(it.hasNext()) { + rcs = it.next(); + } + assertNotNull(rcs); + assertTrue(it.hasNext() == false || it.next().getStartRequest() == null); + + return rcs; + } + @Test public void testIsNewlyCreated() throws IOException { assertTrue(stateStore.isNewlyCreated()); @@ -226,13 +239,16 @@ public void testApplicationStorage() throws IOException { assertEquals(appProto1, state.getApplications().get(0)); } + + + private boolean isStoreEmpty() throws IOException{ + return(!stateStore.getRCSIterator().hasNext()); + } + @Test public void testContainerStorage() throws IOException { - // test empty when no state - List recoveredContainers = - stateStore.loadContainersState(); - assertTrue(recoveredContainers.isEmpty()); - + //test empty when no state + assertFalse(stateStore.getRCSIterator().hasNext()); // create a container request ApplicationId appId = ApplicationId.newInstance(1234, 3); ApplicationAttemptId appAttemptId = @@ -250,35 +266,33 @@ public void testContainerStorage() throws IOException { stateStore.getContainerVersionKey(containerId.toString())))); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - RecoveredContainerState rcs = recoveredContainers.get(0); - assertEquals(0, rcs.getVersion()); - assertEquals(containerStartTime, rcs.getStartTime()); - assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertTrue(rcs.getDiagnostics().isEmpty()); + RecoveredContainerState recoveredContainer = getOne(stateStore.getRCSIterator()); + assertNotNull(recoveredContainer); + + //recoveredContainer = stateStore.loadContainersState(); + assertEquals(0, recoveredContainer.getVersion()); + assertEquals(containerStartTime, recoveredContainer.getStartTime()); + assertEquals(RecoveredContainerStatus.REQUESTED, recoveredContainer.getStatus()); + assertEquals(ContainerExitStatus.INVALID, recoveredContainer.getExitCode()); + assertEquals(false, recoveredContainer.getKilled()); + assertEquals(containerReq, recoveredContainer.getStartRequest()); + assertTrue(recoveredContainer.getDiagnostics().isEmpty()); // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); stateStore.storeContainerLaunched(containerId1); - recoveredContainers = stateStore.loadContainersState(); - // check whether the new container record is discarded - assertEquals(1, recoveredContainers.size()); + recoveredContainer = getOne(stateStore.getRCSIterator()); + assertNotNull(recoveredContainer); // queue the container, and verify recovered stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertTrue(rcs.getDiagnostics().isEmpty()); + recoveredContainer = getOne(stateStore.getRCSIterator()); + assertEquals(RecoveredContainerStatus.QUEUED, recoveredContainer.getStatus()); + assertEquals(ContainerExitStatus.INVALID, recoveredContainer.getExitCode()); + assertEquals(false, recoveredContainer.getKilled()); + assertEquals(containerReq, recoveredContainer.getStartRequest()); + assertTrue(recoveredContainer.getDiagnostics().isEmpty()); // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); @@ -286,33 +300,31 @@ public void testContainerStorage() throws IOException { diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertEquals(diags.toString(), rcs.getDiagnostics()); + recoveredContainer = getOne(stateStore.getRCSIterator()); + assertEquals(RecoveredContainerStatus.LAUNCHED, recoveredContainer.getStatus()); + assertEquals(ContainerExitStatus.INVALID, recoveredContainer.getExitCode()); + assertEquals(false, recoveredContainer.getKilled()); + assertEquals(containerReq, recoveredContainer.getStartRequest()); + assertEquals(diags.toString(), recoveredContainer.getDiagnostics()); // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = getOne(stateStore.getRCSIterator()); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); + + // Resume the container stateStore.removeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); + recoveredContainer = getOne(stateStore.getRCSIterator()); - // increase the container size, and verify recovered + + // increase the container size, and verify recovered ContainerTokenIdentifier updateTokenIdentifier = new ContainerTokenIdentifier(containerId, "host", "user", Resource.newInstance(2468, 4), 9876543210L, 42, 2468, @@ -321,9 +333,7 @@ public void testContainerStorage() throws IOException { stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + rcs = getOne(stateStore.getRCSIterator()); assertEquals(0, rcs.getVersion()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); @@ -335,9 +345,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerKilled(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + rcs = getOne(stateStore.getRCSIterator()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertTrue(rcs.getKilled()); @@ -351,22 +359,18 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + rcs = getOne(stateStore.getRCSIterator()); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); assertEquals(21, rcs.getExitCode()); assertTrue(rcs.getKilled()); assertEquals(diags.toString(), rcs.getDiagnostics()); - +// // store remainingRetryAttempts, workDir and logDir stateStore.storeContainerRemainingRetryAttempts(containerId, 6); stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); + rcs = getOne(stateStore.getRCSIterator()); assertEquals(6, rcs.getRemainingRetryAttempts()); assertEquals("/test/workdir", rcs.getWorkDir()); assertEquals("/test/logdir", rcs.getLogDir()); @@ -375,8 +379,12 @@ public void testContainerStorage() throws IOException { // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertTrue(recoveredContainers.isEmpty()); + //make sure nothing is in the store + if(isStoreEmpty() == false) { + assertNull(getOne(stateStore.getRCSIterator()).getStartRequest()); + } else { + assertTrue(isStoreEmpty()); + } } private void validateRetryAttempts(ContainerId containerId) @@ -387,7 +395,7 @@ private void validateRetryAttempts(ContainerId containerId) stateStore.storeContainerRestartTimes(containerId, finishTimeForRetryAttempts); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = getOne(stateStore.getRCSIterator()); List recoveredRestartTimes = rcs.getRestartTimes(); assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); @@ -1003,9 +1011,7 @@ protected DB openDatabase(Configuration conf) { @Test public void testUnexpectedKeyDoesntThrowException() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); - assertTrue(recoveredContainers.isEmpty()); + assertTrue(isStoreEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, @@ -1019,9 +1025,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { + containerId.toString() + "/invalidKey1234").getBytes(); stateStore.getDB().put(invalidKey, new byte[1]); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - RecoveredContainerState rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = getOne(stateStore.getRCSIterator()); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(false, rcs.getKilled()); @@ -1136,9 +1140,7 @@ public void testAMRMProxyStorage() throws IOException { @Test public void testStateStoreForResourceMapping() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); - assertTrue(recoveredContainers.isEmpty()); + assertTrue(isStoreEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, @@ -1164,9 +1166,7 @@ public void testStateStoreForResourceMapping() throws IOException { // add a invalid key restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - RecoveredContainerState rcs = recoveredContainers.get(0); + RecoveredContainerState rcs = getOne(stateStore.getRCSIterator()); List res = rcs.getResourceMappings() .getAssignedResources("gpu"); Assert.assertTrue(res.equals(gpuRes1)); @@ -1227,7 +1227,7 @@ public void testEmptyRestartTimes() throws IOException { stateStore.storeContainerRestartTimes(containerId, restartTimes); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = getOne(stateStore.getRCSIterator()); List recoveredRestartTimes = rcs.getRestartTimes(); assertTrue(recoveredRestartTimes.isEmpty()); }