diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index bddc7c3..81a0f18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -450,6 +450,7 @@ protected void serviceInit(Configuration conf) throws Exception { // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + nmStore.setNodeStatusUpdater(nodeStatusUpdater); super.serviceInit(conf); // TODO add local dirs to del diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 3455874..4db218c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.records.Version; @@ -155,6 +156,8 @@ private DB db; private boolean isNewlyCreated; + @VisibleForTesting + boolean isHealthy; private Timer compactionTimer; /** @@ -169,6 +172,8 @@ public NMLeveldbStateStoreService() { @Override protected void startStorage() throws IOException { + // Assume that we're healthy when we start + isHealthy = true; } @Override @@ -187,6 +192,31 @@ public boolean isNewlyCreated() { return isNewlyCreated; } + /** + * If the state store throws an error after recovery has been performed + * then we can not trust it any more to reflect the NM state. We need to + * mark the store and node unhealthy. + * Errors during the recovery will cause a service failure and thus a NM + * start failure. Do not need to mark the store unhealthy for those. + * @param dbErr Exception + */ + private void markStoreUnHealthy(DBException dbErr) { + // Always log the error here, we might not see the error in the caller + LOG.error("Statestore exception: ", dbErr); + // We have already been marked unhealthy so no need to do it again. + if (!isHealthy) { + return; + } + // Mark unhealthy, an out of band heartbeat will be sent and the state + // will remain unhealthy (not recoverable). + // No need to close the store: does not make any difference at this point. + isHealthy = false; + // We could get here before the nodeStatusUpdater is set + NodeStatusUpdater nsu = getNodeStatusUpdater(); + if (nsu != null) { + nsu.reportException(dbErr); + } + } @Override public List loadContainersState() @@ -354,6 +384,7 @@ public void storeContainer(ContainerId containerId, int containerVersion, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -378,6 +409,7 @@ public void storeContainerQueued(ContainerId containerId) throws IOException { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -393,6 +425,7 @@ private void removeContainerQueued(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -408,6 +441,7 @@ public void storeContainerPaused(ContainerId containerId) throws IOException { try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -424,6 +458,7 @@ public void removeContainerPaused(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -441,6 +476,7 @@ public void storeContainerDiagnostics(ContainerId containerId, try { db.put(bytes(key), bytes(diagnostics.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -459,6 +495,7 @@ public void storeContainerLaunched(ContainerId containerId) try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -488,6 +525,7 @@ public void storeContainerUpdateToken(ContainerId containerId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -504,6 +542,7 @@ public void storeContainerKilled(ContainerId containerId) try { db.put(bytes(key), EMPTY_VALUE); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -520,6 +559,7 @@ public void storeContainerCompleted(ContainerId containerId, try { db.put(bytes(key), bytes(Integer.toString(exitCode))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -532,6 +572,7 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, try { db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -544,6 +585,7 @@ public void storeContainerWorkDir(ContainerId containerId, try { db.put(bytes(key), bytes(workDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -556,6 +598,7 @@ public void storeContainerLogDir(ContainerId containerId, try { db.put(bytes(key), bytes(logDir)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -589,6 +632,7 @@ public void removeContainer(ContainerId containerId) batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -638,6 +682,7 @@ public void storeApplication(ApplicationId appId, try { db.put(bytes(key), p.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -659,6 +704,7 @@ public void removeApplication(ApplicationId appId) batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -815,6 +861,7 @@ public void startResourceLocalization(String user, ApplicationId appId, try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -838,6 +885,7 @@ public void finishResourceLocalization(String user, ApplicationId appId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -861,6 +909,7 @@ public void removeLocalizedResource(String user, ApplicationId appId, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -926,6 +975,7 @@ public void storeDeletionTask(int taskId, try { db.put(bytes(key), taskProto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -936,6 +986,7 @@ public void removeDeletionTask(int taskId) throws IOException { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1009,6 +1060,7 @@ public void removeNMTokenApplicationMasterKey( try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1023,6 +1075,7 @@ private void storeMasterKey(String dbKey, MasterKey key) try { db.put(bytes(dbKey), pb.getProto().toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1096,6 +1149,7 @@ public void storeContainerToken(ContainerId containerId, Long expTime) try { db.put(bytes(key), bytes(expTime.toString())); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1107,6 +1161,7 @@ public void removeContainerToken(ContainerId containerId) try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1157,6 +1212,7 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1167,6 +1223,7 @@ public void removeLogDeleter(ApplicationId appId) throws IOException { try { db.delete(bytes(key)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1198,6 +1255,7 @@ public void storeAssignedResources(Container container, batch.close(); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } @@ -1361,6 +1419,7 @@ public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { try { db.delete(bytes(dbkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } return; @@ -1375,6 +1434,7 @@ public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, try { db.put(bytes(fullkey), data); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1386,6 +1446,7 @@ public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, try { db.delete(bytes(fullkey)); } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1409,6 +1470,7 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) candidates.add(key); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } finally { if (iter != null) { @@ -1422,6 +1484,7 @@ public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) db.delete(bytes(key)); } } catch (DBException e) { + markStoreUnHealthy(e); throw new IOException(e); } } @@ -1555,6 +1618,11 @@ DB getDB() { return db; } + @VisibleForTesting + void setDB(DB testDb) { + this.db = testDb; + } + /** * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. * 2) Any incompatible change of state-store is a major upgrade, and any diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 598ea9e..f9b86bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; @@ -51,10 +52,20 @@ @Unstable public abstract class NMStateStoreService extends AbstractService { + private NodeStatusUpdater nodeStatusUpdater = null; + public NMStateStoreService(String name) { super(name); } + protected NodeStatusUpdater getNodeStatusUpdater() { + return nodeStatusUpdater; + } + + public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } + public static class RecoveredApplicationsState { List applications; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 3cac5b4..3b7f164 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -89,10 +90,12 @@ import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestNMLeveldbStateStoreService { private static final File TMP_DIR = new File( @@ -1165,6 +1168,39 @@ public void testStateStoreForResourceMapping() throws IOException { resourceMappings.getAssignedResources("numa").equals(numaRes)); } + @Test + public void testStateStoreNodeHealth() throws IOException { + // keep the working DB clean, break a temp DB + DB keepDB = stateStore.getDB(); + DB myMocked = mock(DB.class); + stateStore.setDB(myMocked); + + ApplicationId appId = ApplicationId.newInstance(1234, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + DBException toThrow = new DBException(); + Mockito.doThrow(toThrow).when(myMocked). + put(any(byte[].class), any(byte[].class)); + // write some data + try { + // chosen a simple method could be any of the "void" methods + ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1); + stateStore.storeContainerKilled(containerId); + } catch (IOException ioErr) { + // Cause should be wrapped DBException + if (ioErr.getCause() instanceof DBException) { + // check the store is marked unhealthy + assertFalse("Statestore should have been unhealthy", + stateStore.isHealthy); + return; + } + } finally { + // restore the working DB + stateStore.setDB(keepDB); + } + Assert.fail("Expected exception not thrown"); + } + private StartContainerRequest storeMockContainer(ContainerId containerId) throws IOException { // create a container request