diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 34709104264..45afa08fcde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -475,6 +475,11 @@ private void recoverContainer(RecoveredContainerState rcs) "Due to invalid StateStore info container was killed" + " during recovery")); } + if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED + && rcs.getCapability() != null) { + metrics.launchedContainer(); + metrics.allocateContainer(rcs.getCapability()); + } } else { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { LOG.warn(containerId + " has no corresponding application!"); @@ -1141,7 +1146,7 @@ protected void startContainerInternal( } this.context.getNMStateStore().storeContainer(containerId, - containerTokenIdentifier.getVersion(), containerStartTime, request); + containerTokenIdentifier, containerStartTime, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 
6f643b04d5f..82440e91a85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -121,6 +121,8 @@ private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused"; + private static final String CONTAINER_START_TOKEN_SUFFIX = + "/startToken"; private static final String CONTAINER_UPDATE_TOKEN_SUFFIX = "/updateToken"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; @@ -237,7 +239,7 @@ boolean isHealthy() { iter.seek(bytes(CONTAINERS_KEY_PREFIX)); while (iter.hasNext()) { - Entry entry = iter.peekNext(); + Entry entry = iter.peekNext(); String key = asString(entry.getKey()); if (!key.startsWith(CONTAINERS_KEY_PREFIX)) { break; @@ -337,6 +339,10 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, rcs.startRequest.setContainerToken(updatedToken); rcs.capability = new ResourcePBImpl(tokenIdentifierProto.getResource()); rcs.version = tokenIdentifierProto.getVersion(); + } else if (suffix.equals(CONTAINER_START_TOKEN_SUFFIX)) { + ContainerTokenIdentifierProto tokenIdentifierProto = + ContainerTokenIdentifierProto.parseFrom(entry.getValue()); + rcs.capability = new ResourcePBImpl(tokenIdentifierProto.getResource()); } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { rcs.setRemainingRetryAttempts( Integer.parseInt(asString(entry.getValue()))); @@ -375,31 +381,41 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } @Override - public void storeContainer(ContainerId containerId, int containerVersion, + public void 
storeContainer(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier, long startTime, StartContainerRequest startRequest) throws IOException { String idStr = containerId.toString(); if (LOG.isDebugEnabled()) { LOG.debug("storeContainer: containerId= " + idStr + ", startRequest= " + startRequest); } - String keyRequest = getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); - String keyVersion = getContainerVersionKey(idStr); - String keyStartTime = + final String keyVersion = getContainerVersionKey(idStr); + final int containerVersion = containerTokenIdentifier.getVersion(); + + final String keyRequest = + getContainerKey(idStr, CONTAINER_REQUEST_KEY_SUFFIX); + final StartContainerRequestProto startContainerRequest = + ((StartContainerRequestPBImpl) startRequest).getProto(); + + final String keyStartTime = getContainerKey(idStr, CONTAINER_START_TIME_KEY_SUFFIX); + final String startTimeValue = Long.toString(startTime); + + final String keyStartToken = + getContainerKey(idStr, CONTAINER_START_TOKEN_SUFFIX); + final ContainerTokenIdentifierProto containerToken = + containerTokenIdentifier.getProto(); + try { - WriteBatch batch = db.createWriteBatch(); - try { - batch.put(bytes(keyRequest), - ((StartContainerRequestPBImpl) startRequest).getProto(). 
- toByteArray()); - batch.put(bytes(keyStartTime), bytes(Long.toString(startTime))); + try (WriteBatch batch = db.createWriteBatch()) { + batch.put(bytes(keyRequest), startContainerRequest.toByteArray()); + batch.put(bytes(keyStartTime), bytes(startTimeValue)); if (containerVersion != 0) { batch.put(bytes(keyVersion), - bytes(Integer.toString(containerVersion))); + bytes(Integer.toString(containerVersion))); } + batch.put(bytes(keyStartToken), containerToken.toByteArray()); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { markStoreUnHealthy(e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index f217f2f8605..eb4af4b3b5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -72,8 +72,9 @@ public void removeApplication(ApplicationId appId) throws IOException { } @Override - public void storeContainer(ContainerId containerId, int version, - long startTime, StartContainerRequest startRequest) throws IOException { + public void storeContainer(ContainerId containerId, + ContainerTokenIdentifier containerTokenIdentifier, + long startTime, StartContainerRequest startRequest) { } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 0ea0ef3b86c..b4604bfe7b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -410,13 +410,14 @@ public abstract void removeApplication(ApplicationId appId) /** * Record a container start request * @param containerId the container ID - * @param containerVersion the container Version + * @param containerTokenIdentifier the container token identifier * @param startTime container start time * @param startRequest the container start request * @throws IOException */ public abstract void storeContainer(ContainerId containerId, - int containerVersion, long startTime, StartContainerRequest startRequest) + ContainerTokenIdentifier containerTokenIdentifier, + long startTime, StartContainerRequest startRequest) throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 93d0afb1185..b9698756bc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -107,7 +107,7 @@ protected static File remoteLogDir; protected static File tmpDir; - protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); + protected NodeManagerMetrics metrics = NodeManagerMetrics.create(); public BaseContainerManagerTest() throws UnsupportedFileSystemException { localFS = FileContext.getLocalFSFileContext(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index bf8b500b87f..b685a48a7aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.ServerSocketUtil; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -104,6 +105,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import 
org.apache.hadoop.yarn.server.nodemanager.metrics.TestNodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -398,6 +400,61 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure() cm.stop(); } + @Test + public void testNodeManagerMetricsRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + metrics.addResource(Resource.newInstance(10240, 8)); + + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map containerEnv = Collections.emptyMap(); + Map serviceData = Collections.emptyMap(); + Map localResources = Collections.emptyMap(); + List commands = Arrays.asList("sleep 60s".split(" ")); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + null, null); + StartContainersResponse startResponse = startContainer(context, cm, cid, + clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); 
+ TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7); + + // restart and verify metrics could be recovered + cm.stop(); + DefaultMetricsSystem.shutdown(); + metrics = NodeManagerMetrics.create(); + metrics.addResource(Resource.newInstance(10240, 8)); + TestNodeManagerMetrics.checkMetrics(0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 8); + context = createContext(conf, stateStore); + cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + TestNodeManagerMetrics.checkMetrics(1, 0, 0, 0, 0, 1, 1, 1, 9, 1, 7); + cm.stop(); + } + @Test public void testContainerResizeRecovery() throws Exception { conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java index d21e7ad7706..c5f80ba958a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/metrics/TestNodeManagerMetrics.java @@ -113,8 +113,8 @@ public void testReferenceOfSingletonJvmMetrics() { assertGauge("AvailableVCores", 19, rb); } - private void checkMetrics(int launched, int completed, int failed, int killed, - int initing, int running, int allocatedGB, + public static void checkMetrics(int launched, int completed, int failed, + int killed, int initing, int running, int allocatedGB, int allocatedContainers, int availableGB, int 
allocatedVCores, int availableVCores) { MetricsRecordBuilder rb = getMetrics("NodeManagerMetrics"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index b67d11fceb3..0b459be47e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -132,11 +132,12 @@ public synchronized void removeApplication(ApplicationId appId) @Override public synchronized void storeContainer(ContainerId containerId, - int version, long startTime, StartContainerRequest startRequest) - throws IOException { + ContainerTokenIdentifier containerTokenIdentifier, + long startTime, StartContainerRequest startRequest) { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; - rcs.version = version; + rcs.version = containerTokenIdentifier.getVersion(); + rcs.capability = containerTokenIdentifier.getResource(); rcs.setStartTime(startTime); containerStates.put(containerId, rcs); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 265b3e68833..115c20d8706 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -242,7 +242,12 @@ public void testContainerStorage() throws IOException { // store a container and verify recovered long containerStartTime = System.currentTimeMillis(); - stateStore.storeContainer(containerId, 0, containerStartTime, containerReq); + + Resource containerResource = Resource.newInstance(1024, 2); + ContainerTokenIdentifier containerTokenIdentifier = + createContainerTokenIdentifier(containerId, containerResource); + stateStore.storeContainer(containerId, containerTokenIdentifier, + containerStartTime, containerReq); // verify the container version key is not stored for new containers DB db = stateStore.getDB(); @@ -260,6 +265,7 @@ public void testContainerStorage() throws IOException { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); // store a new container record without StartContainerRequest ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); @@ -279,6 +285,7 @@ public void testContainerStorage() throws IOException { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); @@ -294,6 +301,7 @@ public void testContainerStorage() throws IOException { assertEquals(false, rcs.getKilled()); assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), 
rcs.getDiagnostics()); + assertEquals(containerResource, rcs.getCapability()); // pause the container, and verify recovered stateStore.storeContainerPaused(containerId); @@ -379,6 +387,12 @@ public void testContainerStorage() throws IOException { assertTrue(recoveredContainers.isEmpty()); } + private ContainerTokenIdentifier createContainerTokenIdentifier( + ContainerId containerId, Resource res) { + return new ContainerTokenIdentifier(containerId, "host", "user", res, + 9876543210L, 42, 2468, Priority.newInstance(7), 13579); + } + private void validateRetryAttempts(ContainerId containerId) throws IOException { // store finishTimeForRetryAttempts @@ -1268,7 +1282,11 @@ private StartContainerRequest storeMockContainer(ContainerId containerId) "tokenservice"); StartContainerRequest containerReq = StartContainerRequest.newInstance(clc, containerToken); - stateStore.storeContainer(containerId, 0, 0, containerReq); + ContainerTokenIdentifier containerTokenIdentifier = + createContainerTokenIdentifier(containerId, + Resource.newInstance(1024, 2)); + stateStore.storeContainer(containerId, containerTokenIdentifier, 0, + containerReq); return containerReq; }