diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index 8c5ad6528a3..51f215dcbde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -424,6 +424,14 @@ definitions: items: type: string description: A list of quicklink keys defined at the service level, and to be resolved by this component. + restartPolicy: + type: string + description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + enum: + - ALWAYS + - ON_FAILURE + - NEVER + default: ALWAYS ReadinessCheck: description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases. 
required: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index d5d6fa421be..85750de42bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; @@ -130,7 +131,7 @@ public StopResponseProto stop(StopRequestProto requestProto) LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser()); context.scheduler.getDiagnostics() .append("Stopped by user " + UserGroupInformation.getCurrentUser()); - context.scheduler.setGracefulStop(); + context.scheduler.setGracefulStop(FinalApplicationStatus.ENDED); // Stop the service in 2 seconds delay to make sure this rpc call is completed. // shutdown hook will be executed which will stop AM gracefully. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java index 0383a65bf2e..70bccfcf721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; import org.apache.hadoop.yarn.service.monitor.ServiceMonitor; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; @@ -60,8 +61,10 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index ee0a1a73dd0..09bf913b0ac 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -158,6 +158,9 @@ private boolean gracefulStop = false; + private volatile FinalApplicationStatus finalApplicationStatus = + FinalApplicationStatus.ENDED; + public ServiceScheduler(ServiceContext context) { super(context.service.getName()); this.context = context; @@ -256,8 +259,9 @@ protected NMClientAsync createNMClient() { .createAMRMClientAsync(1000, new AMRMClientCallback()); } - protected void setGracefulStop() { + public void setGracefulStop(FinalApplicationStatus finalApplicationStatus) { this.gracefulStop = true; + this.finalApplicationStatus = finalApplicationStatus; nmClient.getClient().cleanupRunningContainersOnStop(true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index 7deb076eda6..2beafc87c3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.service.api.records; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.gson.annotations.SerializedName; import io.swagger.annotations.ApiModel; import 
io.swagger.annotations.ApiModelProperty; @@ -29,7 +31,9 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlEnum; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -98,6 +102,54 @@ private List containers = Collections.synchronizedList(new ArrayList()); + + @JsonProperty("restart_policy") + @XmlElement(name = "restart_policy") + private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS; + + /** + * Restart policy of the component. One of ALWAYS, ON_FAILURE or NEVER. + **/ + @XmlType(name = "restart_policy") + @XmlEnum + public static enum RestartPolicyEnum { + ALWAYS("ALWAYS"), + + ON_FAILURE("ON_FAILURE"), + + NEVER("NEVER"); + private String value; + + RestartPolicyEnum(String value) { + this.value = value; + } + + @Override + @JsonValue + public String toString() { + return value; + } + } + + public Component restartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + return this; + } + + /** + * Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + * @return restartPolicy + **/ + @ApiModelProperty(value = "Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)") + public RestartPolicyEnum getRestartPolicy() { + return restartPolicy; + } + + public void setRestartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + } + + + /** * Name of the service component (mandatory). 
**/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index e115841cb47..e14922c5ce0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -111,6 +111,13 @@ // The number of containers failed since last reset. This excludes preempted, // disk_failed containers etc. This will be reset to 0 periodically. public AtomicInteger currentContainerFailure = new AtomicInteger(0); + + //succeeded and Failed instances are Populated only for RestartPolicyEnum + //.ON_FAILURE/NEVER + private Map succeededInstances = + new ConcurrentHashMap<>(); + private Map failedInstances = + new ConcurrentHashMap<>(); private boolean healthThresholdMonitorEnabled = false; private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); @@ -338,6 +345,8 @@ public ComponentState transition(Component component, // remove the instance component.compInstances.remove(instance.getCompInstanceName()); component.pendingInstances.remove(instance); + component.removeFailedInstanceIfExists(instance); + component.removeSuccessfulInstanceIfExists(instance); // decrement id counter component.instanceIdCounter.decrementAndGet(); instance.destroy(); @@ -949,4 +958,42 @@ public void setHealthThresholdMonitorEnabled( boolean healthThresholdMonitorEnabled) { this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled; } + + public Collection getSuceededInstances() { + return 
succeededInstances.values(); + } + + public Collection getFailedInstances() { + return failedInstances.values(); + } + + public synchronized void markAsSucceeded(ComponentInstance + instance) { + removeFailedInstanceIfExists(instance); + succeededInstances.put(instance.getCompInstanceName(), instance); + } + + public synchronized void markAsFailed + (ComponentInstance instance) { + removeSuccessfulInstanceIfExists(instance); + failedInstances.put(instance.getCompInstanceName(), instance); + } + + public boolean removeFailedInstanceIfExists + (ComponentInstance instance) { + if ( failedInstances.containsKey(instance.getCompInstanceName())) { + failedInstances.remove(instance.getCompInstanceName()); + return true; + } + return false; + } + + public boolean removeSuccessfulInstanceIfExists + (ComponentInstance instance) { + if ( succeededInstances.containsKey(instance.getCompInstanceName())) { + succeededInstances.remove(instance.getCompInstanceName()); + return true; + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 9d0a56b468e..93098d3936f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.component.instance; +import com.google.common.annotations.VisibleForTesting; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.api.RegistryConstants; @@ -25,11 +26,12 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -58,6 +60,8 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -95,6 +99,10 @@ // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + // For unit test override since we don't want to terminate UT process. + static ServiceUtils.ProcessTerminationHandler terminationHandler = + new ServiceUtils.ProcessTerminationHandler(); + private static final StateMachineFactory stateMachineFactory = @@ -220,6 +228,102 @@ public void transition(ComponentInstance compInstance, } } + /* + * Check if all components of the scheduler finished. + * If all components finished + * (which #failed-instances + #suceeded-instances = #total-n-containers) + * The service will be terminated. 
+ */ + private static synchronized void terminateServiceIfAllComponentsFinished( + ServiceScheduler scheduler) { + boolean shouldTerminate = true; + + // Succeeded comps and failed comps, for logging purposes. + Set succeededComponents = new HashSet<>(); + Set failedComponents = new HashSet<>(); + + for (Component comp : scheduler.getAllComponents().values()) { + int nSucceeded = comp.getSuceededInstances().size(); + int nFailed = comp.getFailedInstances().size(); + if (nSucceeded + nFailed < comp.getComponentSpec() + .getNumberOfContainers()) { + shouldTerminate = false; + break; + } + if (nFailed > 0) { + failedComponents.add(comp.getName()); + } else{ + succeededComponents.add(comp.getName()); + } + } + + if (shouldTerminate) { + LOG.info("All component finished, exiting Service Master... " + + ", final status=" + (failedComponents.isEmpty() ? + "Succeeded" : + "Failed")); + LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils + .join(succeededComponents, ",") + "]"); + LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils + .join(failedComponents, ",") + "]"); + + if (!failedComponents.isEmpty()) { + scheduler.setGracefulStop(FinalApplicationStatus.FAILED); + terminationHandler.terminate(-1); + } else{ + scheduler.setGracefulStop(FinalApplicationStatus.SUCCEEDED); + terminationHandler.terminate(0); + } + } + } + + @VisibleForTesting + static void handleComponentInstanceRelaunch( + ComponentInstance compInstance, ComponentInstanceEvent event) { + Component comp = compInstance.getComponent(); + + // Do we need to relaunch the service? + boolean shouldRelaunch = false; + boolean succeeded = false; + if (event.getStatus() != null + && event.getStatus().getExitStatus() == ContainerExitStatus.SUCCESS) { + succeeded = true; + } + + if (comp.getComponentSpec().getRestartPolicy() + == org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS) { + // Unconditional relaunch if restart_policy == always. 
+ shouldRelaunch = true; + } else if (comp.getComponentSpec().getRestartPolicy() + == org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ON_FAILURE + && (!succeeded)) { + // Relaunch for non-success exit code if restart_policy == on_failure. + shouldRelaunch = true; + } + + if (shouldRelaunch) { + comp.reInsertPendingInstance(compInstance); + LOG.info(compInstance.getCompInstanceId() + + ": {} completed. Reinsert back to pending list and requested " + + "a new container." + System.lineSeparator() + + " exitStatus={}, diagnostics={}.", + event.getContainerId(), event.getStatus().getExitStatus(), + event.getStatus().getDiagnostics()); + } else { + // When no relaunch, update component's #succeeded/#failed + // instances. + if (succeeded) { + comp.markAsSucceeded(compInstance); + } else { + comp.markAsFailed(compInstance); + } + LOG.info(compInstance.getCompInstanceId() + (succeeded ? + " succeeded" : + " failed") + " without retry, exitStatus=" + event.getStatus()); + terminateServiceIfAllComponentsFinished(comp.getScheduler()); + } + } + private static class ContainerStoppedTransition extends BaseTransition { // whether the container failed before launched by AM or not. boolean failedBeforeLaunching = false; @@ -249,7 +353,8 @@ public void transition(ComponentInstance compInstance, compInstance.component.decContainersReady(true); } compInstance.component.decRunningContainers(); - boolean shouldExit = false; + // Should we fail (terminate) the service? 
+ boolean shouldFailService = false; // Check if it exceeds the failure threshold, but only if health threshold // monitor is not enabled if (!comp.isHealthThresholdMonitorEnabled() @@ -264,7 +369,7 @@ public void transition(ComponentInstance compInstance, comp.getScheduler().getDiagnostics().append(containerDiag); comp.getScheduler().getDiagnostics().append(exitDiag); LOG.warn(exitDiag); - shouldExit = true; + shouldFailService = true; } if (!failedBeforeLaunching) { @@ -288,23 +393,12 @@ public void transition(ComponentInstance compInstance, // remove the failed ContainerId -> CompInstance mapping comp.getScheduler().removeLiveCompInstance(event.getContainerId()); - comp.reInsertPendingInstance(compInstance); + // According to component restart policy, handle container restart + // or finish the service (if all components finished) + handleComponentInstanceRelaunch(compInstance, event); - LOG.info(compInstance.getCompInstanceId() - + ": {} completed. Reinsert back to pending list and requested " + - "a new container." + System.lineSeparator() + - " exitStatus={}, diagnostics={}.", - event.getContainerId(), event.getStatus().getExitStatus(), - event.getStatus().getDiagnostics()); - if (shouldExit) { - // Sleep for 5 seconds in hope that the state can be recorded in ATS. - // in case there's a client polling the comp state, it can be notified. 
- try { - Thread.sleep(5000); - } catch (InterruptedException e) { - LOG.error("Interrupted on sleep while exiting.", e); - } - ExitUtil.terminate(-1); + if (shouldFailService) { + terminationHandler.terminate(-1); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java index 915b8361343..890fe5c059f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java @@ -28,8 +28,10 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; import org.apache.hadoop.yarn.service.containerlaunch.ClasspathConstructor; import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; @@ -571,4 +573,17 @@ public static String getLocalHostName(@Nullable Configuration conf) // Fallback to querying the default hostname as we did before. return InetAddress.getLocalHost().getCanonicalHostName(); } + + public static class ProcessTerminationHandler { + public void terminate(int exitCode) { + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // in case there's a client polling the comp state, it can be notified. 
+ try { + Thread.sleep(5000); + } catch (InterruptedException e) { + log.info("Interrupted on sleep while exiting.", e); + } + ExitUtil.terminate(exitCode); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 599b8a7a0ae..7c056d2439c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -103,6 +103,11 @@ public static Component createComponent(String name) { return createComponent(name, 2L, "sleep 1000"); } + public static Component createComponent(String name, int numContainers, + String cmd) { + return createComponent(name, (long) numContainers, cmd); + } + protected static Component createComponent(String name, long numContainers, String command) { Component comp1 = new Component(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index 0b56d7ef19c..8956e9da2e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -18,7 +18,12 @@ package org.apache.hadoop.yarn.service.component.instance; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; @@ -26,9 +31,26 @@ import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.component.TestComponent; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link ComponentInstance}. 
@@ -43,31 +65,31 @@ public void testContainerUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerUpgrade"); - Component component = context.scheduler.getAllComponents().entrySet() - .iterator().next().getValue(); + Component component = + context.scheduler.getAllComponents().entrySet().iterator().next().getValue(); upgradeComponent(component); - ComponentInstance instance = component.getAllComponentInstances() - .iterator().next(); + ComponentInstance instance = + component.getAllComponentInstances().iterator().next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); instance.handle(instanceEvent); Container containerSpec = component.getComponentSpec().getContainer( instance.getContainer().getId().toString()); - Assert.assertEquals("instance not upgrading", - ContainerState.UPGRADING, containerSpec.getState()); + Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING, + containerSpec.getState()); } @Test public void testContainerReadyAfterUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerStarted"); - Component component = context.scheduler.getAllComponents().entrySet() - .iterator().next().getValue(); + Component component = + context.scheduler.getAllComponents().entrySet().iterator().next().getValue(); upgradeComponent(component); - ComponentInstance instance = component.getAllComponentInstances() - .iterator().next(); + ComponentInstance instance = + component.getAllComponentInstances().iterator().next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); @@ -75,14 +97,429 @@ public void testContainerReadyAfterUpgrade() throws Exception { instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); - 
Assert.assertEquals("instance not ready", - ContainerState.READY, instance.getCompSpec().getContainer( - instance.getContainer().getId().toString()).getState()); + Assert.assertEquals("instance not ready", ContainerState.READY, + instance.getCompSpec().getContainer(instance.getContainer().getId().toString()) + .getState()); } private void upgradeComponent(Component component) { component.handle(new ComponentEvent(component.getName(), - ComponentEventType.UPGRADE) - .setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2")); + ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2")); + } + + private Component createComponent( + ServiceScheduler scheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, + int nSucceededInstances, int nFailedInstances, int totalAsk, int + componentId) { + + assert (nSucceededInstances + nFailedInstances) <= totalAsk; + + Component comp = mock(Component.class); + org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( + org.apache.hadoop.yarn.service.api.records.Component.class); + when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + when(componentSpec.getNumberOfContainers()).thenReturn( + Long.valueOf(totalAsk)); + when(comp.getComponentSpec()).thenReturn(componentSpec); + when(comp.getScheduler()).thenReturn(scheduler); + + Map succeeded = new ConcurrentHashMap<>(); + Map failed = new ConcurrentHashMap<>(); + scheduler.getAllComponents().put("comp" + componentId, comp); + + Map componentInstances = new HashMap<>(); + + for ( int i = 0; i < nSucceededInstances; i++ ) { + ComponentInstance componentInstance = createComponentInstance(comp, + i); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + succeeded.put(componentInstance.getCompInstanceName(), componentInstance); + } + + for ( int i = 0; i < nFailedInstances; i++ ) { + ComponentInstance componentInstance = + 
createComponentInstance(comp, i + nSucceededInstances); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + failed.put(componentInstance.getCompInstanceName(), componentInstance); + } + + int delta = totalAsk - nFailedInstances - nSucceededInstances; + + for ( int i = 0; i < delta; i++ ) { + ComponentInstance componentInstance = + createComponentInstance(comp, i + nSucceededInstances + nFailedInstances); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + } + + when(comp.getAllComponentInstances()).thenReturn(componentInstances.values()); + when(comp.getSuceededInstances()).thenReturn(succeeded.values()); + when(comp.getFailedInstances()).thenReturn(failed.values()); + return comp; + } + + private Component createComponent( + ServiceScheduler scheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, + int totalAsk, int + componentId) { + + Component comp = mock(Component.class); + org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( + org.apache.hadoop.yarn.service.api.records.Component.class); + when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + when(componentSpec.getNumberOfContainers()).thenReturn( + Long.valueOf(totalAsk)); + when(comp.getComponentSpec()).thenReturn(componentSpec); + when(comp.getScheduler()).thenReturn(scheduler); + + scheduler.getAllComponents().put("comp" + componentId, comp); + + Map componentInstances = new HashMap<>(); + + for ( int i = 0; i < totalAsk; i++ ) { + ComponentInstance componentInstance = + createComponentInstance(comp, i); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + } + + when(comp.getAllComponentInstances()).thenReturn(componentInstances.values()); + return comp; + } + + private ComponentInstance createComponentInstance(Component component, + int instanceId) { + // clean up the mock every time. 
+ ComponentInstance.terminationHandler = mock( + ServiceUtils.ProcessTerminationHandler.class); + + ComponentInstance componentInstance = mock(ComponentInstance.class); + when(componentInstance.getComponent()).thenReturn(component); + when(componentInstance.getCompInstanceName()).thenReturn("compInstance" + + instanceId); + return componentInstance; + } + + @Test + public void testComponentRestartPolicy() { + + Map allComponents = new HashMap<>(); + ServiceScheduler serviceScheduler = mock(ServiceScheduler.class); + when(serviceScheduler.getAllComponents()).thenReturn(allComponents); + + ComponentInstanceEvent componentInstanceEvent = mock( + ComponentInstanceEvent.class); + ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), 1); + ContainerStatus containerStatus = ContainerStatus.newInstance(containerId, + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", + 0); + + when(componentInstanceEvent.getStatus()).thenReturn(containerStatus); + + // Test case1: one component, one instance, restart policy = ALWAYS, exit=0 + Component comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS, + 1, 0, 1, 0); + ComponentInstance componentInstance = comp.getAllComponentInstances() + .iterator().next(); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + + verify(comp, never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, never()).terminate(anyInt()); + + // Test case2: one component, one instance, restart policy = ALWAYS, exit=1 + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS, + 0, 1, 1, 
0); + componentInstance = comp.getAllComponentInstances() + .iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, never()).terminate(anyInt()); + + // Test case3: one component, one instance, restart policy = NEVER, exit=0 + // Should exit with code=0 + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 1, 0, 1, 0); + componentInstance = comp.getAllComponentInstances() + .iterator().next(); + containerStatus.setExitStatus(0); + + Map succeededInstances = new HashMap<>(); + succeededInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + when(comp.getSuceededInstances()).thenReturn(succeededInstances.values()); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, times(1)).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(0)); + + // Test case4: one component, one instance, restart policy = NEVER, exit=1 + // Should exit with code=-1 + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 0, 1, 1, 0); + componentInstance = comp.getAllComponentInstances() + .iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, 
never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, times(1)).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(-1)); + + // Test case5: one component, one instance, restart policy = ON_FAILURE, exit=1 + // Should continue run. + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 0, 1, 1, 0); + componentInstance = comp.getAllComponentInstances() + .iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + + // Test case6: one component, 3 instances, restart policy = NEVER, exit=1 + // 2 of the instances not completed, it should continue run. 
+ comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 0, 1, 3, 0); + componentInstance = comp.getAllComponentInstances() + .iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, times(1)).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + + // Test case7: one component, 3 instances, restart policy = ON_FAILURE, exit=1 + // 2 of the instances completed, it should continue run. + + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 0, 1, 3, 0); + + Iterator iter = comp.getAllComponentInstances().iterator(); + + containerStatus.setExitStatus(1); + ComponentInstance commponentInstance = iter.next(); + ComponentInstance.handleComponentInstanceRelaunch(commponentInstance, + componentInstanceEvent); + + verify(comp, never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(0)).terminate(anyInt()); + + // Test case8: 2 components, 2 instances for each + // comp2 already finished. 
+ // comp1 has a new instance finish, we should terminate the service + + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 0); + Collection component1Instances = comp.getAllComponentInstances(); + + containerStatus.setExitStatus(1); + + Component comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.NEVER, + 2, 1); + + Collection component2Instances = + comp2.getAllComponentInstances(); + + Map failed2Instances = new HashMap<>(); + + for(ComponentInstance component2Instance : component2Instances) { + failed2Instances.put(component2Instance.getCompInstanceName(), + component2Instance); + when(component2Instance.getComponent().getFailedInstances()).thenReturn + (failed2Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component2Instance, + componentInstanceEvent); + } + + Map failed1Instances = new HashMap<>(); + + // 2nd component, already finished. + for (ComponentInstance component1Instance : component1Instances) { + failed1Instances.put(component1Instance.getCompInstanceName(), + component1Instance); + when(component1Instance.getComponent().getFailedInstances()).thenReturn + (failed1Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component1Instance, + componentInstanceEvent); + } + + verify(comp, never()).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, times(2)).markAsFailed( + any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(-1)); + + // Test case9: 2 components, 2 instances for each + // comp2 already finished. + // comp1 has a new instance finish, we should terminate the service + // All instance finish with 0, service should exit with 0 as well. 
+ containerStatus.setExitStatus(0); + + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 2, 0); + component1Instances = comp.getAllComponentInstances(); + + comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 2, 1); + + component2Instances = + comp2.getAllComponentInstances(); + + Map succeeded2Instances = new HashMap<>(); + + for(ComponentInstance component2Instance : component2Instances) { + succeeded2Instances.put(component2Instance.getCompInstanceName(), + component2Instance); + when(component2Instance.getComponent().getSuceededInstances()).thenReturn + (succeeded2Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component2Instance, + componentInstanceEvent); + } + + Map succeeded1Instances = new HashMap<>(); + // 2nd component, already finished. + for (ComponentInstance component1Instance : component1Instances) { + succeeded1Instances.put(component1Instance.getCompInstanceName(), + component1Instance); + when(component1Instance.getComponent().getSuceededInstances()).thenReturn + (succeeded1Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component1Instance, + componentInstanceEvent); + } + + verify(comp, times(2)).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, times(1)).terminate(eq(0)); + + + // Test case10: 2 components, 2 instances for each + // comp2 hasn't finished + // comp1 finished. + // Service should continue run. 
+ + comp = createComponent( + serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 0); + component1Instances = comp.getAllComponentInstances(); + + comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 1); + + component2Instances = + comp2.getAllComponentInstances(); + + + for(ComponentInstance component2Instance : component2Instances) { + ComponentInstance.handleComponentInstanceRelaunch(component2Instance, + componentInstanceEvent); + } + + succeeded1Instances = new HashMap<>(); + // 2nd component, already finished. + for (ComponentInstance component1Instance : component1Instances) { + succeeded1Instances.put(component1Instance.getCompInstanceName(), + component1Instance); + when(component1Instance.getComponent().getSuceededInstances()).thenReturn + (succeeded1Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component1Instance, + componentInstanceEvent); + } + + verify(comp, times(2)).markAsSucceeded( + any(ComponentInstance.class)); + verify(comp, never()).markAsFailed( + any(ComponentInstance.class)); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(ComponentInstance.terminationHandler, never()).terminate(eq + (0)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md index fab33c51e89..dc2a1bb8696 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md @@ -233,6 +233,7 @@ One or more components of the service. 
If the service is HBase say, then the com |placement_policy|Advanced scheduling and placement policies for all containers of this component.|false|PlacementPolicy|| |configuration|Config properties for this component.|false|Configuration|| |quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array|| +|restart_policy|Restart policy of this component, could be NEVER/ALWAYS/ON_FAILURE|false|string|ALWAYS| ### ComponentState