diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index ae81dc11187..b0a38cae938 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager; import static java.util.concurrent.TimeUnit.SECONDS; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,16 +98,22 @@ public void delete(DeletionTask deletionTask) { private void recover(NMStateStoreService.RecoveredDeletionServiceState state) throws IOException { - List taskProtos = state.getTasks(); Map idToInfoMap = - new HashMap<>(taskProtos.size()); - Set successorTasks = new HashSet<>(); - for (DeletionServiceDeleteTaskProto proto : taskProtos) { - DeletionTaskRecoveryInfo info = - NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); - idToInfoMap.put(info.getTask().getTaskId(), info); - nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); - successorTasks.addAll(info.getSuccessorTaskIds()); + new HashMap(); + Set successorTasks = new HashSet(); + + RecoveryIterator it = state.getIterator(); + try { + while(it.hasNext()){ + DeletionServiceDeleteTaskProto proto = it.next(); + DeletionTaskRecoveryInfo info = + NMProtoUtils.convertProtoToDeletionTaskRecoveryInfo(proto, this); + idToInfoMap.put(info.getTask().getTaskId(), info); + nextTaskId.set(Math.max(nextTaskId.get(), info.getTask().getTaskId())); + successorTasks.addAll(info.getSuccessorTaskIds()); + } + } finally { + it.close(); } // restore the task dependencies and schedule the deletion tasks that diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index ce240bc12a2..5ab1de3288c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -22,6 +22,7 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -355,19 +356,29 @@ private void recover() throws IOException, URISyntaxException { stateStore.loadLocalizationState()); RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); - for (ContainerManagerApplicationProto proto : - appsState.getApplications()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering application with state: " + proto.toString()); + RecoveryIterator appsStateIterator = appsState.getIterator(); + try{ + while (appsStateIterator.hasNext()) { + ContainerManagerApplicationProto applicationProto = appsStateIterator.next(); + recoverApplication(applicationProto); } - recoverApplication(proto); + } finally { + appsStateIterator.close(); } - for (RecoveredContainerState rcs : stateStore.loadContainersState()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering container with state: " + rcs); + RecoveryIterator rcsIterator = stateStore.getRCSIterator(); + try{ + while(rcsIterator.hasNext()) { + RecoveredContainerState rcs = rcsIterator.next(); + if (rcs.getStartRequest() != null) { + recoverContainer(rcs); + } + else { + stateStore.removeContainer(rcs.getContainerId()); + } } - recoverContainer(rcs); + } finally { + rcsIterator.close(); } // Recovery AMRMProxy state after apps and containers are recovered diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 4ca6720ff26..6e0a0b43599 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.yarn.server.nodemanager.recovery.RecoveryIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,43 +297,47 @@ public void onDirsChanged() { //Recover localized resources after an NM restart public void recoverLocalizedResources(RecoveredLocalizationState state) - throws URISyntaxException { + throws URISyntaxException, IOException { LocalResourceTrackerState trackerState = state.getPublicTrackerState(); recoverTrackerResources(publicRsrc, trackerState); - - for (Map.Entry userEntry : - state.getUserResources().entrySet()) { - String user = userEntry.getKey(); - RecoveredUserResources userResources = userEntry.getValue(); - trackerState = userResources.getPrivateTrackerState(); - if (!trackerState.isEmpty()) { - LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); - LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, - tracker); - if (oldTracker != null) { - tracker = oldTracker; - } - recoverTrackerResources(tracker, trackerState); - } - - for (Map.Entry appEntry : - userResources.getAppTrackerStates().entrySet()) { - trackerState = appEntry.getValue(); + RecoveryIterator> it = state.getIterator(); + try{ + while(it.hasNext()){ + Map.Entry userEntry = it.next(); + String user = userEntry.getKey(); + RecoveredUserResources userResources = userEntry.getValue(); + trackerState = userResources.getPrivateTrackerState(); if (!trackerState.isEmpty()) { - ApplicationId appId = appEntry.getKey(); - String appIdStr = appId.toString(); LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, - appId, dispatcher, false, super.getConfig(), stateStore, - dirsHandler); - LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + null, dispatcher, true, super.getConfig(), stateStore, dirsHandler); + LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user, tracker); if (oldTracker != null) { tracker = oldTracker; } recoverTrackerResources(tracker, trackerState); } + + for (Map.Entry appEntry : + userResources.getAppTrackerStates().entrySet()) { + trackerState = appEntry.getValue(); + if (!trackerState.isEmpty()) { + ApplicationId appId = appEntry.getKey(); + String appIdStr = appId.toString(); + LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user, + appId, dispatcher, false, super.getConfig(), stateStore, + dirsHandler); + LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr, + tracker); + if (oldTracker != null) { + tracker = oldTracker; + } + recoverTrackerResources(tracker, trackerState); + } + } } + } finally { + it.close(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 44f5e18ab76..9651b7eac7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -66,6 +66,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -225,23 +226,62 @@ boolean isHealthy() { return isHealthy; } - @Override - public List loadContainersState() - throws IOException { - ArrayList containers = - new ArrayList(); - ArrayList containersToRemove = - new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(CONTAINERS_KEY_PREFIX)); + // LeveldbIterator starting at startkey + private LeveldbIterator getLevelDBIterator(String startKey){ + LeveldbIterator it = new LeveldbIterator(db); + it.seek(bytes(startKey)); + return it; + } - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + // Container Recovery Iterator + private class RCSIterator implements RecoveryIterator { + LeveldbIterator it; + RecoveredContainerState rcs; + + RCSIterator(LeveldbIterator it) { + this.it = it; + this.rcs = null; + } + + @Override + public boolean hasNext() throws IOException { + if (rcs != null){ + return true; + } else { + rcs = getNextRecoveredContainer(it); + return (rcs != null); + } + } + + @Override + public RecoveredContainerState next() throws IOException { + RecoveredContainerState tmp = rcs; + if (rcs != null) { + rcs = null; + return tmp; + } else{ + tmp = getNextRecoveredContainer(it); + if (tmp == null) throw new IOException(); + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null) { + it.close(); + } + } + } + + private RecoveredContainerState getNextRecoveredContainer(LeveldbIterator it) throws IOException { + RecoveredContainerState rcs = null; + try { + if (it.hasNext()) { + Entry entry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { - break; + return null; } int idEndPos = key.indexOf('/', CONTAINERS_KEY_PREFIX.length()); @@ -250,38 +290,20 @@ boolean isHealthy() { } ContainerId containerId = ContainerId.fromString( key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); - String keyPrefix = key.substring(0, idEndPos+1); - RecoveredContainerState rcs = loadContainerState(containerId, - iter, keyPrefix); - // Don't load container without StartContainerRequest - if (rcs.startRequest != null) { - containers.add(rcs); - } else { - containersToRemove.add(containerId); - } + String keyPrefix = key.substring(0, idEndPos + 1); + rcs = loadContainerState(containerId, it, keyPrefix); + rcs.setContainerId(containerId); } - } catch (DBException e) { + } catch (DBException e){ throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return rcs; + } - // remove container without StartContainerRequest - for (ContainerId containerId : containersToRemove) { - LOG.warn("Remove container " + containerId + - " with incomplete records"); - try { - removeContainer(containerId); - // TODO: kill and cleanup the leaked container - } catch (IOException e) { - LOG.error("Unable to remove container " + containerId + - " in store", e); - } - } - return containers; + @Override + public RecoveryIterator getRCSIterator() { + return new RCSIterator(getLevelDBIterator(CONTAINERS_KEY_PREFIX)); } private RecoveredContainerState loadContainerState(ContainerId containerId, @@ -674,35 +696,73 @@ public void removeContainer(ContainerId containerId) } - @Override - public RecoveredApplicationsState loadApplicationsState() + // Application Recovery Iterator + private class RASIterator implements RecoveryIterator { + LeveldbIterator it; + ContainerManagerApplicationProto applicationProto; + + RASIterator(LeveldbIterator it){ + this.it = it; + this.applicationProto = null; + } + + @Override + public boolean hasNext() throws IOException { + if (applicationProto != null){ + return true; + } + else { + applicationProto = getNextRecoveredApplication(it); + return (applicationProto != null); + } + } + + @Override + public ContainerManagerApplicationProto next() throws IOException { + ContainerManagerApplicationProto tmp = applicationProto; + if (tmp != null) { + applicationProto = null; + return tmp; + } + else{ + tmp = getNextRecoveredApplication(it); + if (tmp == null) throw new IOException(); + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null){ + it.close(); + } + } + } + + private ContainerManagerApplicationProto getNextRecoveredApplication(LeveldbIterator it) throws IOException { - RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList(); - String keyPrefix = APPLICATIONS_KEY_PREFIX; - LeveldbIterator iter = null; + ContainerManagerApplicationProto applicationProto = null; try { - iter = new LeveldbIterator(db); - iter.seek(bytes(keyPrefix)); - while (iter.hasNext()) { - Entry entry = iter.next(); + if (it.hasNext()) { + Entry entry = it.next(); String key = asString(entry.getKey()); - if (!key.startsWith(keyPrefix)) { - break; + if (!key.startsWith(APPLICATIONS_KEY_PREFIX)) { + return null; } - state.applications.add( - ContainerManagerApplicationProto.parseFrom(entry.getValue())); + applicationProto = ContainerManagerApplicationProto.parseFrom(entry.getValue()); } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return applicationProto; + } + @Override + public RecoveredApplicationsState loadApplicationsState() + throws IOException { + RecoveredApplicationsState state = new RecoveredApplicationsState(); + state.it = new RASIterator(getLevelDBIterator(APPLICATIONS_KEY_PREFIX)); cleanupDeprecatedFinishedApps(); - return state; } @@ -746,24 +806,57 @@ public void removeApplication(ApplicationId appId) } - @Override - public RecoveredLocalizationState loadLocalizationState() - throws IOException { - RecoveredLocalizationState state = new RecoveredLocalizationState(); + // User Resource Recovery Iterator. + private class RURIterator implements RecoveryIterator + > { + LeveldbIterator it; + Entry userEntry; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(LOCALIZATION_PUBLIC_KEY_PREFIX)); - state.publicTrackerState = loadResourceTrackerState(iter, - LOCALIZATION_PUBLIC_KEY_PREFIX); + RURIterator(LeveldbIterator it){ + this.it = it; + userEntry = null; + } - iter.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.peekNext(); + @Override + public boolean hasNext() throws IOException { + if (userEntry != null){ + return true; + } else { + userEntry = getNextRecoveredLocalizationEntry(it); + return (userEntry != null); + } + } + + @Override + public Entry next() throws IOException { + Entry tmp = userEntry; + if (tmp != null){ + userEntry = null; + return tmp; + } else { + tmp = getNextRecoveredLocalizationEntry(it); + if (tmp == null) throw new IOException(); + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null){ + it.close(); + } + } + } + + private Entry getNextRecoveredLocalizationEntry + (LeveldbIterator it) throws IOException{ + Entry localEntry = null; + try{ + if (it.hasNext()){ + Entryentry = it.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(LOCALIZATION_PRIVATE_KEY_PREFIX)) { - break; + return null; } int userEndPos = key.indexOf('/', @@ -774,17 +867,24 @@ public RecoveredLocalizationState loadLocalizationState() } String user = key.substring( LOCALIZATION_PRIVATE_KEY_PREFIX.length(), userEndPos); - state.userResources.put(user, loadUserLocalizedResources(iter, - key.substring(0, userEndPos+1))); - } - } catch (DBException e) { - throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); + RecoveredUserResources val = loadUserLocalizedResources(it, + key.substring(0, userEndPos+1)); + localEntry = new AbstractMap.SimpleEntry<>(user, val); } + } catch (DBException e){ + throw new IOException(); } + return localEntry; + } + @Override + public RecoveredLocalizationState loadLocalizationState() + throws IOException { + RecoveredLocalizationState state = new RecoveredLocalizationState(); + LeveldbIterator it = getLevelDBIterator(LOCALIZATION_PUBLIC_KEY_PREFIX); + state.publicTrackerState = loadResourceTrackerState(it, LOCALIZATION_PUBLIC_KEY_PREFIX); + it.seek(bytes(LOCALIZATION_PRIVATE_KEY_PREFIX)); + state.it = new RURIterator(it); return state; } @@ -976,31 +1076,70 @@ private String getResourceTrackerKeyPrefix(String user, } - @Override - public RecoveredDeletionServiceState loadDeletionServiceState() + // Deletion State Recovery Iterator. + private class RDSIterator implements RecoveryIterator{ + LeveldbIterator it; + DeletionServiceDeleteTaskProto deletionService; + + RDSIterator(LeveldbIterator it){ + this.it = it; + this.deletionService = null; + } + + @Override + public boolean hasNext() throws IOException{ + if (deletionService != null){ + return true; + } else{ + deletionService = getNextRecoveredDeletionService(it); + return (deletionService != null); + } + } + + @Override + public DeletionServiceDeleteTaskProto next() throws IOException { + DeletionServiceDeleteTaskProto tmp = deletionService; + if (tmp != null){ + deletionService = null; + return tmp; + } else { + tmp = getNextRecoveredDeletionService(it); + if (tmp == null) throw new IOException(); + return tmp; + } + } + + @Override + public void close() throws IOException { + if (it != null){ + it.close(); + } + } + } + + private DeletionServiceDeleteTaskProto getNextRecoveredDeletionService(LeveldbIterator it) throws IOException { - RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); - state.tasks = new ArrayList(); - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); - iter.seek(bytes(DELETION_TASK_KEY_PREFIX)); - while (iter.hasNext()) { - Entry entry = iter.next(); + DeletionServiceDeleteTaskProto deleteProto = null; + try{ + if (it.hasNext()){ + Entry entry = it.next(); String key = asString(entry.getKey()); if (!key.startsWith(DELETION_TASK_KEY_PREFIX)) { - break; + return null; } - state.tasks.add( - DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); + deleteProto = DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()); } - } catch (DBException e) { + } catch (DBException e){ throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } + return deleteProto; + } + + @Override + public RecoveredDeletionServiceState loadDeletionServiceState() + throws IOException { + RecoveredDeletionServiceState state = new RecoveredDeletionServiceState(); + state.it = new RDSIterator(getLevelDBIterator(DELETION_TASK_KEY_PREFIX)); return state; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index dfad9cfee33..6176d64ce3f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -65,8 +65,7 @@ public void removeApplication(ApplicationId appId) throws IOException { } @Override - public List loadContainersState() - throws IOException { + public RecoveryIterator getRCSIterator() { throw new UnsupportedOperationException( "Recovery not supported by this state store"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 70decdba743..29b532c7387 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -67,12 +67,11 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { } public static class RecoveredApplicationsState { - List applications; + RecoveryIterator it = null; - public List getApplications() { - return applications; + public RecoveryIterator getIterator() { + return it; } - } /** @@ -107,6 +106,16 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { private long startTime; private ResourceMappings resMappings = new ResourceMappings(); + private ContainerId containerId; + + public ContainerId getContainerId() { + return containerId; + } + + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } + public RecoveredContainerStatus getStatus() { return status; } @@ -248,23 +257,23 @@ public LocalResourceTrackerState getPrivateTrackerState() { public static class RecoveredLocalizationState { LocalResourceTrackerState publicTrackerState = new LocalResourceTrackerState(); - Map userResources = - new HashMap(); + RecoveryIterator> it = null; public LocalResourceTrackerState getPublicTrackerState() { return publicTrackerState; } - public Map getUserResources() { - return userResources; + public RecoveryIterator> + getIterator() { + return it; } } public static class RecoveredDeletionServiceState { - List tasks; + RecoveryIterator it = null; - public List getTasks() { - return tasks; + public RecoveryIterator getIterator(){ + return it; } } @@ -400,12 +409,10 @@ public abstract void removeApplication(ApplicationId appId) /** - * Load the state of containers - * @return recovered state for containers - * @throws IOException + * get the Recovered Container State Iterator + * @return recovery iterator */ - public abstract List loadContainersState() - throws IOException; + public abstract RecoveryIterator getRCSIterator(); /** * Record a container start request diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java new file mode 100644 index 00000000000..8be52903ebd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/RecoveryIterator.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.recovery; + +import java.io.IOException; + +/** + * A wrapper for a Iterator to translate the raw RuntimeExceptions that + * can be thrown into IOException. + */ +public interface RecoveryIterator { + + /** + * Returns true if the iteration has more elements. + */ + boolean hasNext() throws IOException; + + /** + * Returns the next element in the iteration. + */ + T next() throws IOException; + + /** + * Closes the iterator. + */ + void close() throws IOException; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index c5428d184b7..4d4de3570ed 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -86,13 +87,35 @@ protected void startStorage() { protected void closeStorage() { } + // Recovery Iterator Implementation. + private class NMMemoryRecoveryIterator implements RecoveryIterator{ + + private Iterator it; + + NMMemoryRecoveryIterator(Iterator it){ + this.it = it; + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public T next() { + return it.next(); + } + + @Override + public void close() throws IOException { } + } @Override public synchronized RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); - state.applications = new ArrayList( - apps.values()); + state.it = new NMMemoryRecoveryIterator + (apps.values().iterator()); return state; } @@ -111,8 +134,7 @@ public synchronized void removeApplication(ApplicationId appId) } @Override - public synchronized List loadContainersState() - throws IOException { + public RecoveryIterator getRCSIterator() { // return a copy so caller can't modify our state List result = new ArrayList(containerStates.size()); @@ -131,7 +153,9 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.setResourceMappings(rcs.getResourceMappings()); result.add(rcsCopy); } - return result; + NMMemoryRecoveryIterator it = + new NMMemoryRecoveryIterator(result.iterator()); + return it; } @Override @@ -284,6 +308,7 @@ private TrackerState getTrackerState(TrackerKey key) { @Override public synchronized RecoveredLocalizationState loadLocalizationState() { RecoveredLocalizationState result = new RecoveredLocalizationState(); + Map userResources = new HashMap<>(); for (Map.Entry e : trackerStates.entrySet()) { TrackerKey tk = e.getKey(); TrackerState ts = e.getValue(); @@ -294,10 +319,10 @@ public synchronized RecoveredLocalizationState loadLocalizationState() { if (tk.user == null) { result.publicTrackerState = loadTrackerState(ts); } else { - RecoveredUserResources rur = result.userResources.get(tk.user); + RecoveredUserResources rur = userResources.get(tk.user); if (rur == null) { rur = new RecoveredUserResources(); - result.userResources.put(tk.user, rur); + userResources.put(tk.user, rur); } if (tk.appId == null) { rur.privateTrackerState = loadTrackerState(ts); @@ -306,6 +331,8 @@ public synchronized RecoveredLocalizationState loadLocalizationState() { } } } + result.it = new NMMemoryRecoveryIterator> + (userResources.entrySet().iterator()); return result; } @@ -341,8 +368,8 @@ public synchronized RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { RecoveredDeletionServiceState result = new RecoveredDeletionServiceState(); - result.tasks = new ArrayList( - deleteTasks.values()); + result.it = new NMMemoryRecoveryIterator + (deleteTasks.values().iterator()); return result; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index c8c07d1fdb4..1560abc6cdd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -123,6 +123,49 @@ public void cleanup() throws IOException { FileUtil.fullyDelete(TMP_DIR); } + private List loadContainersState + (RecoveryIterator it) throws IOException { + List containers = new ArrayList(); + while(it.hasNext()){ + RecoveredContainerState rcs = it.next(); + if (rcs.startRequest != null) { + containers.add(rcs); + } + } + return containers; + } + + private List loadApplicationProtos + (RecoveryIterator it) throws IOException { + List applicationProtos = + new ArrayList(); + while(it.hasNext()){ + applicationProtos.add(it.next()); + } + return applicationProtos; + } + + private List loadDeletionTaskProtos + (RecoveryIterator it) throws IOException { + List deleteTaskProtos = + new ArrayList(); + while (it.hasNext()){ + deleteTaskProtos.add(it.next()); + } + return deleteTaskProtos; + } + + private Map loadUserResources + (RecoveryIterator> it) throws IOException { + Map userResources = + new HashMap(); + while (it.hasNext()){ + Map.Entry entry = it.next(); + userResources.put(entry.getKey(), entry.getValue()); + } + return userResources; + } + private void restartStateStore() throws IOException { // need to close so leveldb releases database lock if (stateStore != null) { @@ -140,7 +183,7 @@ private void verifyEmptyState() throws IOException { assertNotNull(pubts); assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); - assertTrue(state.getUserResources().isEmpty()); + assertTrue(loadUserResources(state.getIterator()).isEmpty()); } @Test @@ -190,7 +233,8 @@ public void testCheckVersion() throws IOException { public void testApplicationStorage() throws IOException { // test empty when no state RecoveredApplicationsState state = stateStore.loadApplicationsState(); - assertTrue(state.getApplications().isEmpty()); + List apps = loadApplicationProtos(state.getIterator()); + assertTrue(apps.isEmpty()); // store an application and verify recovered final ApplicationId appId1 = ApplicationId.newInstance(1234, 1); @@ -202,8 +246,9 @@ public void testApplicationStorage() throws IOException { stateStore.storeApplication(appId1, appProto1); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); // add a new app final ApplicationId appId2 = ApplicationId.newInstance(1234, 2); @@ -214,23 +259,25 @@ public void testApplicationStorage() throws IOException { stateStore.storeApplication(appId2, appProto2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(2, state.getApplications().size()); - assertTrue(state.getApplications().contains(appProto1)); - assertTrue(state.getApplications().contains(appProto2)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(2, apps.size()); + assertTrue(apps.contains(appProto1)); + assertTrue(apps.contains(appProto2)); // test removing an application stateStore.removeApplication(appId2); restartStateStore(); state = stateStore.loadApplicationsState(); - assertEquals(1, state.getApplications().size()); - assertEquals(appProto1, state.getApplications().get(0)); + apps = loadApplicationProtos(state.getIterator()); + assertEquals(1, apps.size()); + assertEquals(appProto1, apps.get(0)); } @Test public void testContainerStorage() throws IOException { // test empty when no state List recoveredContainers = - stateStore.loadContainersState(); + loadContainersState(stateStore.getRCSIterator()); assertTrue(recoveredContainers.isEmpty()); // create a container request @@ -252,7 +299,7 @@ public void testContainerStorage() throws IOException { stateStore.getContainerVersionKey(containerId.toString())))); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -267,14 +314,14 @@ public void testContainerStorage() throws IOException { // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); stateStore.storeContainerLaunched(containerId1); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); // queue the container, and verify recovered stateStore.storeContainerQueued(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); @@ -290,7 +337,7 @@ public void testContainerStorage() throws IOException { diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -303,7 +350,7 @@ public void testContainerStorage() throws IOException { // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); @@ -314,7 +361,7 @@ public void testContainerStorage() throws IOException { // Resume the container stateStore.removeContainerPaused(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); // increase the container size, and verify recovered @@ -326,7 +373,7 @@ public void testContainerStorage() throws IOException { stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(0, rcs.getVersion()); @@ -340,7 +387,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerKilled(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); @@ -356,7 +403,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerDiagnostics(containerId, diags); stateStore.storeContainerCompleted(containerId, 21); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); @@ -369,7 +416,7 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); rcs = recoveredContainers.get(0); assertEquals(6, rcs.getRemainingRetryAttempts()); @@ -380,7 +427,7 @@ public void testContainerStorage() throws IOException { // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertTrue(recoveredContainers.isEmpty()); } @@ -392,7 +439,7 @@ private void validateRetryAttempts(ContainerId containerId) stateStore.storeContainerRestartTimes(containerId, finishTimeForRetryAttempts); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = loadContainersState(stateStore.getRCSIterator()).get(0); List recoveredRestartTimes = rcs.getRestartTimes(); assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); @@ -474,7 +521,7 @@ public void testStartResourceLocalization() throws IOException { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -528,7 +575,7 @@ public void testStartResourceLocalization() throws IOException { pubts.getInProgressResources().get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -577,7 +624,7 @@ public void testFinishResourceLocalization() throws IOException { assertTrue(pubts.getLocalizedResources().isEmpty()); assertTrue(pubts.getInProgressResources().isEmpty()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); RecoveredUserResources rur = userResources.get(user); LocalResourceTrackerState privts = rur.getPrivateTrackerState(); @@ -647,7 +694,7 @@ public void testFinishResourceLocalization() throws IOException { assertEquals(1, pubts.getInProgressResources().size()); assertEquals(pubRsrcLocalPath2, pubts.getInProgressResources().get(pubRsrcProto2)); - userResources = state.getUserResources(); + userResources = loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); rur = userResources.get(user); privts = rur.getPrivateTrackerState(); @@ -755,7 +802,7 @@ public void testRemoveLocalizedResource() throws IOException { assertEquals(pubLocalizedProto1, pubts.getLocalizedResources().iterator().next()); Map userResources = - state.getUserResources(); + loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); } @@ -764,7 +811,8 @@ public void testDeletionTaskStorage() throws IOException { // test empty when no state RecoveredDeletionServiceState state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); + List deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); // store a deletion task and verify recovered DeletionServiceDeleteTaskProto proto = @@ -781,8 +829,9 @@ public void testDeletionTaskStorage() throws IOException { stateStore.storeDeletionTask(proto.getId(), proto); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // store another deletion task DeletionServiceDeleteTaskProto proto2 = @@ -795,23 +844,26 @@ public void testDeletionTaskStorage() throws IOException { stateStore.storeDeletionTask(proto2.getId(), proto2); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertEquals(2, state.getTasks().size()); - assertTrue(state.getTasks().contains(proto)); - assertTrue(state.getTasks().contains(proto2)); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(2, deleteTaskProtos.size()); + assertTrue(deleteTaskProtos.contains(proto)); + assertTrue(deleteTaskProtos.contains(proto2)); + // delete a task and verify gone after recovery stateStore.removeDeletionTask(proto2.getId()); restartStateStore(); - state = stateStore.loadDeletionServiceState(); - assertEquals(1, state.getTasks().size()); - assertEquals(proto, state.getTasks().get(0)); + state = stateStore.loadDeletionServiceState(); + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertEquals(1, deleteTaskProtos.size()); + assertEquals(proto, deleteTaskProtos.get(0)); // delete the last task and verify none left stateStore.removeDeletionTask(proto.getId()); restartStateStore(); state = stateStore.loadDeletionServiceState(); - assertTrue(state.getTasks().isEmpty()); - } + deleteTaskProtos = loadDeletionTaskProtos(state.getIterator()); + assertTrue(deleteTaskProtos.isEmpty()); } @Test public void testNMTokenStorage() throws IOException { @@ -1022,8 +1074,8 @@ protected DB openDatabase(Configuration conf) { @Test public void testUnexpectedKeyDoesntThrowException() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = + loadContainersState(stateStore.getRCSIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1038,7 +1090,7 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { + containerId.toString() + "/invalidKey1234").getBytes(); stateStore.getDB().put(invalidKey, new byte[1]); restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers =loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); @@ -1155,8 +1207,7 @@ public void testAMRMProxyStorage() throws IOException { @Test public void testStateStoreForResourceMapping() throws IOException { // test empty when no state - List recoveredContainers = stateStore - .loadContainersState(); + List recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertTrue(recoveredContainers.isEmpty()); ApplicationId appId = ApplicationId.newInstance(1234, 3); @@ -1183,7 +1234,7 @@ public void testStateStoreForResourceMapping() throws IOException { // add a invalid key restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); + recoveredContainers = loadContainersState(stateStore.getRCSIterator()); assertEquals(1, recoveredContainers.size()); RecoveredContainerState rcs = recoveredContainers.get(0); List res = rcs.getResourceMappings() @@ -1246,7 +1297,7 @@ public void testEmptyRestartTimes() throws IOException { stateStore.storeContainerRestartTimes(containerId, restartTimes); restartStateStore(); - RecoveredContainerState rcs = stateStore.loadContainersState().get(0); + RecoveredContainerState rcs = loadContainersState(stateStore.getRCSIterator()).get(0); List recoveredRestartTimes = rcs.getRestartTimes(); assertTrue(recoveredRestartTimes.isEmpty()); }