diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index bddc7c3..81a0f18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -450,6 +450,7 @@ protected void serviceInit(Configuration conf) throws Exception { // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + nmStore.setNodeStatusUpdater(nodeStatusUpdater); super.serviceInit(conf); // TODO add local dirs to del diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 3455874..2faeefb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -155,6 +155,8 @@ private DB db; private boolean isNewlyCreated; + @VisibleForTesting + boolean isHealthy; private Timer compactionTimer; /** @@ -169,6 +171,8 @@ public NMLeveldbStateStoreService() { @Override protected void startStorage() throws IOException { + // Assume that we're healthy when we start + isHealthy = true; } @Override @@ -187,6 +191,30 @@ public boolean isNewlyCreated() { return isNewlyCreated; } + /** + * If the state store throws an error after recovery has been performed + * then we can not trust it any more to reflect the NM state. We need to + * mark the store and node unhealthy. + * Errors during the recovery will cause a service failure and thus a NM + * start failure. Do not need to mark the store unhealthy for those. + * @param dbErr Exception + */ + private void markStoreUnHealthy(DBException dbErr) { + // Always log the error here, we might not see the error in the caller + LOG.error("Statestore exception: ", dbErr); + // We have already been marked unhealthy so no need to do it again. + if (!isHealthy) { + return; + } + // Mark unhealthy, an out of band heartbeat will be sent and the state + // will remain unhealthy (not recoverable). + // No need to close the store: does not make any difference at this point. + isHealthy = false; + // We could get here before the nodeStatusUpdater is set + if (nodeStatusUpdater != null) { + nodeStatusUpdater.reportException(dbErr); + } + } @Override public List loadContainersState() @@ -354,6 +382,7 @@ public void storeContainer(ContainerId containerId, int containerVersion, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -378,6 +407,7 @@ public void storeContainerQueued(ContainerId containerId) throws IOException { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -393,6 +423,7 @@ private void removeContainerQueued(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -408,6 +439,7 @@ public void storeContainerPaused(ContainerId containerId) throws IOException { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -424,6 +456,7 @@ public void removeContainerPaused(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -441,6 +474,7 @@ public void storeContainerDiagnostics(ContainerId containerId, try { db.put(bytes(key), bytes(diagnostics.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -459,6 +493,7 @@ public void storeContainerLaunched(ContainerId containerId) try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -488,6 +523,7 @@ public void storeContainerUpdateToken(ContainerId containerId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -504,6 +540,7 @@ public void storeContainerKilled(ContainerId containerId) try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -520,6 +557,7 @@ public void storeContainerCompleted(ContainerId containerId, try { db.put(bytes(key), bytes(Integer.toString(exitCode))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -532,6 +570,7 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, try { db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -544,6 +583,7 @@ public void storeContainerWorkDir(ContainerId containerId, try { db.put(bytes(key), bytes(workDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -556,6 +596,7 @@ public void storeContainerLogDir(ContainerId containerId, try { db.put(bytes(key), bytes(logDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -589,6 +630,7 @@ public void removeContainer(ContainerId containerId) batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -638,6 +680,7 @@ public void storeApplication(ApplicationId appId, try { db.put(bytes(key), p.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -659,6 +702,7 @@ public void removeApplication(ApplicationId appId) batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -815,6 +859,7 @@ public void startResourceLocalization(String user, ApplicationId appId, try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -838,6 +883,7 @@ public void finishResourceLocalization(String user, ApplicationId appId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -861,6 +907,7 @@ public void removeLocalizedResource(String user, ApplicationId appId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -926,6 +973,7 @@ public void storeDeletionTask(int taskId, try { db.put(bytes(key), taskProto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -936,6 +984,7 @@ public void removeDeletionTask(int taskId) throws IOException { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1009,6 +1058,7 @@ public void removeNMTokenApplicationMasterKey( try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1023,6 +1073,7 @@ private void storeMasterKey(String dbKey, MasterKey key) try { db.put(bytes(dbKey), pb.getProto().toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1096,6 +1147,7 @@ public void storeContainerToken(ContainerId containerId, Long expTime) try { db.put(bytes(key), bytes(expTime.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1107,6 +1159,7 @@ public void removeContainerToken(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1157,6 +1210,7 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1167,6 +1221,7 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1198,6 +1253,7 @@ public void storeAssignedResources(Container container, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } @@ -1361,6 +1417,7 @@ public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { try { db.delete(bytes(dbkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } return; @@ -1375,6 +1432,7 @@ public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, try { db.put(bytes(fullkey), data); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1386,6 +1444,7 @@ public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, try { db.delete(bytes(fullkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1409,6 +1468,7 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) candidates.add(key); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } finally { if (iter != null) { @@ -1422,6 +1482,7 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) db.delete(bytes(key)); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1555,6 +1616,11 @@ DB getDB() { return db; } + @VisibleForTesting + void setDB(DB testDb) { + this.db = testDb; + } + /** * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. * 2) Any incompatible change of state-store is a major upgrade, and any diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 598ea9e..e491ea1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -51,6 +52,8 @@ @Unstable public abstract class NMStateStoreService extends AbstractService { + protected NodeStatusUpdater nodeStatusUpdater; + public NMStateStoreService(String name) { super(name); } @@ -351,6 +354,10 @@ public boolean isNewlyCreated() { return false; } + public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } + /** * Load the state of applications. * @return recovered state for applications. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 3cac5b4..b9c4fa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestNMLeveldbStateStoreService { private static final File TMP_DIR = new File( @@ -1165,6 +1168,34 @@ public void testStateStoreForResourceMapping() throws IOException { resourceMappings.getAssignedResources("numa").equals(numaRes)); } + @Test + public void testStateStoreNodeHealth() throws IOException { + DB myMocked = mock(DB.class); + stateStore.setDB(myMocked); + + ApplicationId appId = ApplicationId.newInstance(1234, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + DBException toThrow = new DBException(); + Mockito.doThrow(toThrow).when(myMocked). + put(any(byte[].class), any(byte[].class)); + // write some data + try { + // chosen a simple method could be any of the "void" methods + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + stateStore.storeContainerKilled(containerId); + } catch (IOException ioErr) { + // Cause should be wrapped DBException + if (ioErr.getCause() instanceof DBException) { + // check the store is marked unhealthy + assertFalse("Statestore should have been unhealthy", + stateStore.isHealthy); + return; + } + } + Assert.fail("Expected exception not thrown"); + } + private StartContainerRequest storeMockContainer(ContainerId containerId) throws IOException { // create a container request