diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java index 7122578..4f591b1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java @@ -46,4 +46,9 @@ * Containers preempted by the framework. */ public static final int PREEMPTED = -102; + + /** + * AuxServices fail + */ + public static final int AUXSERVICE_FAILED = -103; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 13f4365..7cb3a50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -32,6 +32,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; @@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; public class AuxServices extends AbstractService implements ServiceStateChangeListener, EventHandler { @@ -171,27 +175,44 @@ public void handle(AuxServicesEvent event) { // TODO kill all containers waiting on Application return; } - service.initializeApplication(new ApplicationInitializationContext(event - .getUser(), event.getApplicationID(), event.getServiceData())); + try { + service + .initializeApplication(new ApplicationInitializationContext(event + .getUser(), event.getApplicationID(), event.getServiceData())); + } catch (Throwable th) { + handleAuxServiceFail(event, th); + } break; case APPLICATION_STOP: - for (AuxiliaryService serv : serviceMap.values()) { - serv.stopApplication(new ApplicationTerminationContext(event - .getApplicationID())); + try { + for (AuxiliaryService serv : serviceMap.values()) { + serv.stopApplication(new ApplicationTerminationContext(event + .getApplicationID())); + } + } catch (Throwable ex) { + //Do nothing. The application is already finished. } break; case CONTAINER_INIT: - for (AuxiliaryService serv : serviceMap.values()) { - serv.initializeContainer(new ContainerInitializationContext( - event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + try { + for (AuxiliaryService serv : serviceMap.values()) { + serv.initializeContainer(new ContainerInitializationContext( + event.getUser(), event.getContainer().getContainerId(), + event.getContainer().getResource())); + } + } catch (Throwable th) { + handleAuxServiceFail(event, th); } break; case CONTAINER_STOP: - for (AuxiliaryService serv : serviceMap.values()) { - serv.stopContainer(new ContainerTerminationContext( - event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + try { + for (AuxiliaryService serv : serviceMap.values()) { + serv.stopContainer(new ContainerTerminationContext( + event.getUser(), event.getContainer().getContainerId(), + event.getContainer().getResource())); + } + } catch (Throwable ex) { + //Do nothing. The container is already finished } break; default: @@ -199,4 +220,15 @@ public void handle(AuxServicesEvent event) { } } + private void handleAuxServiceFail(AuxServicesEvent event, Throwable th) { + Container container = event.getContainer(); + StringBuilder diagnostics = new StringBuilder(); + diagnostics.append("AuxService failed at Container " + + container.getContainerId() + " : " + th.getMessage()); + container.handle(new ContainerExitEvent( + container.getContainerId(), + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerExitStatus.AUXSERVICE_FAILED, + diagnostics.toString())); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c2d32b5..00b27ee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -142,6 +142,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.NEW, ContainerState.DONE, ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION) + .addTransition(ContainerState.NEW, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) // From LOCALIZING State .addTransition(ContainerState.LOCALIZING, @@ -157,6 +161,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillDuringLocalizationTransition()) + .addTransition(ContainerState.LOCALIZING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -180,6 +188,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED, ContainerEventType.RESOURCE_FAILED) + .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) // From LOCALIZED State .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING, @@ -233,7 +244,12 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE, - ContainerEventType.KILL_CONTAINER) + EnumSet.of( + ContainerEventType.KILL_CONTAINER, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.RESOURCE_FAILED, + ContainerEventType.CONTAINER_LAUNCHED)) // From KILLING State. .addTransition(ContainerState.KILLING, @@ -295,6 +311,12 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, // that container) fails for some reason .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.RESOURCE_FAILED) + .addTransition(ContainerState.DONE, ContainerState.DONE, + EnumSet.of( + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)) // create the topology tables .installTopology(); @@ -511,12 +533,13 @@ public ContainerState transition(ContainerImpl container, if (csd != null) { // This can happen more than once per Application as each container may // have distinct service data + ApplicationId appId = + container.containerId.getApplicationAttemptId().getApplicationId(); for (Map.Entry service : csd.entrySet()) { container.dispatcher.getEventHandler().handle( new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT, - container.user, container.containerId - .getApplicationAttemptId().getApplicationId(), - service.getKey().toString(), service.getValue())); + container.user, appId, + service.getKey().toString(), service.getValue(), container)); } } @@ -687,6 +710,7 @@ public ExitedWithFailureTransition(boolean clCleanupRequired) { public void transition(ContainerImpl container, ContainerEvent event) { ContainerExitEvent exitEvent = (ContainerExitEvent) event; container.exitCode = exitEvent.getExitCode(); + container.diagnostics.append(exitEvent.getDiagnosticInfo()).append("\n"); // TODO: Add containerWorkDir to the deletion service. // TODO: Add containerOuputDir to the deletion service. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 14d445f..bc4879c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; @@ -88,6 +90,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; public class TestContainer { @@ -465,6 +468,52 @@ public boolean matches(Object o) { } @Test + public void testAuxServiceFail() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true) { + @Override + public EventHandler createAuxEventHandler() { + return new AuxEventHandler(); + } + + class AuxEventHandler implements EventHandler { + + @Override + public void handle(AuxServicesEvent event) { + switch (event.getType()) { + case APPLICATION_INIT: + Container container = event.getContainer(); + StringBuilder diagnostics = new StringBuilder(); + diagnostics.append("AuxService failed at Container " + + container.getContainerId()); + container.handle(new ContainerExitEvent( + container.getContainerId(), + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerExitStatus.AUXSERVICE_FAILED, + diagnostics.toString())); + break; + case APPLICATION_STOP: + case CONTAINER_INIT: + case CONTAINER_STOP: + break; + default: + throw new RuntimeException("Unknown type: " + event.getType()); + } + } + } + }; + assertEquals(ContainerState.NEW, wc.c.getContainerState()); + wc.initContainer(); + assertEquals(ContainerState.EXITED_WITH_FAILURE, wc.c.getContainerState()); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + + @Test public void testLaunchAfterKillRequest() throws Exception { WrappedContainer wc = null; try { @@ -637,7 +686,7 @@ public boolean matches(Object o) { localizerBus = mock(EventHandler.class); launcherBus = mock(EventHandler.class); monitorBus = mock(EventHandler.class); - auxBus = mock(EventHandler.class); + auxBus = createAuxEventHandler(); appBus = mock(EventHandler.class); LogBus = mock(EventHandler.class); dispatcher.register(LocalizationEventType.class, localizerBus); @@ -840,5 +889,9 @@ public int getLocalResourceCount() { public String getDiagnostics() { return c.cloneAndGetContainerStatus().getDiagnostics(); } + + public EventHandler createAuxEventHandler() { + return mock(EventHandler.class); + } } }