diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index d9b887f56de..a0d5ba103c2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -342,29 +342,40 @@ public void recoverLocalizedResources(RecoveredLocalizationState state) } private void recoverTrackerResources(LocalResourcesTracker tracker, - LocalResourceTrackerState state) throws URISyntaxException { - for (LocalizedResourceProto proto : state.getLocalizedResources()) { - LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); - LocalResourceRequest req = new LocalResourceRequest(rsrc); - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering localized resource " + req + " at " - + proto.getLocalPath()); + LocalResourceTrackerState state) throws URISyntaxException, IOException { + try (RecoveryIterator it = + state.getCompletedResourcesIterator()) { + while (it.hasNext()) { + LocalizedResourceProto proto = it.next(); + LocalResource rsrc = new LocalResourcePBImpl(proto.getResource()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering localized resource " + req + " at " + + proto.getLocalPath()); + } + tracker.handle(new ResourceRecoveredEvent(req, + new Path(proto.getLocalPath()), proto.getSize())); } - tracker.handle(new ResourceRecoveredEvent(req, - new Path(proto.getLocalPath()), proto.getSize())); + } catch (NullPointerException e) { + e.getMessage(); } - for (Map.Entry entry : - state.getInProgressResources().entrySet()) { - LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); - LocalResourceRequest req = new LocalResourceRequest(rsrc); - Path localPath = entry.getValue(); - tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); - - // delete any in-progress localizations, containers will request again - LOG.info("Deleting in-progress localization for " + req + " at " - + localPath); - tracker.remove(tracker.getLocalizedResource(req), delService); + try (RecoveryIterator> it = + state.getStartedResourcesIterator()) { + while (it.hasNext()) { + Map.Entry entry = it.next(); + LocalResource rsrc = new LocalResourcePBImpl(entry.getKey()); + LocalResourceRequest req = new LocalResourceRequest(rsrc); + Path localPath = entry.getValue(); + tracker.handle(new ResourceRecoveredEvent(req, localPath, 0)); + + // delete any in-progress localizations, containers will request again + LOG.info("Deleting in-progress localization for " + req + " at " + + localPath); + tracker.remove(tracker.getLocalizedResource(req), delService); + } + } catch (NullPointerException e) { + e.getMessage(); } // TODO: remove untracked directories in local filesystem diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 5d4253db9df..17dd69dc008 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -67,6 +67,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.AbstractMap; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -862,80 +863,91 @@ public void removeApplication(ApplicationId appId) public RecoveredLocalizationState loadLocalizationState() throws IOException { RecoveredLocalizationState state = new RecoveredLocalizationState(); - LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); - state.publicTrackerState = loadResourceTrackerState(it, + state.publicTrackerState = loadResourceTrackerState( LOCALIZATION_PUBLIC_KEY_PREFIX); state.it = new UserResourcesIterator(); return state; } - private LocalResourceTrackerState loadResourceTrackerState( - LeveldbIterator iter, String keyPrefix) throws IOException { + private LocalResourceTrackerState loadResourceTrackerState(String keyPrefix) + throws IOException { final String completedPrefix = keyPrefix + LOCALIZATION_COMPLETED_SUFFIX; final String startedPrefix = keyPrefix + LOCALIZATION_STARTED_SUFFIX; LocalResourceTrackerState state = new LocalResourceTrackerState(); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); - String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; - } + state.completedResourcesIterator = new CompletedResourcesIterator( + completedPrefix); + state.startedResourcesIterator = new StartedResourcesIterator( + startedPrefix); + return state; + } - if (key.startsWith(completedPrefix)) { - state.localizedResources = loadCompletedResources(iter, - completedPrefix); - } else if (key.startsWith(startedPrefix)) { - state.inProgressResources = loadStartedResources(iter, startedPrefix); - } else { - throw new IOException("Unexpected key in resource tracker state: " - + key); - } + private class CompletedResourcesIterator extends + BaseRecoveryIterator { + String startKey; + CompletedResourcesIterator(String startKey) throws IOException { + super(startKey); + this.startKey = startKey; } - return state; + @Override + protected LocalizedResourceProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextCompletedResource(it, startKey); + } } - private List loadCompletedResources( + private LocalizedResourceProto getNextCompletedResource( LeveldbIterator iter, String keyPrefix) throws IOException { - List rsrcs = - new ArrayList(); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + LocalizedResourceProto nextCompletedResource = null; + if (iter .hasNext()){ + Entry entry = iter.next(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { - break; + return null; } if (LOG.isDebugEnabled()) { LOG.debug("Loading completed resource from " + key); } - rsrcs.add(LocalizedResourceProto.parseFrom(entry.getValue())); - iter.next(); + nextCompletedResource = LocalizedResourceProto.parseFrom( + entry.getValue()); + } + return nextCompletedResource; + } + + private class StartedResourcesIterator extends + BaseRecoveryIterator> { + String startKey; + StartedResourcesIterator(String startKey) throws IOException { + super(startKey); + this.startKey = startKey; } - return rsrcs; + @Override + protected Entry getNextItem(LeveldbIterator it) + throws IOException { + return getNextStartedResource(it, startKey); + } } - private Map loadStartedResources( + private Entry getNextStartedResource( LeveldbIterator iter, String keyPrefix) throws IOException { - Map rsrcs = - new HashMap(); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry nextStartedResource = null; + if (iter.hasNext()){ + Entry entry = iter.next(); String key = asString(entry.getKey()); if (!key.startsWith(keyPrefix)) { - break; + return null; } Path localPath = new Path(key.substring(keyPrefix.length())); if (LOG.isDebugEnabled()) { LOG.debug("Loading in-progress resource at " + localPath); } - rsrcs.put(LocalResourceProto.parseFrom(entry.getValue()), localPath); - iter.next(); + nextStartedResource = new SimpleEntry( + LocalResourceProto.parseFrom(entry.getValue()), localPath); } - - return rsrcs; + return nextStartedResource; } private RecoveredUserResources loadUserLocalizedResources( @@ -949,8 +961,10 @@ private RecoveredUserResources loadUserLocalizedResources( } if (key.startsWith(LOCALIZATION_FILECACHE_SUFFIX, keyPrefix.length())) { - userResources.privateTrackerState = loadResourceTrackerState(iter, + userResources.privateTrackerState = loadResourceTrackerState( keyPrefix + LOCALIZATION_FILECACHE_SUFFIX); + // seek to next user. + iter.seek(bytes(keyPrefix + "zzz")); } else if (key.startsWith(LOCALIZATION_APPCACHE_SUFFIX, keyPrefix.length())) { int appIdStartPos = keyPrefix.length() + @@ -963,7 +977,9 @@ private RecoveredUserResources loadUserLocalizedResources( ApplicationId appId = ApplicationId.fromString( key.substring(appIdStartPos, appIdEndPos)); userResources.appTrackerStates.put(appId, - loadResourceTrackerState(iter, key.substring(0, appIdEndPos+1))); + loadResourceTrackerState(key.substring(0, appIdEndPos+1))); + // seek to next application + iter.seek(bytes(key.substring(0, appIdEndPos+1) + "zzz")); } else { throw new IOException("Unexpected user resource key " + key); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 35caec9a479..1158b8a1906 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -220,21 +220,25 @@ public void setResourceMappings(ResourceMappings mappings) { } public static class LocalResourceTrackerState { - List localizedResources = - new ArrayList(); - Map inProgressResources = - new HashMap(); + RecoveryIterator completedResourcesIterator = null; + RecoveryIterator> startedResourcesIterator = + null; - public List getLocalizedResources() { - return localizedResources; + public RecoveryIterator + getCompletedResourcesIterator() { + return completedResourcesIterator; } - public Map getInProgressResources() { - return inProgressResources; + public RecoveryIterator> + getStartedResourcesIterator() { + return startedResourcesIterator; } - public boolean isEmpty() { - return localizedResources.isEmpty() && inProgressResources.isEmpty(); + public boolean isEmpty() throws IOException { + return (completedResourcesIterator == null || + !completedResourcesIterator.hasNext()) && + (startedResourcesIterator ==null || + !startedResourcesIterator.hasNext()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 9658ecdf635..f44101539d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -293,11 +293,20 @@ private RecoveredContainerState getRecoveredContainerState( private LocalResourceTrackerState loadTrackerState(TrackerState ts) { LocalResourceTrackerState result = new LocalResourceTrackerState(); - result.localizedResources.addAll(ts.localizedResources.values()); + List completedResources = + new ArrayList(ts.localizedResources.values()); + result.completedResourcesIterator = + new NMMemoryRecoveryIterator( + completedResources.iterator()); + Map inProgressMap = + new HashMap(); for (Map.Entry entry : ts.inProgressMap.entrySet()) { - result.inProgressResources.put(entry.getValue(), entry.getKey()); + inProgressMap.put(entry.getValue(), entry.getKey()); } + result.startedResourcesIterator = + new NMMemoryRecoveryIterator>( + inProgressMap.entrySet().iterator()); return result; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index fcbbc52a3c8..8d06939aea9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -192,6 +192,28 @@ public void cleanup() throws IOException { return containerTokens; } + private List loadCompletedResources( + RecoveryIterator it) throws IOException { + List completedResources = + new ArrayList(); + while (it.hasNext()) { + completedResources.add(it.next()); + } + return completedResources; + } + + private Map loadStartedResources( + RecoveryIterator > it) + throws IOException { + Map startedResources = + new HashMap(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + startedResources.put(entry.getKey(), entry.getValue()); + } + return startedResources; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -207,8 +229,10 @@ private void verifyEmptyState() throws IOException { assertNotNull(state); LocalResourceTrackerState pubts = state.getPublicTrackerState(); assertNotNull(pubts); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertTrue(pubts.getInProgressResources().isEmpty()); + assertTrue(loadCompletedResources(pubts.getCompletedResourcesIterator()) + .isEmpty()); + assertTrue(loadStartedResources(pubts.getStartedResourcesIterator()) + .isEmpty()); assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @@ -541,6 +565,111 @@ private StartContainerRequest createContainerRequestInternal(ContainerId return StartContainerRequest.newInstance(clc, containerToken); } + @Test + public void testLocalTrackerStateIterator() throws IOException { + String user1 = "somebody"; + ApplicationId appId1 = ApplicationId.newInstance(1, 1); + ApplicationId appId2 = ApplicationId.newInstance(2,2); + + String user2 = "someone"; + ApplicationId appId3 = ApplicationId.newInstance(3, 3); + + // start and finish local resource for applications + Path appRsrcPath1 = new Path("hdfs://some/app/resource1"); + LocalResourcePBImpl rsrcPb1 = (LocalResourcePBImpl) + LocalResource.newInstance( + URL.fromPath(appRsrcPath1), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto1 = rsrcPb1.getProto(); + Path appRsrcLocalPath1 = new Path("/some/local/dir/for/apprsrc1"); + Path appRsrcPath2 = new Path("hdfs://some/app/resource2"); + LocalResourcePBImpl rsrcPb2 = (LocalResourcePBImpl) + LocalResource.newInstance( + URL.fromPath(appRsrcPath2), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto2 = rsrcPb2.getProto(); + Path appRsrcLocalPath2 = new Path("/some/local/dir/for/apprsrc2"); + Path appRsrcPath3 = new Path("hdfs://some/app/resource3"); + LocalResourcePBImpl rsrcPb3 = (LocalResourcePBImpl) + LocalResource.newInstance( + URL.fromPath(appRsrcPath3), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION, + 123L, 456L); + LocalResourceProto appRsrcProto3 = rsrcPb3.getProto(); + Path appRsrcLocalPath3 = new Path("/some/local/dir/for/apprsrc2"); + + stateStore.startResourceLocalization(user1, appId1, appRsrcProto1, + appRsrcLocalPath1); + stateStore.startResourceLocalization(user1, appId2, appRsrcProto2, + appRsrcLocalPath2); + stateStore.startResourceLocalization(user2, appId3, appRsrcProto3, + appRsrcLocalPath3); + + LocalizedResourceProto appLocalizedProto1 = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto1) + .setLocalPath(appRsrcLocalPath1.toString()) + .setSize(1234567L) + .build(); + LocalizedResourceProto appLocalizedProto2 = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto2) + .setLocalPath(appRsrcLocalPath2.toString()) + .setSize(1234567L) + .build(); + LocalizedResourceProto appLocalizedProto3 = + LocalizedResourceProto.newBuilder() + .setResource(appRsrcProto3) + .setLocalPath(appRsrcLocalPath3.toString()) + .setSize(1234567L) + .build(); + + + stateStore.finishResourceLocalization(user1, appId1, appLocalizedProto1); + stateStore.finishResourceLocalization(user1, appId2, appLocalizedProto2); + stateStore.finishResourceLocalization(user2, appId3, appLocalizedProto3); + + + List completedResources = + new ArrayList(); + Map startedResources = + new HashMap(); + + // restart and verify two users exist and two apps completed for user1. + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + Map userResources = + loadUserResources(state.getIterator()); + assertEquals(2, userResources.size()); + + RecoveredUserResources uResource = userResources.get(user1); + assertEquals(2, uResource.getAppTrackerStates().size()); + LocalResourceTrackerState app1ts = + uResource.getAppTrackerStates().get(appId1); + assertNotNull(app1ts); + completedResources = loadCompletedResources( + app1ts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + app1ts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); + assertEquals(appLocalizedProto1, + completedResources.iterator().next()); + LocalResourceTrackerState app2ts = + uResource.getAppTrackerStates().get(appId2); + assertNotNull(app2ts); + completedResources = loadCompletedResources( + app2ts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + app2ts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); + assertEquals(appLocalizedProto2, + completedResources.iterator().next()); + } + @Test public void testStartResourceLocalization() throws IOException { String user = "somebody"; @@ -558,27 +687,39 @@ public void testStartResourceLocalization() throws IOException { stateStore.startResourceLocalization(user, appId, appRsrcProto, appRsrcLocalPath); + List completedResources = + new ArrayList(); + Map startedResources = + new HashMap(); + // restart and verify only app resource is marked in-progress restartStateStore(); RecoveredLocalizationState state = stateStore.loadLocalizationState(); LocalResourceTrackerState pubts = state.getPublicTrackerState(); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertTrue(pubts.getInProgressResources().isEmpty()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertTrue(startedResources.isEmpty()); Map userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertTrue(privts.getLocalizedResources().isEmpty()); - assertTrue(privts.getInProgressResources().isEmpty()); + assertTrue(privts.isEmpty()); assertEquals(1, rur.getAppTrackerStates().size()); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getLocalizedResources().isEmpty()); - assertEquals(1, appts.getInProgressResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(1, startedResources.size()); assertEquals(appRsrcLocalPath, - appts.getInProgressResources().get(appRsrcProto)); + startedResources.get(appRsrcProto)); // start some public and private resources Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); @@ -613,28 +754,40 @@ public void testStartResourceLocalization() throws IOException { restartStateStore(); state = stateStore.loadLocalizationState(); pubts = state.getPublicTrackerState(); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertEquals(2, pubts.getInProgressResources().size()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(2, startedResources.size()); assertEquals(pubRsrcLocalPath1, - pubts.getInProgressResources().get(pubRsrcProto1)); + startedResources.get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, - pubts.getInProgressResources().get(pubRsrcProto2)); + startedResources.get(pubRsrcProto2)); userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertTrue(privts.getLocalizedResources().isEmpty()); - assertEquals(1, privts.getInProgressResources().size()); + completedResources = loadCompletedResources( + privts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + privts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(1, startedResources.size()); assertEquals(privRsrcLocalPath, - privts.getInProgressResources().get(privRsrcProto)); + startedResources.get(privRsrcProto)); assertEquals(1, rur.getAppTrackerStates().size()); appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getLocalizedResources().isEmpty()); - assertEquals(1, appts.getInProgressResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertEquals(1, startedResources.size()); assertEquals(appRsrcLocalPath, - appts.getInProgressResources().get(appRsrcProto)); + startedResources.get(appRsrcProto)); } @Test @@ -661,27 +814,39 @@ public void testFinishResourceLocalization() throws IOException { .build(); stateStore.finishResourceLocalization(user, appId, appLocalizedProto); + List completedResources = + new ArrayList(); + Map startedResources = + new HashMap(); + // restart and verify only app resource is completed restartStateStore(); RecoveredLocalizationState state = stateStore.loadLocalizationState(); LocalResourceTrackerState pubts = state.getPublicTrackerState(); - assertTrue(pubts.getLocalizedResources().isEmpty()); - assertTrue(pubts.getInProgressResources().isEmpty()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertTrue(completedResources.isEmpty()); + assertTrue(startedResources.isEmpty()); Map userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertTrue(privts.getLocalizedResources().isEmpty()); - assertTrue(privts.getInProgressResources().isEmpty()); + assertTrue(privts.isEmpty()); assertEquals(1, rur.getAppTrackerStates().size()); LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getInProgressResources().isEmpty()); - assertEquals(1, appts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); assertEquals(appLocalizedProto, - appts.getLocalizedResources().iterator().next()); + completedResources.iterator().next()); // start some public and private resources Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); @@ -732,28 +897,40 @@ public void testFinishResourceLocalization() throws IOException { restartStateStore(); state = stateStore.loadLocalizationState(); pubts = state.getPublicTrackerState(); - assertEquals(1, pubts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertEquals(1, completedResources.size()); assertEquals(pubLocalizedProto1, - pubts.getLocalizedResources().iterator().next()); - assertEquals(1, pubts.getInProgressResources().size()); + completedResources.iterator().next()); + assertEquals(1, startedResources.size()); assertEquals(pubRsrcLocalPath2, - pubts.getInProgressResources().get(pubRsrcProto2)); + startedResources.get(pubRsrcProto2)); userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); assertNotNull(privts); - assertEquals(1, privts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + privts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + privts.getStartedResourcesIterator()); + assertEquals(1, completedResources.size()); assertEquals(privLocalizedProto, - privts.getLocalizedResources().iterator().next()); - assertTrue(privts.getInProgressResources().isEmpty()); + completedResources.iterator().next()); + assertTrue(startedResources.isEmpty()); assertEquals(1, rur.getAppTrackerStates().size()); appts = rur.getAppTrackerStates().get(appId); assertNotNull(appts); - assertTrue(appts.getInProgressResources().isEmpty()); - assertEquals(1, appts.getLocalizedResources().size()); + completedResources = loadCompletedResources( + appts.getCompletedResourcesIterator()); + startedResources = loadStartedResources( + appts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); assertEquals(appLocalizedProto, - appts.getLocalizedResources().iterator().next()); + completedResources.iterator().next()); } @Test @@ -841,10 +1018,14 @@ public void testRemoveLocalizedResource() throws IOException { restartStateStore(); RecoveredLocalizationState state = stateStore.loadLocalizationState(); LocalResourceTrackerState pubts = state.getPublicTrackerState(); - assertTrue(pubts.getInProgressResources().isEmpty()); - assertEquals(1, pubts.getLocalizedResources().size()); + List completedResources = + loadCompletedResources(pubts.getCompletedResourcesIterator()); + Map startedResources = + loadStartedResources(pubts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); assertEquals(pubLocalizedProto1, - pubts.getLocalizedResources().iterator().next()); + completedResources.iterator().next()); Map userResources = loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty());