diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java index 7122578..3be66c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerExitStatus.java @@ -46,4 +46,9 @@ * Containers preempted by the framework. */ public static final int PREEMPTED = -102; + + /** + * AuxServices fail + */ + public static final int AUXSERVICE_ERROR = -103; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java index 0e0e766..8a89fbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java @@ -33,13 +33,18 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceStateChangeListener; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import com.google.common.base.Preconditions; @@ -174,40 +179,46 @@ public void stateChanged(Service service) { public void handle(AuxServicesEvent event) { LOG.info("Got event " + event.getType() + " for appId " + event.getApplicationID()); - switch (event.getType()) { - case APPLICATION_INIT: - LOG.info("Got APPLICATION_INIT for service " + event.getServiceID()); - AuxiliaryService service = serviceMap.get(event.getServiceID()); - if (null == service) { - LOG.info("service is null"); - // TODO kill all containers waiting on Application - return; - } - service.initializeApplication(new ApplicationInitializationContext(event - .getUser(), event.getApplicationID(), event.getServiceData())); - break; - case APPLICATION_STOP: - for (AuxiliaryService serv : serviceMap.values()) { - serv.stopApplication(new ApplicationTerminationContext(event - .getApplicationID())); - } - break; - case CONTAINER_INIT: - for (AuxiliaryService serv : serviceMap.values()) { - serv.initializeContainer(new ContainerInitializationContext( - event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); - } - break; - case CONTAINER_STOP: - for (AuxiliaryService serv : serviceMap.values()) { - serv.stopContainer(new ContainerTerminationContext( - event.getUser(), event.getContainer().getContainerId(), - event.getContainer().getResource())); + try { + switch (event.getType()) { + case APPLICATION_INIT: + LOG.info("Got APPLICATION_INIT for service " + event.getServiceID()); + AuxiliaryService service = serviceMap.get(event.getServiceID()); + if (null == service) { + LOG.info("service is null"); + throw new YarnException("The service is null." + + "kill all containers waiting for the service: " + + event.getServiceID()); + } + service + .initializeApplication(new ApplicationInitializationContext(event + .getUser(), event.getApplicationID(), event.getServiceData())); + break; + case APPLICATION_STOP: + for (AuxiliaryService serv : serviceMap.values()) { + serv.stopApplication(new ApplicationTerminationContext(event + .getApplicationID())); + } + break; + case CONTAINER_INIT: + for (AuxiliaryService serv : serviceMap.values()) { + serv.initializeContainer(new ContainerInitializationContext( + event.getUser(), event.getContainer().getContainerId(), + event.getContainer().getResource())); + } + break; + case CONTAINER_STOP: + for (AuxiliaryService serv : serviceMap.values()) { + serv.stopContainer(new ContainerTerminationContext( + event.getUser(), event.getContainer().getContainerId(), + event.getContainer().getResource())); + } + break; + default: + throw new RuntimeException("Unknown type: " + event.getType()); } - break; - default: - throw new RuntimeException("Unknown type: " + event.getType()); + } catch (Throwable th) { + handleAuxServiceFail(event, th); } } @@ -217,4 +228,22 @@ private boolean validateAuxServiceName(String name) { } return p.matcher(name).matches(); } + + private void handleAuxServiceFail(AuxServicesEvent event, Throwable th) { + if (event.getType() instanceof AuxServicesEventType) { + Container container = event.getContainer(); + if (container != null) { + StringBuilder diagnostics = new StringBuilder(); + diagnostics.append("AuxService error at Container " + + container.getContainerId() + " : " + th.getMessage()); + container.handle(new ContainerExitEvent( + container.getContainerId(), + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerExitStatus.AUXSERVICE_ERROR, + diagnostics.toString())); + } + } else { + throw new RuntimeException(th.getMessage()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c2d32b5..759c1e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; -import java.io.IOException; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -157,6 +156,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, ContainerEventType.KILL_CONTAINER, new KillDuringLocalizationTransition()) + .addTransition(ContainerState.LOCALIZING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(false)) // From LOCALIZATION_FAILED State .addTransition(ContainerState.LOCALIZATION_FAILED, @@ -180,6 +183,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED, ContainerEventType.RESOURCE_FAILED) + .addTransition(ContainerState.LOCALIZATION_FAILED, ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(false)) // From LOCALIZED State .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING, @@ -222,6 +228,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS, ContainerEventType.KILL_CONTAINER) + .addTransition(ContainerState.EXITED_WITH_SUCCESS, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) // From EXITED_WITH_FAILURE State .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, @@ -232,8 +242,15 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) .addTransition(ContainerState.EXITED_WITH_FAILURE, - ContainerState.EXITED_WITH_FAILURE, - ContainerEventType.KILL_CONTAINER) + ContainerState.EXITED_WITH_FAILURE, + EnumSet.of( + ContainerEventType.KILL_CONTAINER, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.RESOURCE_FAILED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST)) // From KILLING State. .addTransition(ContainerState.KILLING, @@ -281,6 +298,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, ContainerEventType.KILL_CONTAINER) + .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(false)) // From DONE .addTransition(ContainerState.DONE, ContainerState.DONE, @@ -295,6 +316,14 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, // that container) fails for some reason .addTransition(ContainerState.DONE, ContainerState.DONE, ContainerEventType.RESOURCE_FAILED) + .addTransition(ContainerState.DONE, ContainerState.DONE, + EnumSet.of( + ContainerEventType.RESOURCE_LOCALIZED, + ContainerEventType.CONTAINER_LAUNCHED, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST)) // create the topology tables .installTopology(); @@ -511,12 +540,13 @@ public ContainerState transition(ContainerImpl container, if (csd != null) { // This can happen more than once per Application as each container may // have distinct service data + ApplicationId appId = + container.containerId.getApplicationAttemptId().getApplicationId(); for (Map.Entry service : csd.entrySet()) { container.dispatcher.getEventHandler().handle( new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT, - container.user, container.containerId - .getApplicationAttemptId().getApplicationId(), - service.getKey().toString(), service.getValue())); + container.user, appId, + service.getKey().toString(), service.getValue(), container)); } } @@ -687,6 +717,7 @@ public ExitedWithFailureTransition(boolean clCleanupRequired) { public void transition(ContainerImpl container, ContainerEvent event) { ContainerExitEvent exitEvent = (ContainerExitEvent) event; container.exitCode = exitEvent.getExitCode(); + container.diagnostics.append(exitEvent.getDiagnosticInfo()).append("\n"); // TODO: Add containerWorkDir to the deletion service. // TODO: Add containerOuputDir to the deletion service. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java index a58f3bd..e545d48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java @@ -39,6 +39,7 @@ import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -50,9 +51,22 @@ import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container .Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container .ContainerImpl; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; public class TestAuxServices { private static final Log LOG = LogFactory.getLog(TestAuxServices.class); @@ -139,6 +153,100 @@ public ServiceB() { } } + static class ExceptionService extends LightService { + + public ExceptionService() { + super("ExceptionService", 'E', 67, ByteBuffer.wrap("ExceptionService" + .getBytes())); + } + + @Override + public void initializeApplication(ApplicationInitializationContext context) { + throw new RuntimeException("Throw Exception when initiate Application"); + } + + @Override + public void stopApplication(ApplicationTerminationContext context) { + throw new RuntimeException("Throw Exception when stop Application"); + } + + @Override + public void initializeContainer( + ContainerInitializationContext initContainerContext) { + throw new RuntimeException("Throw Exception when initiate Container"); + } + + @Override + public void stopContainer( + ContainerTerminationContext stopContainerContext) { + throw new RuntimeException("Throw Exception when stop container"); + } + } + + @Test + public void testAuxEventDispatchThrowsException() { + Configuration conf = new Configuration(); + conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] { "ExceptionSrv"}); + conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "ExceptionSrv"), + ExceptionService.class, Service.class); + final AuxServices aux = new AuxServices(); + aux.init(conf); + aux.start(); + + String serviceMetaName = "ExceptionSrv"; + ByteBuffer serviceMeta = aux.getMetaData().get(serviceMetaName); + + Container container = mock(ContainerImpl.class); + when(container.getUser()).thenReturn("test"); + ApplicationId appId = ApplicationId.newInstance(0, 67); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 0); + ContainerId containerId = ContainerId.newInstance(appAttemptId, 0); + when(container.getContainerId()).thenReturn(containerId); + doNothing().when(container).handle(any(ContainerEvent.class)); + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + Object arg = invocation.getArguments()[0]; + Assert.assertEquals(((ContainerExitEvent) arg).getExitCode(), + ContainerExitStatus.AUXSERVICE_ERROR); + return null; + }}).when(container).handle(any(ContainerEvent.class)); + + AuxServicesEvent event = + new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT, + container.getUser(), appId, + serviceMetaName, serviceMeta, container); + aux.handle(event); + verify(container).handle(any(ContainerExitEvent.class)); + + event = + new AuxServicesEvent(AuxServicesEventType.APPLICATION_STOP, + container.getUser(), appId, + serviceMetaName, serviceMeta, null); + aux.handle(event); + verify(container, times(1)).handle(any(ContainerExitEvent.class)); + + event = + new AuxServicesEvent(AuxServicesEventType.CONTAINER_INIT, + container.getUser(), appId, + serviceMetaName, serviceMeta, container); + aux.handle(event); + verify(container, times(2)).handle(any(ContainerExitEvent.class)); + + event = + new AuxServicesEvent(AuxServicesEventType.CONTAINER_STOP, + container.getUser(), appId, + serviceMetaName, serviceMeta, container); + aux.handle(event); + verify(container, times(3)).handle(any(ContainerExitEvent.class)); + + event = + new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT, + container.getUser(), appId, + "fakeServiceName", null, container); + aux.handle(event); + verify(container, times(4)).handle(any(ContainerExitEvent.class)); + } + @Test public void testAuxEventDispatch() { Configuration conf = new Configuration(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index ebc400a..efca62f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -495,6 +496,22 @@ public boolean matches(Object o) { } @Test + public void testAuxServiceFail() throws Exception { + WrappedContainer wc = null; + try { + wc = new WrappedContainer(9, 314159265358979L, 4344, "yak", false, true); + assertEquals(ContainerState.NEW, wc.c.getContainerState()); + wc.initContainer(); + wc.containerFailed(ContainerExitStatus.AUXSERVICE_ERROR); + assertEquals(ContainerState.EXITED_WITH_FAILURE, wc.c.getContainerState()); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + + @Test public void testLaunchAfterKillRequest() throws Exception { WrappedContainer wc = null; try {