diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index ae81dc11187..4c8c7c759b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import static java.util.concurrent.TimeUnit.SECONDS; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,16 +98,20 @@ public void delete(DeletionTask deletionTask) { private void recover(NMStateStoreService.RecoveredDeletionServiceState state) throws IOException { - List taskProtos = state.getTasks(); Map idToInfoMap = - new HashMap<>(taskProtos.size()); - Set successorTasks = new HashSet<>(); - for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = - NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); - idToInfoMap.put(info.getTask().getTaskId(), info); - nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); - successorTasks.addAll(info.getSuccessorTaskIds()); + new HashMap(); + Set successorTasks = new HashSet(); + + + try (RecoveryIterator it = state.getIterator()) { + while(it.hasNext()){ + DeletionServiceDeleteTaskProto proto = it.next(); + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); + } } // restore the task dependencies and schedule the deletion tasks that diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 8b3525820cf..9a0cb9f8466 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -356,19 +357,27 @@ private void recover() throws IOException, URISyntaxException { stateStore.loadLocalizationState()); RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); - for (ContainerManagerApplicationProto proto : - appsState.getApplications()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering application with state: " + proto.toString()); + try (RecoveryIterator appsStateIterator + = appsState.getIterator()) { + while (appsStateIterator.hasNext()) { + ContainerManagerApplicationProto applicationProto = appsStateIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering application with state: " + applicationProto.toString()); + } + recoverApplication(applicationProto); } - recoverApplication(proto); } - for (RecoveredContainerState rcs : stateStore.loadContainersState()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering container with state: " + rcs); + + try (RecoveryIterator rcsIterator = + stateStore.getConstainerStateIterator()) { + while(rcsIterator.hasNext()) { + RecoveredContainerState rcs = rcsIterator.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering container with state: " + rcs); + } + recoverContainer(rcs); } - recoverContainer(rcs); } // Recovery AMRMProxy state after apps and containers are recovered diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 3834ecec6f9..1ce5e7d7c82 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,42 +297,45 @@ public void onDirsChanged() { //Recover localized resources after an NM restart public void recoverLocalizedResources(RecoveredLocalizationState state) - throws URISyntaxException { + throws URISyntaxException, IOException { LocalResourceTrackerState trackerState = state.getPublicTrackerState(); recoverTrackerResources(publicRsrc, trackerState); - for (Map.Entry userEntry : - state.getUserResources().entrySet()) { - String user = userEntry.getKey(); - RecoveredUserResources userResources = userEntry.getValue(); - trackerState = userResources.getPrivateTrackerState(); - if (!trackerState.isEmpty()) { - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); - } - - for (Map.Entry appEntry : - userResources.getAppTrackerStates().entrySet()) { - trackerState = appEntry.getValue(); + try (RecoveryIterator> it + = state.getIterator()) { + while(it.hasNext()){ + Map.Entry userEntry = it.next(); + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); if (!trackerState.isEmpty()) { - ApplicationId appId = appEntry.getKey(); - String appIdStr = appId.toString(); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - appId, dispatcher, false, super.getConfig(), stateStore, - dirsHandler); - LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker); if (oldTracker != null) { tracker = oldTracker; } recoverTrackerResources(tracker, trackerState); } + + for (Map.Entry appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = appId.toString(); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 67f642df3e8..5073258bbc9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -66,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -73,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -225,23 +227,74 @@ boolean isHealthy() { return isHealthy; } - @Override - public List loadContainersState() - throws IOException { - ArrayList containers = - new ArrayList(); - ArrayList containersToRemove = - new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + // LeveldbIterator starting at startkey + private LeveldbIterator getLevelDBIterator(String startKey) { + LeveldbIterator it = new LeveldbIterator(db); + it.seek(bytes(startKey)); + return it; + } - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + // Base Recovery Iterator + private abstract class BaseRecoveryIterator implements RecoveryIterator { + LeveldbIterator it; + T nextItem; + + BaseRecoveryIterator(String dbKey){ + this.it = getLevelDBIterator(dbKey); + this.nextItem = null; + } + + protected abstract T getNextItem(LeveldbIterator it) throws IOException; + + @Override + public boolean hasNext() throws IOException { + if (nextItem == null){ + nextItem = getNextItem(it); + } + return (nextItem != null); + } + + @Override + public T next() throws IOException, NoSuchElementException { + T tmp = nextItem; + if (tmp != null){ + nextItem = null; + return tmp; + } else { + tmp = getNextItem(it); + if (tmp == null) throw new NoSuchElementException(); + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null){ + it.close(); + } + } + } + + // Container Recovery Iterator + private class RCSIterator extends BaseRecoveryIterator { + RCSIterator() { + super(CONTAINERS_KEY_PREFIX); + } + + @Override + protected RecoveredContainerState getNextItem(LeveldbIterator it) throws IOException { + return getNextRecoveredContainer(it); + } + } + + private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it) throws IOException { + RecoveredContainerState rcs = null; + try { + while (it.hasNext()) { + Entry entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { - break; + return null; } int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); @@ -250,42 +303,32 @@ boolean isHealthy() { } ContainerId containerId = ContainerId.fromString( key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); - String keyPrefix = key.substring(0, idEndPos+1); - RecoveredContainerState rcs = loadContainerState(containerId, - iter, keyPrefix); - // Don't load container without StartContainerRequest - if (rcs.startRequest != null) { - containers.add(rcs); - } else { - containersToRemove.add(containerId); + String keyPrefix = key.substring(0, idEndPos + 1); + rcs = loadContainerState(it, keyPrefix); + if (rcs.startRequest != null){ + break; + } + else { + removeContainer(containerId); + rcs = null; } } - } catch (DBException e) { + } catch (DBException e){ throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return rcs; + } - // remove container without StartContainerRequest - for (ContainerId containerId : containersToRemove) { - LOG.warn("Remove container " + containerId + - " with incomplete records"); - try { - removeContainer(containerId); - // TODO: kill and cleanup the leaked container - } catch (IOException e) { - LOG.error("Unable to remove container " + containerId + - " in store", e); - } - } - return containers; + @Override + public RecoveryIterator getConstainerStateIterator() { + return new RCSIterator(); } - private RecoveredContainerState loadContainerState(ContainerId containerId, + private RecoveredContainerState loadContainerState( LeveldbIterator iter, String keyPrefix) throws IOException { + ContainerId containerId = ContainerId.fromString( + keyPrefix.substring(CONTAINERS_KEY_PREFIX.length(), keyPrefix.length()-1)); RecoveredContainerState rcs = new RecoveredContainerState(); rcs.status = RecoveredContainerStatus.REQUESTED; while (iter.hasNext()) { @@ -680,35 +723,43 @@ public void removeContainer(ContainerId containerId) } - @Override - public RecoveredApplicationsState loadApplicationsState() + // Application Recovery Iterator + private class RASIterator extends BaseRecoveryIterator { + RASIterator() { + super(APPLICATIONS_KEY_PREFIX); + } + + @Override + protected ContainerManagerApplicationProto getNextItem(LeveldbIterator it) + throws IOException { + return getNextRecoveredApplication(it); + } + } + + private ContainerManagerApplicationProto getNextRecoveredApplication(LeveldbIterator it) throws IOException { - RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList(); - String keyPrefix = APPLICATIONS_KEY_PREFIX; - LeveldbIterator iter = null; + ContainerManagerApplicationProto applicationProto = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(keyPrefix)); - while (iter.hasNext()) { - Entry entry = iter.next(); + if (it.hasNext()) { + Entry entry = it.next(); String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; + if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) { + return null; } - state.applications.add( - ContainerManagerApplicationProto.parseFrom(entry.getValue())); + applicationProto = ContainerManagerApplicationProto.parseFrom(entry.getValue()); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return applicationProto; + } + @Override + public RecoveredApplicationsState loadApplicationsState() + throws IOException { + RecoveredApplicationsState state = new RecoveredApplicationsState(); + state.it = new RASIterator(); cleanupDeprecatedFinishedApps(); - return state; } @@ -752,24 +803,28 @@ public void removeApplication(ApplicationId appId) } - @Override - public RecoveredLocalizationState loadLocalizationState() - throws IOException { - RecoveredLocalizationState state = new RecoveredLocalizationState(); + // User Resource Recovery Iterator. + private class RURIterator extends BaseRecoveryIterator + > { + RURIterator(){ + super(LOCALIZATION_PRIVATE_KEY_PREFIX); + } - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX)); - state.publicTrackerState = loadResourceTrackerState(iter, - LOCALIZATION_PUBLIC_KEY_PREFIX); + @Override + protected Entry getNextItem(LeveldbIterator it) throws IOException { + return getNextRecoveredLocalizationEntry(it); + } + } - iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + private Entry getNextRecoveredLocalizationEntry + (LeveldbIterator it) throws IOException { + Entry localEntry = null; + try{ + if (it.hasNext()) { + Entryentry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) { - break; + return null; } int userEndPos = key.indexOf('/', @@ -780,17 +835,23 @@ public RecoveredLocalizationState loadLocalizationState() } String user = key.substring( LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos); - state.userResources.put(user, loadUserLocalizedResources(iter, - key.substring(0, userEndPos+1))); + RecoveredUserResources val = loadUserLocalizedResources(it, + key.substring(0, userEndPos+1)); + localEntry = new AbstractMap.SimpleEntry<>(user, val); } } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } + throw new IOException(); } + return localEntry; + } + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + RecoveredLocalizationState state = new RecoveredLocalizationState(); + LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); + state.publicTrackerState = loadResourceTrackerState(it, LOCALIZATION_PUBLIC_KEY_PREFIX); + state.it = new RURIterator(); return state; } @@ -981,32 +1042,41 @@ private String getResourceTrackerKeyPrefix(String user, + LOCALIZATION_APPCACHE_SUFFIX + appId + "/"; } + // Deletion State Recovery Iterator. + private class RDSIterator extends BaseRecoveryIterator { + RDSIterator() { + super(DELETION_TASK_KEY_PREFIX); + } - @Override - public RecoveredDeletionServiceState loadDeletionServiceState() + @Override + protected DeletionServiceDeleteTaskProto getNextItem(LeveldbIterator it) throws IOException { + return getNextRecoveredDeletionService(it); + } + } + + private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService(LeveldbIterator it) throws IOException { - RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); - state.tasks = new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); + DeletionServiceDeleteTaskProto deleteProto = null; + try{ + if (it.hasNext()){ + Entry entry = it.next(); String key = asString(entry.getKey()); if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { - break; + return null; } - state.tasks.add( - DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); + deleteProto = DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()); } - } catch (DBException e) { + } catch (DBException e){ throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return deleteProto; + } + + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); + state.it = new RDSIterator(); return state; } @@ -1033,28 +1103,46 @@ public void removeDeletionTask(int taskId) throws IOException { } } + private MasterKey getMasterKey(String key) throws IOException{ + MasterKey masterKey = null; + try (LeveldbIterator it = getLevelDBIterator(key)) { + if (it.hasNext()) { + Entry entry = it.next(); + String fullKey = asString(entry.getKey()); + if (fullKey.startsWith(key)) { + masterKey = parseMasterKey(entry.getValue()); + } + } + } + return masterKey; + } - @Override - public RecoveredNMTokensState loadNMTokensState() throws IOException { - RecoveredNMTokensState state = new RecoveredNMTokensState(); - state.applicationMasterKeys = - new HashMap(); - LeveldbIterator iter = null; + // Recover NMTokens Iterator + private class RNTIterator extends BaseRecoveryIterator + > { + + RNTIterator(){ + super(NM_TOKENS_KEY_PREFIX); + } + + @Override + protected Entry getNextItem(LeveldbIterator it) throws IOException { + return getNextMasterKeyEntry(it); + } + } + + private Entry getNextMasterKeyEntry + (LeveldbIterator it) throws IOException { + Entry masterKeyentry = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(NM_TOKENS_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); + while (it.hasNext()){ + Entry entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) { break; } String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length()); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith( + if (key.startsWith( ApplicationAttemptId.appAttemptIdStrPrefix)) { ApplicationAttemptId attempt; try { @@ -1063,17 +1151,25 @@ public RecoveredNMTokensState loadNMTokensState() throws IOException { throw new IOException("Bad application master key state for " + fullKey, e); } - state.applicationMasterKeys.put(attempt, + masterKeyentry = new AbstractMap.SimpleEntry<>(attempt, parseMasterKey(entry.getValue())); + break; } } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return masterKeyentry; + } + + @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + RecoveredNMTokensState state = new RecoveredNMTokensState(); + state.currentMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(NM_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new RNTIterator(); return state; } @@ -1122,45 +1218,42 @@ private void storeMasterKey(String dbKey, MasterKey key) } } + // Recover ContainersToken Iterator. + private class RCTIterator extends BaseRecoveryIterator> { + RCTIterator(){ + super(CONTAINER_TOKENS_KEY_PREFIX); + } - @Override - public RecoveredContainerTokensState loadContainerTokensState() + @Override + protected Entry getNextItem(LeveldbIterator it) throws IOException{ + return getNextContainerToken(it); + } + } + + private Entry getNextContainerToken(LeveldbIterator it) throws IOException { - RecoveredContainerTokensState state = new RecoveredContainerTokensState(); - state.activeTokens = new HashMap(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX)); - final int containerTokensKeyPrefixLength = - CONTAINER_TOKENS_KEY_PREFIX.length(); - while (iter.hasNext()) { - Entry entry = iter.next(); + Entry containerTokenEntry = null; + try{ + while (it.hasNext()) { + Entry entry = it.next(); String fullKey = asString(entry.getKey()); if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) { break; } - String key = fullKey.substring(containerTokensKeyPrefixLength); - if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { - state.currentMasterKey = parseMasterKey(entry.getValue()); - } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { - state.previousMasterKey = parseMasterKey(entry.getValue()); - } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { - loadContainerToken(state, fullKey, key, entry.getValue()); + String key = fullKey.substring(CONTAINER_TOKENS_KEY_PREFIX.length()); + if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + containerTokenEntry = loadContainerToken(fullKey, key, entry.getValue()); + break; } } - } catch (DBException e) { + } catch (DBException e){ throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } - return state; + return containerTokenEntry; } - private static void loadContainerToken(RecoveredContainerTokensState state, - String key, String containerIdStr, byte[] value) throws IOException { + private static Entry loadContainerToken + (String key, String containerIdStr, byte[] value) throws IOException { ContainerId containerId; Long expTime; try { @@ -1169,7 +1262,18 @@ private static void loadContainerToken(RecoveredContainerTokensState state, } catch (IllegalArgumentException e) { throw new IOException("Bad container token state for " + key, e); } - state.activeTokens.put(containerId, expTime); + return new AbstractMap.SimpleEntry<>(containerId, expTime); + } + + @Override + public RecoveredContainerTokensState loadContainerTokensState() throws IOException { + RecoveredContainerTokensState state = new RecoveredContainerTokensState(); + state.currentMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + CURRENT_MASTER_KEY_SUFFIX); + state.previousMasterKey = getMasterKey(CONTAINER_TOKENS_KEY_PREFIX + + PREV_MASTER_KEY_SUFFIX); + state.it = new RCTIterator(); + return state; } @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index dfad9cfee33..75ed78b62a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -65,8 +65,7 @@ public void removeApplication(ApplicationId appId) throws IOException { } @Override - public List loadContainersState() - throws IOException { + public RecoveryIterator getConstainerStateIterator() { throw new UnsupportedOperationException( "Recovery not supported by this state store"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 70decdba743..15b6ce89f19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -67,12 +68,11 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { } public static class RecoveredApplicationsState { - List applications; + RecoveryIterator it = null; - public List getApplications() { - return applications; + public RecoveryIterator getIterator() { + return it; } - } /** @@ -248,30 +248,35 @@ public LocalResourceTrackerState getPrivateTrackerState() { public static class RecoveredLocalizationState { LocalResourceTrackerState publicTrackerState = new LocalResourceTrackerState(); - Map userResources = - new HashMap(); + RecoveryIterator> it = null; public LocalResourceTrackerState getPublicTrackerState() { return publicTrackerState; } - public Map getUserResources() { - return userResources; + public RecoveryIterator> + getIterator() { + return it; } } public static class RecoveredDeletionServiceState { - List tasks; + RecoveryIterator it = null; - public List getTasks() { - return tasks; + public RecoveryIterator getIterator(){ + return it; } } public static class RecoveredNMTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map applicationMasterKeys; + RecoveryIterator> + it = null; + + public RecoveryIterator> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -281,15 +286,16 @@ public MasterKey getPreviousMasterKey() { return previousMasterKey; } - public Map getApplicationMasterKeys() { - return applicationMasterKeys; - } } public static class RecoveredContainerTokensState { MasterKey currentMasterKey; MasterKey previousMasterKey; - Map activeTokens; + RecoveryIterator> it = null; + + public RecoveryIterator> getIterator() { + return it; + } public MasterKey getCurrentMasterKey() { return currentMasterKey; @@ -299,9 +305,6 @@ public MasterKey getPreviousMasterKey() { return previousMasterKey; } - public Map getActiveTokens() { - return activeTokens; - } } public static class RecoveredLogDeleterState { @@ -400,12 +403,10 @@ public abstract void removeApplication(ApplicationId appId) /** - * Load the state of containers - * @return recovered state for containers - * @throws IOException + * get the Recovered Container State Iterator + * @return recovery iterator */ - public abstract List loadContainersState() - throws IOException; + public abstract RecoveryIterator getConstainerStateIterator(); /** * Record a container start request diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java new file mode 100644 index 00000000000..0bb262a6b53 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.Closeable; +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * A wrapper for a Iterator to translate the raw RuntimeExceptions that + * can be thrown into IOException. + */ +public interface RecoveryIterator extends Closeable { + + /** + * Returns true if the iteration has more elements. + */ + boolean hasNext() throws IOException; + + /** + * Returns the next element in the iteration. + */ + T next() throws IOException, NoSuchElementException; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java index 256f649cabf..62eb94e7eae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,17 +92,20 @@ public synchronized void recover() super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Entry entry : state.getActiveTokens().entrySet()) { - ContainerId containerId = entry.getKey(); - Long expTime = entry.getValue(); - List containerList = - recentlyStartedContainerTracker.get(expTime); - if (containerList == null) { - containerList = new ArrayList(); - recentlyStartedContainerTracker.put(expTime, containerList); - } - if (!containerList.contains(containerId)) { - containerList.add(containerId); + try (RecoveryIterator> it = state.getIterator()){ + while (it.hasNext()){ + Entry entry = it.next(); + ContainerId containerId = entry.getKey(); + Long expTime = entry.getValue(); + List containerList = + recentlyStartedContainerTracker.get(expTime); + if (containerList == null) { + containerList = new ArrayList(); + recentlyStartedContainerTracker.put(expTime, containerList); + } + if (!containerList.contains(containerId)) { + containerList.add(containerId); + } } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index 0956e77f7fa..bb45d71ba61 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,11 +89,13 @@ public synchronized void recover() super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; } - for (Map.Entry entry : - state.getApplicationMasterKeys().entrySet()) { - key = entry.getValue(); - oldMasterKeys.put(entry.getKey(), - new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + try(RecoveryIterator> it = state.getIterator()) { + while(it.hasNext()){ + Map.Entry entry = it.next(); + key = entry.getValue(); + oldMasterKeys.put(entry.getKey(), + new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + } } // reconstruct app to app attempts map diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index c5428d184b7..2c0fd1a1bf7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -56,6 +57,8 @@ private Map deleteTasks; private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; + private Map applicationMasterKeys; + private Map activeTokens; private Map logDeleterState; private RecoveredAMRMProxyState amrmProxyState; @@ -68,10 +71,10 @@ protected void initStorage(Configuration conf) { apps = new HashMap(); containerStates = new HashMap(); nmTokenState = new RecoveredNMTokensState(); - nmTokenState.applicationMasterKeys = + applicationMasterKeys = new HashMap(); containerTokenState = new RecoveredContainerTokensState(); - containerTokenState.activeTokens = new HashMap(); + activeTokens = new HashMap(); trackerStates = new HashMap(); deleteTasks = new HashMap(); logDeleterState = new HashMap(); @@ -86,13 +89,35 @@ protected void startStorage() { protected void closeStorage() { } + // Recovery Iterator Implementation. + private class NMMemoryRecoveryIterator implements RecoveryIterator{ + + private Iterator it; + + NMMemoryRecoveryIterator(Iterator it){ + this.it = it; + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() throws IOException { + return it.next(); + } + + @Override + public void close() throws IOException { } + } @Override public synchronized RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList( - apps.values()); + state.it = new NMMemoryRecoveryIterator + (apps.values().iterator()); return state; } @@ -111,8 +136,7 @@ public synchronized void removeApplication(ApplicationId appId) } @Override - public synchronized List loadContainersState() - throws IOException { + public RecoveryIterator getConstainerStateIterator() { // return a copy so caller can't modify our state List result = new ArrayList(containerStates.size()); @@ -131,7 +155,9 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } - return result; + NMMemoryRecoveryIterator it = + new NMMemoryRecoveryIterator(result.iterator()); + return it; } @Override @@ -284,6 +310,7 @@ private TrackerState getTrackerState(TrackerKey key) { @Override public synchronized RecoveredLocalizationState loadLocalizationState() { RecoveredLocalizationState result = new RecoveredLocalizationState(); + Map userResources = new HashMap<>(); for (Map.Entry e : trackerStates.entrySet()) { TrackerKey tk = e.getKey(); TrackerState ts = e.getValue(); @@ -294,10 +321,10 @@ public synchronized RecoveredLocalizationState loadLocalizationState() { if (tk.user == null) { result.publicTrackerState = loadTrackerState(ts); } else { - RecoveredUserResources rur = result.userResources.get(tk.user); + RecoveredUserResources rur = userResources.get(tk.user); if (rur == null) { rur = new RecoveredUserResources(); - result.userResources.put(tk.user, rur); + userResources.put(tk.user, rur); } if (tk.appId == null) { rur.privateTrackerState = loadTrackerState(ts); @@ -306,6 +333,8 @@ public synchronized RecoveredLocalizationState loadLocalizationState() { } } } + result.it = new NMMemoryRecoveryIterator> + (userResources.entrySet().iterator()); return result; } @@ -341,8 +370,8 @@ public synchronized RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { RecoveredDeletionServiceState result = new RecoveredDeletionServiceState(); - result.tasks = new ArrayList( - deleteTasks.values()); + result.it = new NMMemoryRecoveryIterator + (deleteTasks.values().iterator()); return result; } @@ -365,9 +394,8 @@ public synchronized RecoveredNMTokensState loadNMTokensState() RecoveredNMTokensState result = new RecoveredNMTokensState(); result.currentMasterKey = nmTokenState.currentMasterKey; result.previousMasterKey = nmTokenState.previousMasterKey; - result.applicationMasterKeys = - new HashMap( - nmTokenState.applicationMasterKeys); + result.it = new NMMemoryRecoveryIterator> + (applicationMasterKeys.entrySet().iterator()); return result; } @@ -389,14 +417,14 @@ public synchronized void storeNMTokenPreviousMasterKey(MasterKey key) public synchronized void storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; - nmTokenState.applicationMasterKeys.put(attempt, + applicationMasterKeys.put(attempt, new MasterKeyPBImpl(keypb.getProto())); } @Override public synchronized void removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt) throws IOException { - nmTokenState.applicationMasterKeys.remove(attempt); + applicationMasterKeys.remove(attempt); } @@ -408,8 +436,8 @@ public synchronized RecoveredContainerTokensState loadContainerTokensState() new RecoveredContainerTokensState(); result.currentMasterKey = containerTokenState.currentMasterKey; result.previousMasterKey = containerTokenState.previousMasterKey; - result.activeTokens = - new HashMap(containerTokenState.activeTokens); + result.it = new NMMemoryRecoveryIterator> + (activeTokens.entrySet().iterator()); return result; } @@ -432,13 +460,13 @@ public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key) @Override public synchronized void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException { - containerTokenState.activeTokens.put(containerId, expirationTime); + activeTokens.put(containerId, expirationTime); } @Override public synchronized void removeContainerToken(ContainerId containerId) throws IOException { - containerTokenState.activeTokens.remove(containerId); + activeTokens.remove(containerId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 8a8cfa2644e..1f8e83537b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -125,6 +125,69 @@ public void cleanup() throws IOException { FileUtil.fullyDelete(TMP_DIR); } + private List loadContainersState + (RecoveryIterator it) throws IOException { + List containers = new ArrayList(); + while(it.hasNext()){ + RecoveredContainerState rcs = it.next(); + if (rcs.startRequest != null) { + containers.add(rcs); + } + } + return containers; + } + + private List loadApplicationProtos + (RecoveryIterator it) throws IOException { + List applicationProtos = + new ArrayList(); + while(it.hasNext()){ + applicationProtos.add(it.next()); + } + return applicationProtos; + } + + private List loadDeletionTaskProtos + (RecoveryIterator it) throws IOException { + List deleteTaskProtos = + new ArrayList(); + while (it.hasNext()){ + deleteTaskProtos.add(it.next()); + } + return deleteTaskProtos; + } + + private Map loadUserResources + (RecoveryIterator> it) throws IOException { + Map userResources = + new HashMap(); + while (it.hasNext()){ + Map.Entry entry = it.next(); + userResources.put(entry.getKey(), entry.getValue()); + } + return userResources; + } + + private Map loadNMTokens + (RecoveryIterator > it) throws IOException { + Map nmTokens = new HashMap<>(); + while (it.hasNext()){ + Map.Entry entry = it.next(); + nmTokens.put(entry.getKey(), entry.getValue()); + } + return nmTokens; + } + + private Map loadContainerTokens + (RecoveryIterator > it) throws IOException { + Map containerTokens = new HashMap<>(); + while (it.hasNext()){ + Map.Entry entry = it.next(); + containerTokens.put(entry.getKey(), entry.getValue()); + } + return containerTokens; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -142,7 +205,7 @@ private void verifyEmptyState() throws IOException { assertNotNull(pubts); assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); - assertTrue(state.getUserResources().isEmpty()); + assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @Test @@ -192,7 +255,8 @@ public void testCheckVersion() throws IOException { public void testApplicationStorage() throws IOException { // test empty when no state RecoveredApplicationsState state = stateStore.loadApplicationsState(); - assertTrue(state.getApplications().isEmpty()); + List apps = loadApplicationProtos(state.getIterator()); + assertTrue(apps.isEmpty()); // store an application and verify recovered final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); @@ -204,8 +268,9 @@ public void testApplicationStorage() throws IOException { stateStore.storeApplication(appId1, appProto1); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); // add a new app final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); @@ -216,23 +281,25 @@ public void testApplicationStorage() throws IOException { stateStore.storeApplication(appId2, appProto2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(2, state.getApplications().size()); - assertTrue(state.getApplications().contains(appProto1)); - assertTrue(state.getApplications().contains(appProto2)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(2, apps.size()); + assertTrue(apps.contains(appProto1)); + assertTrue(apps.contains(appProto2)); // test removing an application stateStore.removeApplication(appId2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); } @Test public void testContainerStorage() throws IOException { // test empty when no state List recoveredContainers = - stateStore.loadContainersState(); + loadContainersState(stateStore.getConstainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // create a container request @@ -254,7 +321,7 @@ public void testContainerStorage() throws IOException { stateStore.getContainerVersionKey(containerId.toString())))); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -269,14 +336,14 @@ public void testContainerStorage() throws IOException { // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); stateStore.storeContainerLaunched(containerId1); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); // queue the container, and verify recovered stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); @@ -292,7 +359,7 @@ public void testContainerStorage() throws IOException { diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -305,7 +372,7 @@ public void testContainerStorage() throws IOException { // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); @@ -316,7 +383,7 @@ public void testContainerStorage() throws IOException { // Resume the container stateStore.removeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered @@ -328,7 +395,7 @@ public void testContainerStorage() throws IOException { stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -342,7 +409,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerKilled(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -358,7 +425,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); @@ -371,7 +438,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(6, rcs.getRemainingRetryAttempts()); @@ -382,12 +449,12 @@ public void testContainerStorage() throws IOException { // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); // recover again to check remove clears all containers restartStateStore(); NMStateStoreService nmStoreSpy = spy(stateStore); - nmStoreSpy.loadContainersState(); + loadContainersState(nmStoreSpy.getConstainerStateIterator()); verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class)); } @@ -399,7 +466,7 @@ private void validateRetryAttempts(ContainerId containerId) stateStore.storeContainerRestartTimes(containerId, finishTimeForRetryAttempts); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = loadContainersState(stateStore.getConstainerStateIterator()).get(0); List recoveredRestartTimes = rcs.getRestartTimes(); assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); @@ -481,7 +548,7 @@ public void testStartResourceLocalization() throws IOException { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -535,7 +602,7 @@ public void testStartResourceLocalization() throws IOException { pubts.getInProgressResources().get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -584,7 +651,7 @@ public void testFinishResourceLocalization() throws IOException { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -654,7 +721,7 @@ public void testFinishResourceLocalization() throws IOException { assertEquals(1, pubts.getInProgressResources().size()); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -762,7 +829,7 @@ public void testRemoveLocalizedResource() throws IOException { assertEquals(pubLocalizedProto1, pubts.getLocalizedResources().iterator().next()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); } @@ -771,7 +838,8 @@ public void testDeletionTaskStorage() throws IOException { // test empty when no state RecoveredDeletionServiceState state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); + List deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); // store a deletion task and verify recovered DeletionServiceDeleteTaskProto proto = @@ -788,8 +856,9 @@ public void testDeletionTaskStorage() throws IOException { stateStore.storeDeletionTask(proto.getId(), proto); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // store another deletion task DeletionServiceDeleteTaskProto proto2 = @@ -802,31 +871,36 @@ public void testDeletionTaskStorage() throws IOException { stateStore.storeDeletionTask(proto2.getId(), proto2); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(2, state.getTasks().size()); - assertTrue(state.getTasks().contains(proto)); - assertTrue(state.getTasks().contains(proto2)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(2, deleteTaskProtos.size()); + assertTrue(deleteTaskProtos.contains(proto)); + assertTrue(deleteTaskProtos.contains(proto2)); + // delete a task and verify gone after recovery stateStore.removeDeletionTask(proto2.getId()); restartStateStore(); - state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + state = stateStore.loadDeletionServiceState(); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // delete the last task and verify none left stateStore.removeDeletionTask(proto.getId()); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); - } + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); } @Test public void testNMTokenStorage() throws IOException { // test empty when no state RecoveredNMTokensState state = stateStore.loadNMTokensState(); + Map loadedAppKeys = + loadNMTokens(state.getIterator()); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a master key and verify recovered NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest(); @@ -834,18 +908,20 @@ public void testNMTokenStorage() throws IOException { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = secretMgr.generateKey(); stateStore.storeNMTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getApplicationMasterKeys().isEmpty()); + assertTrue(loadedAppKeys.isEmpty()); // store a few application keys and verify recovered ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance( @@ -858,10 +934,9 @@ public void testNMTokenStorage() throws IOException { stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map loadedAppKeys = - state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertEquals(attemptKey1, loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -880,9 +955,9 @@ public void testNMTokenStorage() throws IOException { stateStore.storeNMTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadNMTokensState(); + loadedAppKeys = loadNMTokens(state.getIterator()); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedAppKeys = state.getApplicationMasterKeys(); assertEquals(2, loadedAppKeys.size()); assertNull(loadedAppKeys.get(attempt1)); assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); @@ -894,9 +969,11 @@ public void testContainerTokenStorage() throws IOException { // test empty when no state RecoveredContainerTokensState state = stateStore.loadContainerTokensState(); + Map loadedActiveTokens = + loadContainerTokens(state.it); assertNull(state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a master key and verify recovered ContainerTokenKeyGeneratorForTest keygen = @@ -905,18 +982,20 @@ public void testContainerTokenStorage() throws IOException { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertNull(state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a previous key and verify recovered MasterKey prevKey = keygen.generateKey(); stateStore.storeContainerTokenPreviousMasterKey(prevKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - assertTrue(state.getActiveTokens().isEmpty()); + assertTrue(loadedActiveTokens.isEmpty()); // store a few container tokens and verify recovered ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1); @@ -927,10 +1006,9 @@ public void testContainerTokenStorage() throws IOException { stateStore.storeContainerToken(cid2, expTime2); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - Map loadedActiveTokens = - state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertEquals(expTime1, loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -948,9 +1026,9 @@ public void testContainerTokenStorage() throws IOException { stateStore.storeContainerTokenCurrentMasterKey(currentKey); restartStateStore(); state = stateStore.loadContainerTokensState(); + loadedActiveTokens = loadContainerTokens(state.it); assertEquals(currentKey, state.getCurrentMasterKey()); assertEquals(prevKey, state.getPreviousMasterKey()); - loadedActiveTokens = state.getActiveTokens(); assertEquals(2, loadedActiveTokens.size()); assertNull(loadedActiveTokens.get(cid1)); assertEquals(expTime2, loadedActiveTokens.get(cid2)); @@ -1029,8 +1107,8 @@ protected DB openDatabase(Configuration conf) { @Test public void testUnexpectedKeyDoesntThrowException() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = + loadContainersState(stateStore.getConstainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1045,7 +1123,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { + containerId.toString() + "/invalidKey1234").getBytes(); stateStore.getDB().put(invalidKey, new byte[1]); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers =loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); @@ -1162,8 +1240,7 @@ public void testAMRMProxyStorage() throws IOException { @Test public void testStateStoreForResourceMapping() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1190,7 +1267,7 @@ public void testStateStoreForResourceMapping() throws IOException { // add a invalid key restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getConstainerStateIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); List res = rcs.getResourceMappings() @@ -1253,7 +1330,7 @@ public void testEmptyRestartTimes() throws IOException { stateStore.storeContainerRestartTimes(containerId, restartTimes); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = loadContainersState(stateStore.getConstainerStateIterator()).get(0); List recoveredRestartTimes = rcs.getRestartTimes(); assertTrue(recoveredRestartTimes.isEmpty()); }