diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index cfaf3567b61..e158d9cdf1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -59,10 +59,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.service.api.ServiceApiConstants; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ConfigFile; import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; +import org.apache.hadoop.yarn.service.component.ComponentState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -80,6 +82,8 @@ import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.util.BoundedAppender; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +141,8 @@ private ServiceTimelinePublisher serviceTimelinePublisher; + private boolean timelineServiceEnabled; + // Global diagnostics that will be reported to RM on eRxit. // The unit the number of characters. This will be limited to 64 * 1024 // characters. @@ -169,6 +175,9 @@ private volatile FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.ENDED; + private Clock systemClock; + + // For unit test override since we don't want to terminate UT process. private ServiceUtils.ProcessTerminationHandler terminationHandler = new ServiceUtils.ProcessTerminationHandler(); @@ -176,6 +185,8 @@ public ServiceScheduler(ServiceContext context) { super(context.getService().getName()); this.context = context; + this.app = context.getService(); + this.systemClock = SystemClock.getInstance(); } public void buildInstance(ServiceContext context, Configuration configuration) @@ -253,6 +264,11 @@ public void buildInstance(ServiceContext context, Configuration configuration) YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, app.getConfiguration(), getConfig()); + + if (YarnConfiguration + .timelineServiceV2Enabled(getConfig())) { + timelineServiceEnabled = true; + } } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -301,17 +317,12 @@ public void serviceStop() throws Exception { // (e.g. via client RPC, not through the AM receiving a SIGTERM) if (gracefulStop) { if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - // mark component-instances/containers as STOPPED - for (ContainerId containerId : getLiveInstances().keySet()) { - serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_AFTER_APP_COMPLETION, diagnostics.toString()); - } // mark attempt as unregistered serviceTimelinePublisher - .serviceAttemptUnregistered(context, diagnostics.toString()); + .serviceAttemptUnregistered(context, finalApplicationStatus, diagnostics.toString()); } // unregister AM - amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED, + amRMClient.unregisterApplicationMaster(finalApplicationStatus, diagnostics.toString(), ""); LOG.info("Service {} unregistered with RM, with attemptId = {} " + ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics); @@ -910,7 +921,22 @@ public synchronized void terminateServiceIfAllComponentsFinished() { for (Component comp : getAllComponents().values()) { ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); - if (!restartPolicy.shouldTerminate(comp)) { + + if (restartPolicy.shouldTerminate(comp)) { + if (restartPolicy.hasCompletedSuccessfully(comp)) { + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.SUCCEEDED); + } else { + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); + } + + if (timelineServiceEnabled) { + // record in ATS + serviceTimelinePublisher.componentFinished(comp.getComponentSpec(), + comp.getComponentSpec().getState(), systemClock.getTime()); + } + } else { shouldTerminate = false; break; } @@ -934,16 +960,25 @@ public synchronized void terminateServiceIfAllComponentsFinished() { LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils .join(failedComponents, ",") + "]"); + int exitStatus = EXIT_SUCCESS; if (failedComponents.isEmpty()) { setGracefulStop(FinalApplicationStatus.SUCCEEDED); - getTerminationHandler().terminate(EXIT_SUCCESS); + app.setState(ServiceState.SUCCEEDED); } else{ setGracefulStop(FinalApplicationStatus.FAILED); - getTerminationHandler().terminate(EXIT_FALSE); + app.setState(ServiceState.FAILED); + exitStatus = EXIT_FALSE; } + + getTerminationHandler().terminate(exitStatus); } } + + public boolean isTimelineServiceEnabled() { + return timelineServiceEnabled; + } + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { return terminationHandler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java index 3e7ed11a257..472f3749f70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java @@ -26,5 +26,5 @@ @InterfaceStability.Unstable @ApiModel(description = "The current state of a component.") public enum ComponentState { - FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING; + FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, FAILED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index 6e390737e70..cac527a5482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -26,5 +26,6 @@ @InterfaceAudience.Public @InterfaceStability.Unstable public enum ContainerState { - RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; + RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, + FAILED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index b6ae38bdeee..f2599c51e5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -30,5 +30,6 @@ @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, + SUCCEEDED, UPGRADING_AUTO_FINALIZE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index aaa23da4851..f0998953fcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -610,6 +610,7 @@ private void assignContainerToCompInstance(Container container) { "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}", getName(), container.getId(), pendingInstances.size()); instance.setContainer(container); + scheduler.addLiveCompInstance(container.getId(), instance); LOG.info( "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 3499d92a005..86b4b4f50be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -242,15 +242,16 @@ public void transition(ComponentInstance compInstance, } @VisibleForTesting - static void handleComponentInstanceRelaunch( - ComponentInstance compInstance, ComponentInstanceEvent event, - boolean failureBeforeLaunch) { + static void handleComponentInstanceRelaunch(ComponentInstance compInstance, + ComponentInstanceEvent event, boolean failureBeforeLaunch, + String containerDiag) { Component comp = compInstance.getComponent(); // Do we need to relaunch the service? boolean hasContainerFailed = hasContainerFailed(event.getStatus()); ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); + ContainerState containerState = ContainerState.STOPPED; if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) { // re-ask the failed container. @@ -264,7 +265,22 @@ static void handleComponentInstanceRelaunch( event.getStatus().getExitStatus(), failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics()); + + compInstance.getContainerSpec().setState(containerState); + + if (compInstance.timelineServiceEnabled) { + // record in ATS + LOG.info("Publising comp instance state to ATS {} {}", + compInstance.getCompInstanceName(), containerState); + compInstance.serviceTimelinePublisher + .componentInstanceFinished(event.getContainerId(), + event.getStatus().getExitStatus(), containerState, containerDiag); + } } else { + + // record in ATS + containerState = hasContainerFailed ? + ContainerState.FAILED : ContainerState.SUCCEEDED; // When no relaunch, update component's #succeeded/#failed // instances. if (hasContainerFailed) { @@ -272,11 +288,25 @@ static void handleComponentInstanceRelaunch( } else { comp.markAsSucceeded(compInstance); } + + compInstance.getContainerSpec().setState(containerState); + + if (compInstance.timelineServiceEnabled) { + LOG.info("Publising comp instance state to ATS {} {}", + compInstance.getCompInstanceName(), containerState); + // record in ATS + compInstance.serviceTimelinePublisher + .componentInstanceFinished(event.getContainerId(), + event.getStatus().getExitStatus(), containerState, containerDiag); + } + LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); comp.getScheduler().terminateServiceIfAllComponentsFinished(); } + + } public static boolean hasContainerFailed(ContainerStatus containerStatus) { @@ -345,14 +375,6 @@ public void transition(ComponentInstance compInstance, // so no need remove. compInstance.scheduler.executorService .submit(() -> compInstance.cleanupRegistry(event.getContainerId())); - - if (compInstance.timelineServiceEnabled) { - // record in ATS - compInstance.serviceTimelinePublisher - .componentInstanceFinished(event.getContainerId(), - event.getStatus().getExitStatus(), containerDiag); - } - compInstance.containerSpec.setState(ContainerState.STOPPED); } // remove the failed ContainerId -> CompInstance mapping @@ -361,7 +383,7 @@ public void transition(ComponentInstance compInstance, // According to component restart policy, handle container restart // or finish the service (if all components finished) handleComponentInstanceRelaunch(compInstance, event, - failedBeforeLaunching); + failedBeforeLaunching, containerDiag); if (shouldFailService) { scheduler.getTerminationHandler().terminate(-1); @@ -575,7 +597,7 @@ public void destroy() { if (timelineServiceEnabled) { serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_BY_APPMASTER, diagnostics.toString()); + KILLED_BY_APPMASTER, ContainerState.STOPPED, diagnostics.toString()); } cancelContainerStatusRetriever(); scheduler.executorService.submit(() -> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java index 6c3428a748d..832dad729ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java @@ -32,5 +32,8 @@ COMPONENT_INSTANCE_IP_HOST_UPDATE, - COMPONENT_INSTANCE_BECOME_READY + COMPONENT_INSTANCE_BECOME_READY, + + COMPONENT_FINISHED + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java index 6c73ebb8d67..1e900af9f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java @@ -28,6 +28,9 @@ import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.api.records.*; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.component.*; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.slf4j.Logger; @@ -130,12 +133,11 @@ public void serviceAttemptUpdated(Service service) { } public void serviceAttemptUnregistered(ServiceContext context, - String diagnostics) { + FinalApplicationStatus status, String diagnostics) { TimelineEntity entity = createServiceAttemptEntity( context.attemptId.getApplicationId().toString()); Map entityInfos = new HashMap(); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, - FinalApplicationStatus.ENDED); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, status); entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entity.addInfo(entityInfos); @@ -180,7 +182,7 @@ public void componentInstanceStarted(Container container, } public void componentInstanceFinished(ContainerId containerId, - int exitCode, String diagnostics) { + int exitCode, ContainerState state, String diagnostics) { TimelineEntity entity = createComponentInstanceEntity( containerId.toString()); @@ -189,7 +191,7 @@ public void componentInstanceFinished(ContainerId containerId, entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, exitCode); entityInfos.put(DIAGNOSTICS_INFO, diagnostics); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); entity.addInfo(entityInfos); // add an event @@ -375,4 +377,25 @@ private void putEntity(TimelineEntity entity) { log.error("Error when publishing entity " + entity, e); } } + + public void componentFinished( + Component comp, + ComponentState state, long finishTime) { + createComponentEntity(comp.getName()); + TimelineEntity entity = createComponentEntity(comp.getName()); + + // create info keys + Map entityInfos = new HashMap(); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent startEvent = new TimelineEvent(); + startEvent + .setId(ServiceTimelineEvent.COMPONENT_FINISHED.toString()); + startEvent.setTimestamp(finishTime); + entity.addEvent(startEvent); + + putEntity(entity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java index 89888c5cf97..091a0c789e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.mockito.stubbing.Answer; import java.io.IOException; @@ -92,7 +94,18 @@ public NMClientAsync createNMClient() { public ContainerLaunchService getContainerLaunchService() { return mockLaunchService; } + + @Override public ServiceUtils.ProcessTerminationHandler + getTerminationHandler() { + return new + ServiceUtils.ProcessTerminationHandler() { + public void terminate(int exitCode) { + } + }; + } }; + + this.scheduler.init(fsWatcher.getConf()); ServiceTestUtils.createServiceManager(this); @@ -116,8 +129,10 @@ private void stabilizeComponents(ServiceContext context) { Component component = new org.apache.hadoop.yarn.service.component. Component(componentSpec, 1L, context); componentState.put(component.getName(), component); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.FLEX)); + component.handle( + new ComponentEvent(component.getName(), ComponentEventType.FLEX) + .setDesired( + component.getComponentSpec().getNumberOfContainers())); for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { counter++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 170c20b84b9..00eb4dfd5a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -119,14 +119,10 @@ public static Service createTerminatingJobExample(String serviceName) { Component.RestartPolicyEnum.NEVER, null)); exampleApp.addComponent( createComponent("terminating-comp2", 2, "sleep 1000", - Component.RestartPolicyEnum.ON_FAILURE, new ArrayList() {{ - add("terminating-comp1"); - }})); + Component.RestartPolicyEnum.ON_FAILURE, null)); exampleApp.addComponent( createComponent("terminating-comp3", 2, "sleep 1000", - Component.RestartPolicyEnum.ON_FAILURE, new ArrayList() {{ - add("terminating-comp2"); - }})); + Component.RestartPolicyEnum.ON_FAILURE, null)); return exampleApp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 2e17c7f4a1f..e1a4d9d7553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -147,7 +148,8 @@ public void testContainerCompletedWhenUpgrading() throws Exception { } @Test - public void testComponentStateUpdatesWithTerminatingComponents() throws + public void testComponentStateReachesStableStateWithTerminatingComponents() + throws Exception { final String serviceName = "testComponentStateUpdatesWithTerminatingComponents"; @@ -198,6 +200,57 @@ public void testComponentStateUpdatesWithTerminatingComponents() throws } } + @Test + public void testComponentStateUpdatesWithTerminatingComponents() + throws + Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingComponents"; + + Service testService = ServiceTestUtils.createTerminatingJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = new MockRunningServiceContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + while (instanceIter.hasNext()) { + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus) + .setContainerId(instanceContainer.getId())); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer().getId(), + ComponentInstanceEventType.STOP).setStatus(containerStatus)); + } + + ComponentState componentState = + comp.getComponentSpec().getState(); + Assert.assertEquals( + ComponentState.SUCCEEDED, + componentState); + } + + ServiceState serviceState = + testService.getState(); + Assert.assertEquals( + ServiceState.SUCCEEDED, + serviceState); + } + + + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index bb480ba80bd..5c748b37788 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -204,6 +204,8 @@ private ComponentInstance createComponentInstance(Component component, when(componentInstance.getComponent()).thenReturn(component); when(componentInstance.getCompInstanceName()).thenReturn( "compInstance" + instanceId); + Container container = mock(Container.class); + when(componentInstance.getContainerSpec()).thenReturn(container); ServiceUtils.ProcessTerminationHandler terminationHandler = mock( ServiceUtils.ProcessTerminationHandler.class); @@ -244,8 +246,9 @@ public void testComponentRestartPolicy() { ComponentInstance componentInstance = comp.getAllComponentInstances().iterator().next(); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); @@ -262,7 +265,7 @@ public void testComponentRestartPolicy() { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -286,7 +289,7 @@ public void testComponentRestartPolicy() { when(comp.getNumSucceededInstances()).thenReturn(new Long(1)); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -304,7 +307,7 @@ public void testComponentRestartPolicy() { when(comp.getNumFailedInstances()).thenReturn(new Long(1)); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); @@ -323,7 +326,7 @@ public void testComponentRestartPolicy() { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -340,7 +343,7 @@ public void testComponentRestartPolicy() { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -363,7 +366,7 @@ public void testComponentRestartPolicy() { containerStatus.setExitStatus(1); ComponentInstance commponentInstance = iter.next(); ComponentInstance.handleComponentInstanceRelaunch(commponentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); @@ -404,7 +407,7 @@ public void testComponentRestartPolicy() { when(component2Instance.getComponent().getNumFailedInstances()) .thenReturn(new Long(failed2Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } Map failed1Instances = new HashMap<>(); @@ -418,7 +421,7 @@ public void testComponentRestartPolicy() { when(component1Instance.getComponent().getNumFailedInstances()) .thenReturn(new Long(failed1Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); @@ -458,7 +461,7 @@ public void testComponentRestartPolicy() { when(component2Instance.getComponent().getNumSucceededInstances()) .thenReturn(new Long(succeeded2Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } Map succeeded1Instances = new HashMap<>(); @@ -471,7 +474,7 @@ public void testComponentRestartPolicy() { when(component1Instance.getComponent().getNumSucceededInstances()) .thenReturn(new Long(succeeded1Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); @@ -500,7 +503,7 @@ public void testComponentRestartPolicy() { for (ComponentInstance component2Instance : component2Instances) { ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } succeeded1Instances = new HashMap<>(); @@ -511,7 +514,7 @@ public void testComponentRestartPolicy() { when(component1Instance.getComponent().getSucceededInstances()) .thenReturn(succeeded1Instances.values()); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java index cff7229db34..a77e6c8d317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -122,7 +123,8 @@ public void testServiceAttemptEntity() { context.attemptId = ApplicationAttemptId .newInstance(ApplicationId.fromString(service.getId()), 1); String exitDiags = "service killed"; - serviceTimelinePublisher.serviceAttemptUnregistered(context, exitDiags); + serviceTimelinePublisher.serviceAttemptUnregistered(context, + FinalApplicationStatus.ENDED, exitDiags); lastPublishedEntities = ((DummyTimelineClient) timelineClient).getLastPublishedEntities(); for (TimelineEntity timelineEntity : lastPublishedEntities) {