diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 7cf4921..9d54688 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -146,6 +146,8 @@ public boolean isNewlyCreated() { throws IOException { ArrayList containers = new ArrayList(); + ArrayList containersToRemove = + new ArrayList(); LeveldbIterator iter = null; try { iter = new LeveldbIterator(db); @@ -165,7 +167,14 @@ public boolean isNewlyCreated() { ContainerId containerId = ConverterUtils.toContainerId( key.substring(CONTAINERS_KEY_PREFIX.length(), idEndPos)); String keyPrefix = key.substring(0, idEndPos+1); - containers.add(loadContainerState(containerId, iter, keyPrefix)); + RecoveredContainerState rcs = loadContainerState(containerId, + iter, keyPrefix); + // Don't load container without StartContainerRequest + if (rcs.startRequest != null) { + containers.add(rcs); + } else { + containersToRemove.add(containerId); + } } } catch (DBException e) { throw new IOException(e); @@ -175,6 +184,19 @@ public boolean isNewlyCreated() { } } + // remove container without StartContainerRequest + for (ContainerId containerId : containersToRemove) { + LOG.warn("Remove container " + containerId + + " with incomplete records"); + try { + removeContainer(containerId); + // TODO: kill and cleanup the leaked container + } catch (IOException e) { + LOG.error("Unable to remove container " + containerId + + " in store", e); + } + } + return containers; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index db377f5..641250c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -274,6 +274,13 @@ public void testContainerStorage() throws IOException { assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + // store a new container record without StartContainerRequest + ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 6); + stateStore.storeContainerLaunched(containerId1); + recoveredContainers = stateStore.loadContainersState(); + // check whether the new container record is discarded + assertEquals(1, recoveredContainers.size()); + // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); stateStore.storeContainerLaunched(containerId);