diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 956ea33..b819fbb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -73,4 +73,6 @@ boolean getDecommissioned(); void setDecommissioned(boolean isDecommissioned); + + public NodeStatusUpdater getNodeStatusUpdater(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a479be2..38e4e46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -246,7 +246,8 @@ protected void serviceInit(Configuration conf) throws Exception { // StatusUpdater should be added last so that it get started last // so that we make sure everything is up before registering with RM. addService(nodeStatusUpdater); - + ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); + super.serviceInit(conf); // TODO add local dirs to del } @@ -326,6 +327,7 @@ public void run() { .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); private final NMStateStoreService stateStore; private boolean isDecommissioned = false; + private NodeStatusUpdater nodeStatusUpdater; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, @@ -420,6 +422,15 @@ public boolean getDecommissioned() { public void setDecommissioned(boolean isDecommissioned) { this.isDecommissioned = isDecommissioned; } + + @Override + public NodeStatusUpdater getNodeStatusUpdater() { + return this.nodeStatusUpdater; + } + + public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { + this.nodeStatusUpdater = nodeStatusUpdater; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 12166e0..af7edec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -301,7 +301,7 @@ private void recoverContainer(RecoveredContainerState rcs) Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled()); + rcs.getDiagnostics(), rcs.getKilled(), context); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -806,7 +806,7 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, Container container = new ContainerImpl(getConfig(), this.dispatcher, context.getNMStateStore(), launchContext, - credentials, metrics, containerTokenIdentifier); + credentials, metrics, containerTokenIdentifier, context); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerId, container) != null) { @@ -950,10 +950,6 @@ private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, NMAuditLogger.logSuccess(container.getUser(), AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID .getApplicationAttemptId().getApplicationId(), containerID); - - // TODO: Move this code to appropriate place once kill_container is - // implemented. - nodeStatusUpdater.sendOutofBandHeartBeat(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index fa54ee1..c467df1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; @@ -110,11 +111,12 @@ RecoveredContainerStatus.REQUESTED; // whether container was marked as killed after recovery private boolean recoveredAsKilled = false; + private Context context; public ContainerImpl(Configuration conf, Dispatcher dispatcher, NMStateStoreService stateStore, ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, - ContainerTokenIdentifier containerTokenIdentifier) { + ContainerTokenIdentifier containerTokenIdentifier, Context context) { this.daemonConf = conf; this.dispatcher = dispatcher; this.stateStore = stateStore; @@ -129,6 +131,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); + this.context = context; stateMachine = stateMachineFactory.make(this); } @@ -139,9 +142,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled) { + String diagnostics, boolean wasKilled, Context context) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, - containerTokenIdentifier); + containerTokenIdentifier, context); this.recoveredStatus = recoveredStatus; this.exitCode = exitCode; this.recoveredAsKilled = wasKilled; @@ -933,13 +936,14 @@ public void transition(ContainerImpl container, ContainerEvent event) { @SuppressWarnings("unchecked") public void transition(ContainerImpl container, ContainerEvent event) { container.finished(); - //if the current state is NEW it means the CONTAINER_INIT was never + //if the current state is NEW it means the CONTAINER_INIT was never // sent for the event, thus no need to send the CONTAINER_STOP - if (container.getCurrentState() + if (container.getCurrentState() != org.apache.hadoop.yarn.api.records.ContainerState.NEW) { container.dispatcher.getEventHandler().handle(new AuxServicesEvent (AuxServicesEventType.CONTAINER_STOP, container)); } + container.context.getNodeStatusUpdater().sendOutofBandHeartBeat(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index fabb03b..c8cb137 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -131,6 +131,7 @@ public long getRMIdentifier() { nodeStatusUpdater.init(conf); ((NMContext)context).setContainerManager(containerManager); nodeStatusUpdater.start(); + ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); containerManager.init(conf); containerManager.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f2a3a4a..a4f795b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -228,7 +228,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); Container container = new ContainerImpl(conf, mockDispatcher, - stateStore, launchContext, null, mockMetrics, containerToken); + stateStore, launchContext, null, mockMetrics, containerToken, null); this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -258,7 +258,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) .getCanonicalHostName(), 1234, user, resource, currentTime + 10000, 123, "password".getBytes(), currentTime)); Container container = new ContainerImpl(conf, mockDispatcher, - stateStore, launchContext, null, mockMetrics, containerToken); + stateStore, launchContext, null, mockMetrics, containerToken, null); this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index 1907e1a..c63b4a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -182,6 +182,7 @@ public void setup() throws IOException { nodeStatusUpdater.init(conf); containerManager.init(conf); nodeStatusUpdater.start(); + ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater); } protected ContainerManagerImpl diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index 59cc947..7247199 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -192,7 +192,7 @@ public void testAuxEventDispatch() { ContainerId.newInstance(attemptId, 1), "", "", Resource.newInstance(1, 1), 0,0,0, Priority.newInstance(0), 0); Container container = new ContainerImpl(null, null, null, null, null, - null, cti); + null, cti, null); ContainerId containerId = container.getContainerId(); Resource resource = container.getResource(); event = new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT,container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index a813e98..8b4f6db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -247,6 +248,7 @@ public void testInitWhileDone() throws Exception { wc.containerSuccessful(); wc.containerResourcesCleanup(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); + verifyOutofBandHeartBeat(wc); assertNull(wc.c.getLocalizedResources()); // Now in DONE, issue INIT wc.initContainer(); @@ -276,6 +278,7 @@ public void testLocalizationFailureAtDone() throws Exception { wc.containerSuccessful(); wc.containerResourcesCleanup(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); + verifyOutofBandHeartBeat(wc); assertNull(wc.c.getLocalizedResources()); // Now in DONE, issue RESOURCE_FAILED as done by LocalizeRunner wc.resourceFailedContainer(); @@ -321,6 +324,7 @@ public void testKillOnNew() throws Exception { assertEquals(ContainerState.NEW, wc.c.getContainerState()); wc.killContainer(); assertEquals(ContainerState.DONE, wc.c.getContainerState()); + verifyOutofBandHeartBeat(wc); assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, wc.c.cloneAndGetContainerStatus().getExitStatus()); assertTrue(wc.c.cloneAndGetContainerStatus().getDiagnostics() @@ -569,6 +573,10 @@ private void verifyCleanupCall(WrappedContainer wc) throws Exception { verify(wc.localizerBus).handle(argThat(matchesReq)); } + private void verifyOutofBandHeartBeat(WrappedContainer wc) { + verify(wc.context.getNodeStatusUpdater()).sendOutofBandHeartBeat(); + } + private static class ResourcesReleasedMatcher extends ArgumentMatcher { final HashSet resources = @@ -695,6 +703,7 @@ public boolean matches(Object o) { final Container c; final Map localResources; final Map serviceData; + final Context context = mock(Context.class); WrappedContainer(int appId, long timestamp, int id, String user) throws IOException { @@ -720,11 +729,12 @@ public boolean matches(Object o) { dispatcher.register(ApplicationEventType.class, appBus); dispatcher.register(LogHandlerEventType.class, LogBus); - Context context = mock(Context.class); when(context.getApplications()).thenReturn( new ConcurrentHashMap()); NMNullStateStoreService stateStore = new NMNullStateStoreService(); when(context.getNMStateStore()).thenReturn(stateStore); + NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); + when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater); ContainerExecutor executor = mock(ContainerExecutor.class); launcher = new ContainersLauncher(context, dispatcher, executor, null, null); @@ -781,7 +791,7 @@ public boolean matches(Object o) { when(ctxt.getServiceData()).thenReturn(serviceData); c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), - ctxt, null, metrics, identifier); + ctxt, null, metrics, identifier, context); dispatcher.register(ContainerEventType.class, new EventHandler() { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index a7006e0..ad47c28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -211,7 +211,7 @@ public boolean isPmemCheckEnabled() { Container container = new ContainerImpl(conf, dispatcher, stateStore, launchContext, null, metrics, - BuilderUtils.newContainerTokenIdentifier(containerToken)) { + BuilderUtils.newContainerTokenIdentifier(containerToken), null) { @Override public ContainerState getContainerState() {