diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -74,5 +74,6 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.nio.ByteBuffer;
 
 import static org.fusesource.leveldbjni.JniDBFactory.asString;
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
@@ -251,6 +252,9 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
       LeveldbIterator iter, String keyPrefix) throws IOException {
     RecoveredContainerState rcs = new RecoveredContainerState();
     rcs.status = RecoveredContainerStatus.REQUESTED;
+    // Timestamp of the newest QUEUED/LAUNCHED/PAUSED marker applied so far;
+    // 0 means only markers without a timestamp have been seen.
+    long latestTime = 0;
     while (iter.hasNext()) {
       Entry<byte[],byte[]> entry = iter.peekNext();
       String key = asString(entry.getKey());
@@ -258,8 +262,17 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
         break;
       }
       iter.next();
 
       String suffix = key.substring(keyPrefix.length()-1);  // start with '/'
+      // Only the QUEUED/LAUNCHED/PAUSED markers carry a timestamp. Markers
+      // stored as EMPTY_VALUE by earlier versions decode to 0.
+      long tempLatestTime = 0;
+      if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)
+          || suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)
+          || suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
+        tempLatestTime = bytesToLong(entry.getValue());
+      }
+
       if (suffix.equals(CONTAINER_REQUEST_KEY_SUFFIX)) {
         rcs.startRequest = new StartContainerRequestPBImpl(
             StartContainerRequestProto.parseFrom(entry.getValue()));
@@ -270,19 +283,30 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
       } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
         rcs.diagnostics = asString(entry.getValue());
       } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
-        if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+        // The marker with the newest timestamp wins. While no timestamp has
+        // been seen (legacy markers), the original precedence still applies.
+        if (tempLatestTime > latestTime
+            || (latestTime == 0
+                && rcs.status == RecoveredContainerStatus.REQUESTED)) {
+          latestTime = tempLatestTime;
           rcs.status = RecoveredContainerStatus.QUEUED;
         }
       } else if (suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
-        if ((rcs.status == RecoveredContainerStatus.LAUNCHED)
-            ||(rcs.status == RecoveredContainerStatus.QUEUED)
-            ||(rcs.status == RecoveredContainerStatus.REQUESTED)) {
+        if (tempLatestTime > latestTime
+            || (latestTime == 0
+                && (rcs.status == RecoveredContainerStatus.LAUNCHED
+                    || rcs.status == RecoveredContainerStatus.QUEUED
+                    || rcs.status == RecoveredContainerStatus.REQUESTED))) {
+          latestTime = tempLatestTime;
           rcs.status = RecoveredContainerStatus.PAUSED;
         }
       } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
-        if ((rcs.status == RecoveredContainerStatus.REQUESTED)
-            || (rcs.status == RecoveredContainerStatus.QUEUED)
-            ||(rcs.status == RecoveredContainerStatus.PAUSED)) {
+        if (tempLatestTime > latestTime
+            || (latestTime == 0
+                && (rcs.status == RecoveredContainerStatus.REQUESTED
+                    || rcs.status == RecoveredContainerStatus.QUEUED
+                    || rcs.status == RecoveredContainerStatus.PAUSED))) {
+          latestTime = tempLatestTime;
           rcs.status = RecoveredContainerStatus.LAUNCHED;
         }
       } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -290,6 +314,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
       } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
         rcs.status = RecoveredContainerStatus.COMPLETED;
         rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
+        // COMPLETED must not be replaced by a marker read afterwards.
+        latestTime = Long.MAX_VALUE;
       } else if (suffix.equals(CONTAINER_UPDATE_TOKEN_SUFFIX)) {
         ContainerTokenIdentifierProto tokenIdentifierProto =
             ContainerTokenIdentifierProto.parseFrom(entry.getValue());
@@ -376,7 +402,7 @@ public void storeContainerQueued(ContainerId containerId) throws IOException {
     String key = CONTAINERS_KEY_PREFIX + containerId.toString()
         + CONTAINER_QUEUED_KEY_SUFFIX;
     try {
-      db.put(bytes(key), EMPTY_VALUE);
+      db.put(bytes(key), longToBytes(System.currentTimeMillis()));
     } catch (DBException e) {
       throw new IOException(e);
     }
@@ -406,7 +432,7 @@ public void storeContainerPaused(ContainerId containerId) throws IOException {
     String key = CONTAINERS_KEY_PREFIX + containerId.toString()
         + CONTAINER_PAUSED_KEY_SUFFIX;
     try {
-      db.put(bytes(key), EMPTY_VALUE);
+      db.put(bytes(key), longToBytes(System.currentTimeMillis()));
     } catch (DBException e) {
       throw new IOException(e);
     }
@@ -457,7 +483,7 @@ public void storeContainerLaunched(ContainerId containerId)
     String key = CONTAINERS_KEY_PREFIX + containerId.toString()
         + CONTAINER_LAUNCHED_KEY_SUFFIX;
     try {
-      db.put(bytes(key), EMPTY_VALUE);
+      db.put(bytes(key), longToBytes(System.currentTimeMillis()));
     } catch (DBException e) {
       throw new IOException(e);
     }
@@ -1581,3 +1607,33 @@ protected void checkVersion() throws IOException {
     }
   }
+
+  /**
+   * Encodes a timestamp as the 8-byte big-endian value stored with the
+   * QUEUED, PAUSED and LAUNCHED container markers.
+   *
+   * @param x the value to encode
+   * @return a new 8-byte array holding {@code x}
+   */
+  public static byte[] longToBytes(long x) {
+    int byteSize = Long.SIZE / Byte.SIZE;
+    ByteBuffer buffer = ByteBuffer.allocate(byteSize);
+    buffer.putLong(x);
+    return buffer.array();
+  }
+
+  /**
+   * Decodes a value written by {@link #longToBytes(long)}.
+   *
+   * @param bytes the stored value; may be null or shorter than eight bytes,
+   *          e.g. a marker stored as EMPTY_VALUE before timestamps existed
+   * @return the decoded long, or 0 when {@code bytes} holds no timestamp
+   */
+  public static long bytesToLong(byte[] bytes) {
+    int byteSize = Long.SIZE / Byte.SIZE;
+    if (bytes == null || bytes.length < byteSize) {
+      return 0;
+    }
+    // wrap() avoids a copy and reads the leading eight bytes big-endian.
+    return ByteBuffer.wrap(bytes).getLong();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -1165,6 +1165,28 @@ public void testStateStoreForResourceMapping() throws IOException {
         resourceMappings.getAssignedResources("numa").equals(numaRes));
   }
 
+  @Test
+  public void testContainerStateRetrieval() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 4);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+    StartContainerRequest containerReq = createContainerRequest(containerId);
+
+    long containerStartTime = System.currentTimeMillis();
+    stateStore.storeContainer(containerId, 0, containerStartTime, containerReq);
+    // LAUNCHED -> PAUSED -> LAUNCHED: the most recent marker must win.
+    stateStore.storeContainerLaunched(containerId);
+    stateStore.storeContainerPaused(containerId);
+    stateStore.storeContainerLaunched(containerId);
+    restartStateStore();
+    List<RecoveredContainerState> recoveredContainers =
+        stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
+  }
+
   private StartContainerRequest storeMockContainer(ContainerId containerId)
       throws IOException {
     // create a container request