diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index 7b198a0..b5918a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -595,6 +595,14 @@ definitions: type: array items: $ref: '#/definitions/ConfigFile' + Configuration Properties: + description: Reserved component configuration properties used to control the lifecycle of component and service. + properties: + yarn.service.container-state-report-as-service-state: + type: string + format: boolean + description: A boolean flag indicates that if this component is finished, the service is also terminated. + default: false ConfigFile: description: A config file that needs to be created and made available as a volume in a service component container. properties: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index dabff2c..a5bd458 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -116,6 +116,8 @@ import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes .EXIT_FALSE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes @@ -946,13 +948,53 @@ public boolean hasAtLeastOnePlacementConstraint() { return hasAtLeastOnePlacementConstraint; } + public boolean terminateServiceIfNeeded(Component component) { + boolean serviceIsTerminated = + terminateServiceIfDomiantCompnentFinished(component) || + terminateServiceIfAllComponentsFinished(); + return serviceIsTerminated; + } + + /** + * If the service state component is finished, the service is also terminated. + * @param component + */ + private boolean terminateServiceIfDomiantCompnentFinished(Component + component) { + boolean shouldTerminate = false; + boolean componentIsDominant = component.getComponentSpec() + .getConfiguration().getPropertyBool( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false); + if (componentIsDominant) { + ComponentRestartPolicy restartPolicy = + component.getRestartPolicyHandler(); + if (restartPolicy.shouldTerminate(component)) { + shouldTerminate = true; + boolean isSucceeded = restartPolicy.hasCompletedSuccessfully(component); + org.apache.hadoop.yarn.service.api.records.ComponentState state + = isSucceeded ? + org.apache.hadoop.yarn.service.api.records.ComponentState.SUCCEEDED + : org.apache.hadoop.yarn.service.api.records.ComponentState.FAILED; + LOG.info("{} Component state changed from {} to {}", + component.getName(), component.getComponentSpec().getState(), + state); + component.getComponentSpec().setState(state); + LOG.info("Dominate component {} finished, exiting Service Master... " + + ", final status=" + (isSucceeded ? "Succeeded" : "Failed"), + component.getName()); + terminateService(isSucceeded); + } + } + return shouldTerminate; + } + /* -* Check if all components of the scheduler finished. -* If all components finished -* (which #failed-instances + #suceeded-instances = #total-n-containers) -* The service will be terminated. -*/ - public void terminateServiceIfAllComponentsFinished() { + * Check if all components of the scheduler finished. + * If all components finished + * (which #failed-instances + #suceeded-instances = #total-n-containers) + * The service will be terminated. + */ + private boolean terminateServiceIfAllComponentsFinished() { boolean shouldTerminate = true; // Succeeded comps and failed comps, for logging purposes. @@ -964,19 +1006,19 @@ public void terminateServiceIfAllComponentsFinished() { if (restartPolicy.shouldTerminate(comp)) { if (restartPolicy.hasCompletedSuccessfully(comp)) { - comp.getComponentSpec().setState(org.apache.hadoop - .yarn.service.api.records.ComponentState.SUCCEEDED); LOG.info("{} Component state changed from {} to {}", comp.getName(), comp.getComponentSpec().getState(), org.apache.hadoop .yarn.service.api.records.ComponentState.SUCCEEDED); - } else { comp.getComponentSpec().setState(org.apache.hadoop - .yarn.service.api.records.ComponentState.FAILED); + .yarn.service.api.records.ComponentState.SUCCEEDED); + } else { LOG.info("{} Component state changed from {} to {}", comp.getName(), comp.getComponentSpec().getState(), org.apache.hadoop .yarn.service.api.records.ComponentState.FAILED); + comp.getComponentSpec().setState(org.apache.hadoop + .yarn.service.api.records.ComponentState.FAILED); } if (isTimelineServiceEnabled()) { @@ -1008,18 +1050,23 @@ public void terminateServiceIfAllComponentsFinished() { LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils .join(failedComponents, ",") + "]"); - int exitStatus = EXIT_SUCCESS; - if (failedComponents.isEmpty()) { - setGracefulStop(FinalApplicationStatus.SUCCEEDED); - app.setState(ServiceState.SUCCEEDED); - } else { - setGracefulStop(FinalApplicationStatus.FAILED); - app.setState(ServiceState.FAILED); - exitStatus = EXIT_FALSE; - } + terminateService(failedComponents.isEmpty()); + } + return shouldTerminate; + } - getTerminationHandler().terminate(exitStatus); + private void terminateService(boolean isSucceeded) { + int exitStatus = EXIT_SUCCESS; + if (isSucceeded) { + setGracefulStop(FinalApplicationStatus.SUCCEEDED); + app.setState(ServiceState.SUCCEEDED); + } else { + setGracefulStop(FinalApplicationStatus.FAILED); + app.setState(ServiceState.FAILED); + exitStatus = EXIT_FALSE; } + + getTerminationHandler().terminate(exitStatus); } public Clock getSystemClock() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index dab1e46..274b9f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -374,7 +374,7 @@ static void handleComponentInstanceRelaunch(ComponentInstance compInstance, LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? " succeeded" : " failed") + " without retry, exitStatus=" + event.getStatus()); - comp.getScheduler().terminateServiceIfAllComponentsFinished(); + comp.getScheduler().terminateServiceIfNeeded(comp); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java index d081606..05135fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java @@ -95,4 +95,6 @@ String PRINCIPAL = "yarn.service.am.principal"; String UPGRADE_DIR = "upgrade"; + String CONTAINER_STATE_REPORT_AS_SERVICE_STATE = + "yarn.service.container-state-report-as-service-state"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index b4859af..6207d63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -74,6 +74,9 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_ENABLED; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.AM_RESOURCE_MEM; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -126,6 +129,23 @@ public static Service createTerminatingJobExample(String serviceName) { return exampleApp; } + public static Service createTerminatingDominantComponentJobExample( + String serviceName) { + Service exampleApp = new Service(); + exampleApp.setName(serviceName); + exampleApp.setVersion("v1"); + Component serviceStateComponent = createComponent("terminating-comp1", 2, + "sleep 1000", Component.RestartPolicyEnum.NEVER, null); + serviceStateComponent.getConfiguration().setProperty( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true"); + exampleApp.addComponent(serviceStateComponent); + exampleApp.addComponent( + createComponent("terminating-comp2", 2, "sleep 60000", + Component.RestartPolicyEnum.ON_FAILURE, null)); + + return exampleApp; + } + public static Component createComponent(String name) { return createComponent(name, 2L, "sleep 1000", Component.RestartPolicyEnum.ALWAYS, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 1961ff4..f8f948d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -41,6 +41,9 @@ import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.START; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; + /** * Tests for {@link Component}. */ @@ -440,6 +443,60 @@ public void testComponentStateUpdatesWithTerminatingComponents() serviceState); } + @Test + public void testComponentStateUpdatesWithTerminatingDominantComponents() + throws Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingServiceStateComponents"; + + Service testService = + ServiceTestUtils.createTerminatingDominantComponentJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = new MockRunningServiceContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + boolean componentIsDominant = comp.getComponentSpec() + .getConfiguration().getPropertyBool( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, false); + if (componentIsDominant) { + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + while (instanceIter.hasNext()) { + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus) + .setContainerId(instanceContainer.getId())); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer(). + getId(), ComponentInstanceEventType.STOP). + setStatus(containerStatus)); + } + ComponentState componentState = + comp.getComponentSpec().getState(); + Assert.assertEquals( + ComponentState.SUCCEEDED, + componentState); + } + } + + ServiceState serviceState = + testService.getState(); + Assert.assertEquals( + ServiceState.SUCCEEDED, + serviceState); + } + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index f857353..c3b1602 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; +import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; @@ -348,6 +349,8 @@ private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( org.apache.hadoop.yarn.service.api.records.Component.class); when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + Configuration conf = new Configuration(); + when(componentSpec.getConfiguration()).thenReturn(conf); when(comp.getRestartPolicyHandler()).thenReturn( Component.getRestartPolicyHandler(restartPolicy)); when(componentSpec.getNumberOfContainers()).thenReturn( @@ -401,6 +404,8 @@ private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( org.apache.hadoop.yarn.service.api.records.Component.class); when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + Configuration conf = new Configuration(); + when(componentSpec.getConfiguration()).thenReturn(conf); when(comp.getRestartPolicyHandler()).thenReturn( Component.getRestartPolicyHandler(restartPolicy)); when(componentSpec.getNumberOfContainers()).thenReturn( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java index 06d33b2..842f4ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-submarine/src/main/java/org/apache/hadoop/yarn/submarine/runtimes/yarnservice/YarnServiceJobSubmitter.java @@ -65,6 +65,9 @@ import java.util.zip.ZipOutputStream; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; + +import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants + .CONTAINER_STATE_REPORT_AS_SERVICE_STATE; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.EXIT_SUCCESS; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -492,6 +495,8 @@ private void addWorkerComponent(Service service, if (taskType.equals(TaskType.PRIMARY_WORKER)) { workerComponent.setNumberOfContainers(1L); + workerComponent.getConfiguration().setProperty( + CONTAINER_STATE_REPORT_AS_SERVICE_STATE, "true"); } else{ workerComponent.setNumberOfContainers( (long) parameters.getNumWorkers() - 1);