diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 0801ad052db..062101f5a22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.service.api.ServiceApiConstants; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ConfigFile; @@ -80,6 +81,8 @@ import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.util.BoundedAppender; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +105,8 @@ import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.registry.client.api.RegistryConstants.*; -import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION; +import static org.apache.hadoop.yarn.api.records.ContainerExitStatus + .KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes @@ -137,6 +141,8 @@ private ServiceTimelinePublisher serviceTimelinePublisher; + private boolean timelineServiceEnabled; + // Global diagnostics that will be reported to RM on eRxit. // The unit the number of characters. This will be limited to 64 * 1024 // characters. @@ -169,6 +175,8 @@ private volatile FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.ENDED; + private Clock systemClock; + // For unit test override since we don't want to terminate UT process. private ServiceUtils.ProcessTerminationHandler terminationHandler = new ServiceUtils.ProcessTerminationHandler(); @@ -176,6 +184,8 @@ public ServiceScheduler(ServiceContext context) { super(context.getService().getName()); this.context = context; + this.app = context.getService(); + this.systemClock = SystemClock.getInstance(); } public void buildInstance(ServiceContext context, Configuration configuration) @@ -253,6 +263,11 @@ public void buildInstance(ServiceContext context, Configuration configuration) YarnServiceConf.CONTAINER_RECOVERY_TIMEOUT_MS, YarnServiceConf.DEFAULT_CONTAINER_RECOVERY_TIMEOUT_MS, app.getConfiguration(), getConfig()); + + if (YarnConfiguration + .timelineServiceV2Enabled(getConfig())) { + timelineServiceEnabled = true; + } } protected YarnRegistryViewForProviders createYarnRegistryOperations( @@ -300,18 +315,32 @@ public void serviceStop() throws Exception { // only stop the entire service when a graceful stop has been initiated // (e.g. via client RPC, not through the AM receiving a SIGTERM) if (gracefulStop) { + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - // mark component-instances/containers as STOPPED - for (ContainerId containerId : getLiveInstances().keySet()) { - serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_AFTER_APP_COMPLETION, diagnostics.toString()); + + // mark other component-instances/containers as STOPPED + final Map liveInstances = + getLiveInstances(); + for (Map.Entry instance : liveInstances + .entrySet()) { + if (!ComponentInstance.isFinalState(instance.getValue() + .getContainerSpec() + .getState())) { + serviceTimelinePublisher + .componentInstanceFinished(instance.getKey(), + KILLED_AFTER_APP_COMPLETION, ContainerState.STOPPED, + getDiagnostics().toString()); + } } + // mark attempt as unregistered serviceTimelinePublisher - .serviceAttemptUnregistered(context, diagnostics.toString()); + .serviceAttemptUnregistered(context, finalApplicationStatus, diagnostics.toString()); } + + // unregister AM - amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED, + amRMClient.unregisterApplicationMaster(finalApplicationStatus, diagnostics.toString(), ""); LOG.info("Service {} unregistered with RM, with attemptId = {} " + ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics); @@ -911,7 +940,22 @@ public synchronized void terminateServiceIfAllComponentsFinished() { for (Component comp : getAllComponents().values()) { ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); - if (!restartPolicy.shouldTerminate(comp)) { + + if (restartPolicy.shouldTerminate(comp)) { + if (restartPolicy.hasCompletedSuccessfully(comp)) { + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.SUCCEEDED); + } else { + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); + } + + if (timelineServiceEnabled) { + // record in ATS + serviceTimelinePublisher.componentFinished(comp.getComponentSpec(), + comp.getComponentSpec().getState(), systemClock.getTime()); + } + } else { shouldTerminate = false; break; } @@ -935,16 +979,24 @@ public synchronized void terminateServiceIfAllComponentsFinished() { LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils .join(failedComponents, ",") + "]"); + int exitStatus = EXIT_SUCCESS; if (failedComponents.isEmpty()) { setGracefulStop(FinalApplicationStatus.SUCCEEDED); - getTerminationHandler().terminate(EXIT_SUCCESS); + app.setState(ServiceState.SUCCEEDED); } else{ setGracefulStop(FinalApplicationStatus.FAILED); - getTerminationHandler().terminate(EXIT_FALSE); + app.setState(ServiceState.FAILED); + exitStatus = EXIT_FALSE; } + + getTerminationHandler().terminate(exitStatus); } } + public Clock getSystemClock() { + return systemClock; + } + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { return terminationHandler; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java index 3e7ed11a257..472f3749f70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java @@ -26,5 +26,5 @@ @InterfaceStability.Unstable @ApiModel(description = "The current state of a component.") public enum ComponentState { - FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING; + FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, FAILED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index 6e390737e70..cac527a5482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -26,5 +26,6 @@ @InterfaceAudience.Public @InterfaceStability.Unstable public enum ContainerState { - RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING; + RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, + FAILED; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index b6ae38bdeee..f2599c51e5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -30,5 +30,6 @@ @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, + SUCCEEDED, UPGRADING_AUTO_FINALIZE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 41a2fcd104b..e750d188d32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -610,6 +610,7 @@ private void assignContainerToCompInstance(Container container) { "[COMPONENT {}]: {} allocated, num pending component instances reduced to {}", getName(), container.getId(), pendingInstances.size()); instance.setContainer(container); + scheduler.addLiveCompInstance(container.getId(), instance); LOG.info( "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 11a6caa901d..7e70aa10559 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -41,7 +42,9 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; @@ -68,6 +71,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import static org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes.*; + +import static org.apache.hadoop.yarn.api.records.ContainerExitStatus + .KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_BY_APPMASTER; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState.*; @@ -242,15 +248,22 @@ public void transition(ComponentInstance compInstance, } @VisibleForTesting - static void handleComponentInstanceRelaunch( - ComponentInstance compInstance, ComponentInstanceEvent event, - boolean failureBeforeLaunch) { + static void handleComponentInstanceRelaunch(ComponentInstance compInstance, + ComponentInstanceEvent event, boolean failureBeforeLaunch, + String containerDiag) { Component comp = compInstance.getComponent(); // Do we need to relaunch the service? - boolean hasContainerFailed = hasContainerFailed(event.getStatus()); + boolean hasContainerFailed = failureBeforeLaunch || hasContainerFailed(event + .getStatus()); ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); + ContainerState containerState = hasContainerFailed ? + ContainerState.FAILED : ContainerState.SUCCEEDED; + + if (compInstance.getContainerSpec() != null) { + compInstance.getContainerSpec().setState(containerState); + } if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) { // re-ask the failed container. @@ -265,11 +278,20 @@ static void handleComponentInstanceRelaunch( builder.append(", diagnostics="); builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : event.getStatus().getDiagnostics()); - if (event.getStatus().getExitStatus() != 0) { + if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) { LOG.error(builder.toString()); - } else { + } else{ LOG.info(builder.toString()); } + + if (compInstance.timelineServiceEnabled) { + // record in ATS + compInstance.serviceTimelinePublisher + .componentInstanceFinished(event.getContainerId(), + event.getStatus().getExitStatus(), containerState, + containerDiag); + } + } else { // When no relaunch, update component's #succeeded/#failed // instances. @@ -278,17 +300,27 @@ static void handleComponentInstanceRelaunch( } else { comp.markAsSucceeded(compInstance); } + + if (compInstance.timelineServiceEnabled) { + // record in ATS + compInstance.serviceTimelinePublisher + .componentInstanceFinished(event.getContainerId(), + event.getStatus().getExitStatus(), containerState, containerDiag); + } + LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); comp.getScheduler().terminateServiceIfAllComponentsFinished(); } + + } public static boolean hasContainerFailed(ContainerStatus containerStatus) { //Mark conainer as failed if we cant get its exit status i.e null? - return containerStatus == null || containerStatus.getExitStatus() != - ContainerExitStatus.SUCCESS; + return containerStatus == null || containerStatus + .getExitStatus() != ContainerExitStatus.SUCCESS; } private static class ContainerStoppedTransition extends BaseTransition { @@ -340,6 +372,40 @@ public void transition(ComponentInstance compInstance, scheduler.getDiagnostics().append(containerDiag); scheduler.getDiagnostics().append(exitDiag); LOG.warn(exitDiag); + + compInstance.getContainerSpec().setState(ContainerState.FAILED); + comp.getComponentSpec().setState(ComponentState.FAILED); + comp.getScheduler().getApp().setState(ServiceState.FAILED); + + if (compInstance.timelineServiceEnabled) { + // record in ATS + compInstance.scheduler.getServiceTimelinePublisher() + .componentInstanceFinished(compInstance.getContainer().getId(), + failedBeforeLaunching ? -1 : event.getStatus() + .getExitStatus(), ContainerState.FAILED, containerDiag); + + // mark other component-instances/containers as STOPPED + for (ContainerId containerId : scheduler.getLiveInstances().keySet()) { + if (!compInstance.container.getId().equals(containerId) && + !isFinalState(compInstance.getContainerSpec().getState())) { + compInstance.getContainerSpec().setState(ContainerState.STOPPED); + compInstance.scheduler.getServiceTimelinePublisher() + .componentInstanceFinished(containerId, + KILLED_AFTER_APP_COMPLETION, ContainerState.STOPPED, + scheduler.getDiagnostics().toString()); + } + } + + compInstance.scheduler.getServiceTimelinePublisher() + .componentFinished(comp.getComponentSpec(), ComponentState.FAILED, + scheduler.getSystemClock().getTime()); + + compInstance.scheduler.getServiceTimelinePublisher() + .serviceAttemptUnregistered(comp.getContext(), + FinalApplicationStatus.FAILED, + scheduler.getDiagnostics().toString()); + } + shouldFailService = true; } @@ -351,30 +417,28 @@ public void transition(ComponentInstance compInstance, // so no need remove. compInstance.scheduler.executorService .submit(() -> compInstance.cleanupRegistry(event.getContainerId())); - - if (compInstance.timelineServiceEnabled) { - // record in ATS - compInstance.serviceTimelinePublisher - .componentInstanceFinished(event.getContainerId(), - event.getStatus().getExitStatus(), containerDiag); - } - compInstance.containerSpec.setState(ContainerState.STOPPED); } // remove the failed ContainerId -> CompInstance mapping scheduler.removeLiveCompInstance(event.getContainerId()); - // According to component restart policy, handle container restart - // or finish the service (if all components finished) - handleComponentInstanceRelaunch(compInstance, event, - failedBeforeLaunching); - if (shouldFailService) { scheduler.getTerminationHandler().terminate(-1); + } else { + // According to component restart policy, handle container restart + // or finish the service (if all components finished) + handleComponentInstanceRelaunch(compInstance, event, + failedBeforeLaunching, containerDiag); } } } + public static boolean isFinalState(ContainerState state) { + return ContainerState.FAILED.equals(state) || ContainerState.STOPPED + .equals + (state) || ContainerState.SUCCEEDED.equals(state); + } + private static class ContainerUpgradeTransition extends BaseTransition { @Override @@ -581,7 +645,7 @@ public void destroy() { if (timelineServiceEnabled) { serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_BY_APPMASTER, diagnostics.toString()); + KILLED_BY_APPMASTER, ContainerState.STOPPED, diagnostics.toString()); } cancelContainerStatusRetriever(); scheduler.executorService.submit(() -> diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java index 6c3428a748d..832dad729ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java @@ -32,5 +32,8 @@ COMPONENT_INSTANCE_IP_HOST_UPDATE, - COMPONENT_INSTANCE_BECOME_READY + COMPONENT_INSTANCE_BECOME_READY, + + COMPONENT_FINISHED + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java index 6c73ebb8d67..1e900af9f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java @@ -28,6 +28,9 @@ import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.api.records.*; +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.component.*; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.slf4j.Logger; @@ -130,12 +133,11 @@ public void serviceAttemptUpdated(Service service) { } public void serviceAttemptUnregistered(ServiceContext context, - String diagnostics) { + FinalApplicationStatus status, String diagnostics) { TimelineEntity entity = createServiceAttemptEntity( context.attemptId.getApplicationId().toString()); Map entityInfos = new HashMap(); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, - FinalApplicationStatus.ENDED); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, status); entityInfos.put(DIAGNOSTICS_INFO, diagnostics); entity.addInfo(entityInfos); @@ -180,7 +182,7 @@ public void componentInstanceStarted(Container container, } public void componentInstanceFinished(ContainerId containerId, - int exitCode, String diagnostics) { + int exitCode, ContainerState state, String diagnostics) { TimelineEntity entity = createComponentInstanceEntity( containerId.toString()); @@ -189,7 +191,7 @@ public void componentInstanceFinished(ContainerId containerId, entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE, exitCode); entityInfos.put(DIAGNOSTICS_INFO, diagnostics); - entityInfos.put(ServiceTimelineMetricsConstants.STATE, STOPPED); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); entity.addInfo(entityInfos); // add an event @@ -375,4 +377,25 @@ private void putEntity(TimelineEntity entity) { log.error("Error when publishing entity " + entity, e); } } + + public void componentFinished( + Component comp, + ComponentState state, long finishTime) { + createComponentEntity(comp.getName()); + TimelineEntity entity = createComponentEntity(comp.getName()); + + // create info keys + Map entityInfos = new HashMap(); + entityInfos.put(ServiceTimelineMetricsConstants.STATE, state); + entity.addInfo(entityInfos); + + // add an event + TimelineEvent startEvent = new TimelineEvent(); + startEvent + .setId(ServiceTimelineEvent.COMPONENT_FINISHED.toString()); + startEvent.setTimestamp(finishTime); + entity.addEvent(startEvent); + + putEntity(entity); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java index 89888c5cf97..091a0c789e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.mockito.stubbing.Answer; import java.io.IOException; @@ -92,7 +94,18 @@ public NMClientAsync createNMClient() { public ContainerLaunchService getContainerLaunchService() { return mockLaunchService; } + + @Override public ServiceUtils.ProcessTerminationHandler + getTerminationHandler() { + return new + ServiceUtils.ProcessTerminationHandler() { + public void terminate(int exitCode) { + } + }; + } }; + + this.scheduler.init(fsWatcher.getConf()); ServiceTestUtils.createServiceManager(this); @@ -116,8 +129,10 @@ private void stabilizeComponents(ServiceContext context) { Component component = new org.apache.hadoop.yarn.service.component. Component(componentSpec, 1L, context); componentState.put(component.getName(), component); - component.handle(new ComponentEvent(component.getName(), - ComponentEventType.FLEX)); + component.handle( + new ComponentEvent(component.getName(), ComponentEventType.FLEX) + .setDesired( + component.getComponentSpec().getNumberOfContainers())); for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { counter++; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 170c20b84b9..00eb4dfd5a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -119,14 +119,10 @@ public static Service createTerminatingJobExample(String serviceName) { Component.RestartPolicyEnum.NEVER, null)); exampleApp.addComponent( createComponent("terminating-comp2", 2, "sleep 1000", - Component.RestartPolicyEnum.ON_FAILURE, new ArrayList() {{ - add("terminating-comp1"); - }})); + Component.RestartPolicyEnum.ON_FAILURE, null)); exampleApp.addComponent( createComponent("terminating-comp3", 2, "sleep 1000", - Component.RestartPolicyEnum.ON_FAILURE, new ArrayList() {{ - add("terminating-comp2"); - }})); + Component.RestartPolicyEnum.ON_FAILURE, null)); return exampleApp; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 2e17c7f4a1f..e1a4d9d7553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -147,7 +148,8 @@ public void testContainerCompletedWhenUpgrading() throws Exception { } @Test - public void testComponentStateUpdatesWithTerminatingComponents() throws + public void testComponentStateReachesStableStateWithTerminatingComponents() + throws Exception { final String serviceName = "testComponentStateUpdatesWithTerminatingComponents"; @@ -198,6 +200,57 @@ public void testComponentStateUpdatesWithTerminatingComponents() throws } } + @Test + public void testComponentStateUpdatesWithTerminatingComponents() + throws + Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingComponents"; + + Service testService = ServiceTestUtils.createTerminatingJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = new MockRunningServiceContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + while (instanceIter.hasNext()) { + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus) + .setContainerId(instanceContainer.getId())); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer().getId(), + ComponentInstanceEventType.STOP).setStatus(containerStatus)); + } + + ComponentState componentState = + comp.getComponentSpec().getState(); + Assert.assertEquals( + ComponentState.SUCCEEDED, + componentState); + } + + ServiceState serviceState = + testService.getState(); + Assert.assertEquals( + ServiceState.SUCCEEDED, + serviceState); + } + + + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index f4289047f98..c3965daad98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -204,6 +204,8 @@ private ComponentInstance createComponentInstance(Component component, when(componentInstance.getComponent()).thenReturn(component); when(componentInstance.getCompInstanceName()).thenReturn( "compInstance" + instanceId); + Container container = mock(Container.class); + when(componentInstance.getContainerSpec()).thenReturn(container); ServiceUtils.ProcessTerminationHandler terminationHandler = mock( ServiceUtils.ProcessTerminationHandler.class); @@ -244,8 +246,9 @@ public void testComponentRestartPolicy() { ComponentInstance componentInstance = comp.getAllComponentInstances().iterator().next(); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); @@ -262,7 +265,7 @@ public void testComponentRestartPolicy() { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -286,7 +289,7 @@ public void testComponentRestartPolicy() { when(comp.getNumSucceededInstances()).thenReturn(new Long(1)); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -304,7 +307,7 @@ public void testComponentRestartPolicy() { when(comp.getNumFailedInstances()).thenReturn(new Long(1)); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); @@ -323,7 +326,7 @@ public void testComponentRestartPolicy() { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); verify(comp, times(1)).reInsertPendingInstance( @@ -340,7 +343,7 @@ public void testComponentRestartPolicy() { componentInstance = comp.getAllComponentInstances().iterator().next(); containerStatus.setExitStatus(1); ComponentInstance.handleComponentInstanceRelaunch(componentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); verify(comp, times(0)).reInsertPendingInstance( @@ -363,7 +366,7 @@ public void testComponentRestartPolicy() { containerStatus.setExitStatus(1); ComponentInstance commponentInstance = iter.next(); ComponentInstance.handleComponentInstanceRelaunch(commponentInstance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); verify(comp, never()).markAsFailed(any(ComponentInstance.class)); @@ -404,7 +407,7 @@ public void testComponentRestartPolicy() { when(component2Instance.getComponent().getNumFailedInstances()) .thenReturn(new Long(failed2Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } Map failed1Instances = new HashMap<>(); @@ -418,7 +421,7 @@ public void testComponentRestartPolicy() { when(component1Instance.getComponent().getNumFailedInstances()) .thenReturn(new Long(failed1Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); @@ -458,7 +461,7 @@ public void testComponentRestartPolicy() { when(component2Instance.getComponent().getNumSucceededInstances()) .thenReturn(new Long(succeeded2Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } Map succeeded1Instances = new HashMap<>(); @@ -471,7 +474,7 @@ public void testComponentRestartPolicy() { when(component1Instance.getComponent().getNumSucceededInstances()) .thenReturn(new Long(succeeded1Instances.size())); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); @@ -500,7 +503,7 @@ public void testComponentRestartPolicy() { for (ComponentInstance component2Instance : component2Instances) { ComponentInstance.handleComponentInstanceRelaunch(component2Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } succeeded1Instances = new HashMap<>(); @@ -511,7 +514,7 @@ public void testComponentRestartPolicy() { when(component1Instance.getComponent().getSucceededInstances()) .thenReturn(succeeded1Instances.values()); ComponentInstance.handleComponentInstanceRelaunch(component1Instance, - componentInstanceEvent, false); + componentInstanceEvent, false, null); } verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java index cff7229db34..a77e6c8d317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/timelineservice/TestServiceTimelinePublisher.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier; import org.apache.hadoop.yarn.client.api.TimelineV2Client; @@ -122,7 +123,8 @@ public void testServiceAttemptEntity() { context.attemptId = ApplicationAttemptId .newInstance(ApplicationId.fromString(service.getId()), 1); String exitDiags = "service killed"; - serviceTimelinePublisher.serviceAttemptUnregistered(context, exitDiags); + serviceTimelinePublisher.serviceAttemptUnregistered(context, + FinalApplicationStatus.ENDED, exitDiags); lastPublishedEntities = ((DummyTimelineClient) timelineClient).getLastPublishedEntities(); for (TimelineEntity timelineEntity : lastPublishedEntities) {