diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index dabff2c..f988c49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -116,6 +116,8 @@ import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_FALSE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes @@ -946,13 +948,53 @@ public boolean hasAtLeastOnePlacementConstraint() { return hasAtLeastOnePlacementConstraint; } + public boolean terminateServiceIfNeeded(Component component) { + boolean serviceIsTerminated = + terminateServiceIfServiceStateComponentsFinished(component) || + terminateServiceIfAllComponentsFinished(); + return serviceIsTerminated; + } + + /** + * If the service state component is finished, the service will be terminated. 
+ * @param component the component to check + */ + public boolean terminateServiceIfServiceStateComponentsFinished(Component + component) { + boolean shouldTerminate = false; + boolean componentIsDominant = component.getComponentSpec() + .getConfiguration().getPropertyBool( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false); + if (componentIsDominant) { + ComponentRestartPolicy restartPolicy = + component.getRestartPolicyHandler(); + if (restartPolicy.shouldTerminate(component)) { + shouldTerminate = true; + boolean isSucceeded = restartPolicy.hasCompletedSuccessfully(component); + org.apache.hadoop.yarn.service.api.records.ComponentState state + = isSucceeded ? + org.apache.hadoop.yarn.service.api.records.ComponentState.SUCCEEDED + : org.apache.hadoop.yarn.service.api.records.ComponentState.FAILED; + LOG.info("{} Component state changed from {} to {}", + component.getName(), component.getComponentSpec().getState(), + state); + component.getComponentSpec().setState(state); + LOG.info("Dominate component {} finished, exiting Service Master... " + + ", final status=" + (isSucceeded ? "Succeeded" : "Failed"), + component.getName()); + terminateService(isSucceeded); + } + } + return shouldTerminate; + } + /* -* Check if all components of the scheduler finished. -* If all components finished -* (which #failed-instances + #suceeded-instances = #total-n-containers) -* The service will be terminated. -*/ - public void terminateServiceIfAllComponentsFinished() { + * Check if all components of the scheduler finished. + * If all components finished + * (which #failed-instances + #succeeded-instances = #total-n-containers) + * The service will be terminated. + */ + public boolean terminateServiceIfAllComponentsFinished() { boolean shouldTerminate = true; // Succeeded comps and failed comps, for logging purposes. 
@@ -964,19 +1006,19 @@ public void terminateServiceIfAllComponentsFinished() { if (restartPolicy.shouldTerminate(comp)) { if (restartPolicy.hasCompletedSuccessfully(comp)) { - comp.getComponentSpec().setState(org.apache.hadoop - .yarn.service.api.records.ComponentState.SUCCEEDED); LOG.info("{} Component state changed from {} to {}", comp.getName(), comp.getComponentSpec().getState(), org.apache.hadoop .yarn.service.api.records.ComponentState.SUCCEEDED); - } else { comp.getComponentSpec().setState(org.apache.hadoop - .yarn.service.api.records.ComponentState.FAILED); + .yarn.service.api.records.ComponentState.SUCCEEDED); + } else { LOG.info("{} Component state changed from {} to {}", comp.getName(), comp.getComponentSpec().getState(), org.apache.hadoop .yarn.service.api.records.ComponentState.FAILED); + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); } if (isTimelineServiceEnabled()) { @@ -1008,18 +1050,23 @@ public void terminateServiceIfAllComponentsFinished() { LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils .join(failedComponents, ",") + "]"); - int exitStatus = EXIT_SUCCESS; - if (failedComponents.isEmpty()) { - setGracefulStop(FinalApplicationStatus.SUCCEEDED); - app.setState(ServiceState.SUCCEEDED); - } else { - setGracefulStop(FinalApplicationStatus.FAILED); - app.setState(ServiceState.FAILED); - exitStatus = EXIT_FALSE; - } + terminateService(failedComponents.isEmpty()); + } + return shouldTerminate; + } - getTerminationHandler().terminate(exitStatus); + private void terminateService(boolean isSucceeded) { + int exitStatus = EXIT_SUCCESS; + if (isSucceeded) { + setGracefulStop(FinalApplicationStatus.SUCCEEDED); + app.setState(ServiceState.SUCCEEDED); + } else { + setGracefulStop(FinalApplicationStatus.FAILED); + app.setState(ServiceState.FAILED); + exitStatus = EXIT_FALSE; } + + getTerminationHandler().terminate(exitStatus); } public Clock getSystemClock() { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index ef844a5..6c0c022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -364,7 +364,7 @@ static void handleComponentInstanceRelaunch(ComponentInstance compInstance, LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); - comp.getScheduler().terminateServiceIfAllComponentsFinished(); + comp.getScheduler().terminateServiceIfNeeded(comp); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java index d081606..05135fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java @@ -95,4 +95,6 @@ String PRINCIPAL = 
"yarn.service.am.principal"; String UPGRADE_DIR = "upgrade"; + String CONTAINER_STATE_REPORT_AS_SERVICE_STATE = + "yarn.service.container-state-report-as-service-state"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index b4859af..84301f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -74,6 +74,9 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -126,6 +129,23 @@ public static Service createTerminatingJobExample(String serviceName) { return exampleApp; } + public static Service createTerminatingServiceStateComponentJobExample( + String serviceName) { + Service exampleApp = new Service(); + exampleApp.setName(serviceName); + exampleApp.setVersion("v1"); + Component serviceStateComponent = createComponent("terminating-comp1", 2, + "sleep 1000", Component.RestartPolicyEnum.NEVER, null); + serviceStateComponent.getConfiguration().setProperty( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true"); + 
exampleApp.addComponent(serviceStateComponent); + exampleApp.addComponent( + createComponent("terminating-comp2", 2, "sleep 60000", + Component.RestartPolicyEnum.ON_FAILURE, null)); + + return exampleApp; + } + public static Component createComponent(String name) { return createComponent(name, 2L, "sleep 1000", Component.RestartPolicyEnum.ALWAYS, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 1961ff4..e89299e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -41,6 +41,10 @@ import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; + /** * Tests for {@link Component}. 
*/ @@ -440,6 +444,61 @@ public void testComponentStateUpdatesWithTerminatingComponents() serviceState); } + @Test + public void testComponentStateUpdatesWithTerminatingServiceStateComponents() + throws + Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingServiceStateComponents"; + + Service testService = + ServiceTestUtils.createTerminatingServiceStateComponentJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = new MockRunningServiceContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + boolean componentIsDominant = comp.getComponentSpec() + .getConfiguration().getPropertyBool( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false); + if (componentIsDominant) { + Iterator<ComponentInstance> instanceIter = comp. + getAllComponentInstances().iterator(); + + while (instanceIter.hasNext()) { + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus) + .setContainerId(instanceContainer.getId())); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer(). + getId(), ComponentInstanceEventType.STOP). 
+ setStatus(containerStatus)); + } + ComponentState componentState = + comp.getComponentSpec().getState(); + Assert.assertEquals( + ComponentState.SUCCEEDED, + componentState); + } + } + + ServiceState serviceState = + testService.getState(); + Assert.assertEquals( + ServiceState.SUCCEEDED, + serviceState); + } + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index f857353..c3b1602 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; @@ -348,6 +349,8 @@ private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( org.apache.hadoop.yarn.service.api.records.Component.class); 
when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + Configuration conf = new Configuration(); + when(componentSpec.getConfiguration()).thenReturn(conf); when(comp.getRestartPolicyHandler()).thenReturn( Component.getRestartPolicyHandler(restartPolicy)); when(componentSpec.getNumberOfContainers()).thenReturn( @@ -401,6 +404,8 @@ private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( org.apache.hadoop.yarn.service.api.records.Component.class); when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + Configuration conf = new Configuration(); + when(componentSpec.getConfiguration()).thenReturn(conf); when(comp.getRestartPolicyHandler()).thenReturn( Component.getRestartPolicyHandler(restartPolicy)); when(componentSpec.getNumberOfContainers()).thenReturn( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index 06d33b2..842f4ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -65,6 +65,9 @@ import java.util.zip.ZipOutputStream; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; import 
static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -492,6 +495,8 @@ private void addWorkerComponent(Service service, if (taskType.equals(TaskType.PRIMARY_WORKER)) { workerComponent.setNumberOfContainers(1L); + workerComponent.getConfiguration().setProperty( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true"); } else{ workerComponent.setNumberOfContainers( (long) parameters.getNumWorkers() - 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java index 89905e5..a93baa0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java @@ -454,10 +454,15 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) { } TimelineCollector timelineCollector = rmTimelineCollectorManager.get(appId); - TimelineEntities entities = new TimelineEntities(); - entities.addEntity(entity); - timelineCollector.putEntities(entities, - UserGroupInformation.getCurrentUser()); + + //timelineCollector can be null if containerFinished is called later than + //RMAppImpl.stopTimelineCollector + if(timelineCollector != null) { + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + timelineCollector.putEntities(entities, + UserGroupInformation.getCurrentUser()); + } } catch (IOException e) { LOG.error("Error when publishing entity " + entity); if (LOG.isDebugEnabled()) {