commit a9db36c1ba3f8691c23f5546746437116b6569af Author: Wangda Tan Date: Tue Mar 27 10:09:09 2018 -0700 [TO-file-YARN-JIRA] Support restart policy of containers Change-Id: I385050f6af606404195e115dc265ae4a651b582f diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index 17723bcff10..097711f2757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -356,6 +356,14 @@ definitions: items: type: string description: A list of quicklink keys defined at the service level, and to be resolved by this component. + restartPolicy: + type: string + description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + enum: + - ALWAYS + - ON_FAILURE + - NEVER + default: ALWAYS ReadinessCheck: description: A custom command or a pluggable helper container to determine the readiness of a container of a component. Readiness for every service is different. Hence the need for a simple interface, with scope to support advanced usecases. 
required: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java index 75cc9c59a20..f5e7aaca1c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.ctc.wstx.util.StringUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; import org.apache.hadoop.yarn.service.monitor.ServiceMonitor; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; @@ -59,8 +61,10 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index ce0e0cfde8c..84288236acb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.service.api.records; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.gson.annotations.SerializedName; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -29,7 +31,9 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlEnum; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -98,6 +102,52 @@ private List containers = Collections.synchronizedList(new ArrayList()); + @SerializedName("restartPolicy") + private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS; + + /** + * Restart policy of a component: ALWAYS, ON_FAILURE or NEVER. + **/ + @XmlType(name = "restart_policy") + @XmlEnum + public static enum RestartPolicyEnum { + ALWAYS("ALWAYS"), + + ON_FAILURE("ON_FAILURE"), + + NEVER("NEVER"); + private String value; + + RestartPolicyEnum(String value) { + this.value = value; + } + + @Override + @JsonValue + public String toString() { + return value; + } + } + + public Component restartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + return this; + } + + /** + * Policy of restart component.
Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + * @return restartPolicy + **/ + @ApiModelProperty(value = "Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)") + public RestartPolicyEnum getRestartPolicy() { + return restartPolicy; + } + + public void setRestartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + } + + /** * Name of the service component (mandatory). **/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 0cd7e2c2084..250817baf4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -96,6 +96,9 @@ // disk_failed containers etc. This will be reset to 0 periodically. 
public AtomicInteger currentContainerFailure = new AtomicInteger(0); + private AtomicInteger suceededInstances = new AtomicInteger(0); + private AtomicInteger failedInstances = new AtomicInteger(0); + private StateMachine stateMachine; private AsyncDispatcher dispatcher; @@ -661,4 +664,12 @@ public ServiceContext getContext() { public List getPendingInstances() { return pendingInstances; } + + public AtomicInteger getSuceededInstances() { + return suceededInstances; + } + + public AtomicInteger getFailedInstances() { + return failedInstances; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 0e3e11bc72e..1ee5b69c7bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.component.instance; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; @@ -26,6 +27,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.NodeId; @@ -54,6 +56,8 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -91,6 +95,23 @@ // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + static class ProcessTerminationHandler { + void terminate(int exitCode) { + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // in case there's a client polling the comp state, it can be notified. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.info("Interrupted on sleep while exiting.", e); + } + ExitUtil.terminate(exitCode); + } + } + + // For unit test override since we don't want to terminate UT process. + static ProcessTerminationHandler terminationHandler = + new ProcessTerminationHandler(); + private static final StateMachineFactory stateMachineFactory = @@ -201,6 +222,103 @@ public void transition(ComponentInstance compInstance, } } + /* + * Check whether all components of the scheduler are finalized. + * A component is finalized once + * (#failed-instances + #succeeded-instances >= #total-n-containers). + * If every component is finalized, the service will be terminated. + */ + private static void terminateServiceIfAllComponentsFinalized( + ServiceScheduler scheduler) { + boolean shouldTerminate = true; + + // Succeeded comps and failed comps, for logging purposes.
+ Set succeededComponents = new HashSet<>(); + Set failedComponents = new HashSet<>(); + + for (Component comp : scheduler.getAllComponents().values()) { + int nSucceeded = comp.getSuceededInstances().get(); + int nFailed = comp.getFailedInstances().get(); + if (nSucceeded + nFailed < comp.getComponentSpec() + .getNumberOfContainers()) { + shouldTerminate = false; + break; + } + if (nFailed > 0) { + failedComponents.add(comp.getName()); + } else { + succeededComponents.add(comp.getName()); + } + } + + if (shouldTerminate) { + LOG.info("All component finalized, exiting Service Master... " + + ", final status=" + (failedComponents.isEmpty() ? + "Succeeded" : + "Failed")); + LOG.info( + "Succeeded components: [" + org.apache.commons.lang3.StringUtils + .join(succeededComponents, ",") + + "]"); + LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils + .join(failedComponents, ",") + + "]"); + + if (!failedComponents.isEmpty()) { + terminationHandler.terminate(-1); + } else { + terminationHandler.terminate(0); + } + } + } + + @VisibleForTesting + static void handleComponentInstanceRelaunch( + ComponentInstance compInstance, ComponentInstanceEvent event) { + Component comp = compInstance.getComponent(); + + // Do we need to relaunch the service? + boolean shouldRelaunch = false; + boolean succeeded = false; + if (event.getStatus() != null + && event.getStatus().getExitStatus() == ContainerExitStatus.SUCCESS) { + succeeded = true; + } + + if (comp.getComponentSpec().getRestartPolicy() + == org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS) { + // Unconditional relaunch if restart_policy == always. + shouldRelaunch = true; + } else if (comp.getComponentSpec().getRestartPolicy() + == org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ON_FAILURE + && (!succeeded)) { + // Relaunch for non-success exit code if restart_policy == on_failure. 
+ shouldRelaunch = true; + } + + if (shouldRelaunch) { + comp.reInsertPendingInstance(compInstance); + LOG.info(compInstance.getCompInstanceId() + + ": {} completed. Reinsert back to pending list and requested " + + "a new container." + System.lineSeparator() + + " exitStatus={}, diagnostics={}.", + event.getContainerId(), event.getStatus().getExitStatus(), + event.getStatus().getDiagnostics()); + } else { + // When no relaunch, update component's #suceeded/#failed + // instances. + if (succeeded) { + comp.getSuceededInstances().incrementAndGet(); + } else { + comp.getFailedInstances().incrementAndGet(); + } + LOG.info(compInstance.getCompInstanceId() + (succeeded ? + " succeeded" : + " failed") + " without retry, exitStatus=" + event.getStatus()); + terminateServiceIfAllComponentsFinalized(comp.getScheduler()); + } + } + private static class ContainerStoppedTransition extends BaseTransition { // whether the container failed before launched by AM or not. boolean failedBeforeLaunching = false; @@ -228,7 +346,9 @@ public void transition(ComponentInstance compInstance, compInstance.component.decContainersReady(); } compInstance.component.decRunningContainers(); - boolean shouldExit = false; + + // Should we fail (terminate) the service? 
+ boolean shouldFailService = false; // check if it exceeds the failure threshold if (comp.currentContainerFailure.get() > comp.maxContainerFailurePerComp) { String exitDiag = MessageFormat.format( @@ -240,7 +360,7 @@ public void transition(ComponentInstance compInstance, comp.getScheduler().getDiagnostics().append(containerDiag); comp.getScheduler().getDiagnostics().append(exitDiag); LOG.warn(exitDiag); - shouldExit = true; + shouldFailService = true; } if (!failedBeforeLaunching) { @@ -264,24 +384,13 @@ public void transition(ComponentInstance compInstance, // remove the failed ContainerId -> CompInstance mapping comp.getScheduler().removeLiveCompInstance(event.getContainerId()); - comp.reInsertPendingInstance(compInstance); - - LOG.info(compInstance.getCompInstanceId() - + ": {} completed. Reinsert back to pending list and requested " + - "a new container." + System.lineSeparator() + - " exitStatus={}, diagnostics={}.", - event.getContainerId(), event.getStatus().getExitStatus(), - event.getStatus().getDiagnostics()); - if (shouldExit) { - // Sleep for 5 seconds in hope that the state can be recorded in ATS. - // in case there's a client polling the comp state, it can be notified. 
- try { - Thread.sleep(5000); - } catch (InterruptedException e) { - LOG.error("Interrupted on sleep while exiting.", e); - } - ExitUtil.terminate(-1); + if (shouldFailService) { + terminationHandler.terminate(-1); } + + // According to component restart policy, handle container restart + // or finish the service (if all components finalized) + handleComponentInstanceRelaunch(compInstance, event); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java new file mode 100644 index 00000000000..b1186117103 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service.component.instance; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.ServiceScheduler; +import org.apache.hadoop.yarn.service.component.Component; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestComponentInstance { + private Component createComponent(ServiceScheduler scheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, + int nSucceededInstances, int nFailedInstances, int totalAsk) { + Component comp = mock(Component.class); + org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( + org.apache.hadoop.yarn.service.api.records.Component.class); + when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + when(componentSpec.getNumberOfContainers()).thenReturn( + Long.valueOf(totalAsk)); + when(comp.getComponentSpec()).thenReturn(componentSpec); + AtomicInteger succeeded = new 
AtomicInteger(nSucceededInstances); + AtomicInteger failed = new AtomicInteger(nFailedInstances); + when(comp.getSuceededInstances()).thenReturn(succeeded); + when(comp.getFailedInstances()).thenReturn(failed); + when(comp.getScheduler()).thenReturn(scheduler); + return comp; + } + + private ComponentInstance createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, + int nSucceededInstances, int nFailedInstances, int totalAsk) { + // clean up the mock every time. + ComponentInstance.terminationHandler = mock( + ComponentInstance.ProcessTerminationHandler.class); + + ServiceScheduler serviceScheduler = mock(ServiceScheduler.class); + Component comp = createComponent(serviceScheduler, restartPolicy, + nSucceededInstances, nFailedInstances, totalAsk); + + ComponentInstance componentInstance = mock(ComponentInstance.class); + when(componentInstance.getComponent()).thenReturn(comp); + + Map allComponents = new HashMap<>(); + allComponents.put("comp1", comp); + + when(serviceScheduler.getAllComponents()).thenReturn(allComponents); + + return componentInstance; + } + + @Test + public void testComponentRestartPolicy() { + ComponentInstanceEvent componentInstanceEvent = mock( + ComponentInstanceEvent.class); + ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), 1); + ContainerStatus containerStatus = ContainerStatus.newInstance(containerId, + ContainerState.COMPLETE, "hello", 0); + + when(componentInstanceEvent.getStatus()).thenReturn(containerStatus); + + // Test case1: one component, one instance, restart policy = ALWAYS, exit=0 + ComponentInstance componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS, + 0, 0, 1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + 
componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(0, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, never()).terminate(anyInt()); + + // Test case2: one component, one instance, restart policy = ALWAYS, exit=1 + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS, + 0, 0, 1); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(0, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, never()).terminate(anyInt()); + + // Test case3: one component, one instance, restart policy = NEVER, exit=0 + // Should exit with code=0 + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 0, 0, 1); + containerStatus.setExitStatus(0); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(1, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(0)); + + // Test case4: one component, one instance, restart policy = NEVER, exit=1 + // Should exit with code=-1 + componentInstance = createComponentInstance( + 
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 0, 0, 1); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(1, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(0, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(-1)); + + // Test case5: one component, one instance, restart policy = ON_FAILURE, exit=1 + // Should continue to run. + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ON_FAILURE, + 0, 0, 1); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(0, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + + // Test case6: one component, 3 instances, restart policy = NEVER, exit=1 + // Only 2 of the 3 instances completed, the service should continue to run.
+ componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 1, 0, 3); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(1, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(1, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + + // Test case7: one component, 3 instances, restart policy = ON_FAILURE, exit=1 + // 2 of the components completed, it should continue run. + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ON_FAILURE, + 1, 0, 3); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(1, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + + // Test case8: 2 components, 2 instances for each + // comp2 already finished. + // comp1 has a new instance finish, we should terminate the service + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 1, 0, 2); + containerStatus.setExitStatus(1); + // 2nd component, already finished. 
+ Component comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 1, 1, 2); + componentInstance.getComponent().getScheduler().getAllComponents().put( + "comp2", comp2); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(1, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(1, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(-1)); + + // Test case9: 2 components, 2 instances for each + // comp2 already finished. + // comp1 has a new instance finish, we should terminate the service + // All instance finish with 0, service should exit with 0 as well. + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 1, 0, 2); + containerStatus.setExitStatus(0); + // 2nd component, already finished. 
+ comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 2, 0, 2); + componentInstance.getComponent().getScheduler().getAllComponents().put( + "comp2", comp2); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(2, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(0)); + + // Test case10: 2 components, 2 instances for each + // comp2 hasn't finished + // comp1 finished. + // Service should continue to run. + componentInstance = createComponentInstance( + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 1, 0, 2); + containerStatus.setExitStatus(0); + // 2nd component, not finished yet (only 1 of its 2 instances completed).
+ comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 1, 0, 2); + componentInstance.getComponent().getScheduler().getAllComponents().put( + "comp2", comp2); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + Assert.assertEquals(0, + componentInstance.getComponent().getFailedInstances().get()); + Assert.assertEquals(2, + componentInstance.getComponent().getSuceededInstances().get()); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md index 63e0af56fa8..f4df3433bf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md @@ -231,6 +231,7 @@ One or more components of the service. If the service is HBase say, then the com |placement_policy|Advanced scheduling and placement policies for all containers of this component (optional). If not specified, the service level placement_policy takes effect. Refer to the description at the global level for more details.|false|PlacementPolicy|| |configuration|Config properties for this component.|false|Configuration|| |quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array|| +|restart_policy|Restart policy of this component, could be NEVER/ALWAYS/ON_FAILURE|false|string|ALWAYS| ### ComponentState