diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index cea8296bd4f..d90ae06e04e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -424,6 +424,14 @@ definitions: items: type: string description: A list of quicklink keys defined at the service level, and to be resolved by this component. + restartPolicy: + type: string + description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + enum: + - ALWAYS + - ON_FAILURE + - NEVER + default: ALWAYS ReadinessCheck: description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases. required: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index d5d6fa421be..85750de42bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; @@ -130,7 +131,7 @@ public StopResponseProto stop(StopRequestProto requestProto) LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser()); context.scheduler.getDiagnostics() .append("Stopped by user " + UserGroupInformation.getCurrentUser()); - context.scheduler.setGracefulStop(); + context.scheduler.setGracefulStop(FinalApplicationStatus.ENDED); // Stop the service in 2 seconds delay to make sure this rpc call is completed. // shutdown hook will be executed which will stop AM gracefully. 
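For orientation (not part of the patch): the restartPolicy field added to the YAML definition above surfaces in the Java API records as Component.RestartPolicyEnum, introduced further down in this patch. Below is a minimal sketch of a service definition that opts into it, assuming the existing setters on the Service and Component records (setName, setNumberOfContainers, setLaunchCommand) plus the new setRestartPolicy added here; names such as "terminating-comp1" are illustrative only, mirroring ServiceTestUtils.createTerminatingJobExample later in the patch.

import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;

public class TerminatingServiceSketch {
  // Builds a service whose component runs to completion and is never relaunched.
  public static Service build() {
    Component comp = new Component();
    comp.setName("terminating-comp1");            // hypothetical component name
    comp.setNumberOfContainers(2L);
    comp.setLaunchCommand("sleep 1000");
    comp.setRestartPolicy(Component.RestartPolicyEnum.NEVER); // new field from this patch

    Service service = new Service();
    service.setName("terminating-service");       // hypothetical service name
    service.setVersion("v1");
    service.addComponent(comp);
    return service;
  }
}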
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java index 6c91b9cd5f2..87791536492 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java @@ -56,4 +56,8 @@ public ServiceManager getServiceManager() { void setServiceManager(ServiceManager serviceManager) { this.serviceManager = Preconditions.checkNotNull(serviceManager); } + + public Service getService() { + return service; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index e6a38dc10c9..66b9374217a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -266,12 +267,24 @@ void processUpgradeRequest(String upgradeVersion, event.setAutoFinalize(true); } compsThatNeedUpgrade.forEach(component -> { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE) - .setTargetSpec(component) - .setUpgradeVersion(event.getVersion()); - context.scheduler.getDispatcher().getEventHandler().handle( - needUpgradeEvent); + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum + restartPolicy = component.getRestartPolicy(); + + final ComponentRestartPolicy restartPolicyHandler = + Component.getRestartPolicyHandler(restartPolicy); + // Do not allow upgrades for components which have a NEVER/ON_FAILURE + // restart policy + if (restartPolicyHandler.allowUpgrades()) { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE).setTargetSpec( + component).setUpgradeVersion(event.getVersion()); + context.scheduler.getDispatcher().getEventHandler().handle( + needUpgradeEvent); + } else { + LOG.info("The component " + component.getName() + " has a restart " + + "policy that doesn't allow upgrades: " + component + .getRestartPolicy()); + } }); } else { // nothing to upgrade if upgrade auto finalize is requested, trigger a diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index ee0a1a73dd0..39dbcb93987 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.util.BoundedAppender; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; @@ -89,8 +91,10 @@ import java.text.MessageFormat; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -158,8 +162,15 @@ private boolean gracefulStop = false; + private volatile FinalApplicationStatus finalApplicationStatus = + FinalApplicationStatus.ENDED; + + // For unit test override, since we don't want to terminate the UT process. + private ServiceUtils.ProcessTerminationHandler + terminationHandler = new ServiceUtils.ProcessTerminationHandler(); + public ServiceScheduler(ServiceContext context) { - super(context.service.getName()); + super(context.getService().getName()); this.context = context; } @@ -256,8 +267,9 @@ protected NMClientAsync createNMClient() { .createAMRMClientAsync(1000, new AMRMClientCallback()); } - protected void setGracefulStop() { + public void setGracefulStop(FinalApplicationStatus applicationStatus) { this.gracefulStop = true; + this.finalApplicationStatus = applicationStatus; nmClient.getClient().cleanupRunningContainersOnStop(true); } @@ -877,4 +889,62 @@ public BoundedAppender getDiagnostics() { public boolean hasAtLeastOnePlacementConstraint() { return hasAtLeastOnePlacementConstraint; } + + /* + * Check if all components of the scheduler have finished. + * If all components have finished + * (i.e. #failed-instances + #succeeded-instances == #total-n-containers), + * the service will be terminated. + */ + public synchronized void terminateServiceIfAllComponentsFinished() { + boolean shouldTerminate = true; + + // Succeeded comps and failed comps, for logging purposes.
+ Set<String> succeededComponents = new HashSet<>(); + Set<String> failedComponents = new HashSet<>(); + + for (Component comp : getAllComponents().values()) { + long nSucceeded = comp.getNumSucceededInstances(); + long nFailed = comp.getNumFailedInstances(); + if (nSucceeded + nFailed < comp.getComponentSpec() + .getNumberOfContainers()) { + shouldTerminate = false; + break; + } + if (nFailed > 0) { + failedComponents.add(comp.getName()); + } else { + succeededComponents.add(comp.getName()); + } + } + + if (shouldTerminate) { + LOG.info("All components finished, exiting Service Master... " + + "final status=" + (failedComponents.isEmpty() ? + "Succeeded" : + "Failed")); + LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils + .join(succeededComponents, ",") + "]"); + LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils + .join(failedComponents, ",") + "]"); + + if (!failedComponents.isEmpty()) { + setGracefulStop(FinalApplicationStatus.FAILED); + getTerminationHandler().terminate(-1); + } else { + setGracefulStop(FinalApplicationStatus.SUCCEEDED); + getTerminationHandler().terminate(0); + } + } + } + + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { + return terminationHandler; + } + + @VisibleForTesting + ServiceUtils.ProcessTerminationHandler setTerminationHandler(ServiceUtils + .ProcessTerminationHandler terminationHandler) { + this.terminationHandler = terminationHandler; + return this.terminationHandler; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index 7deb076eda6..2c66d3e68d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.service.api.records; +import com.fasterxml.jackson.annotation.JsonValue; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -29,7 +30,9 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlEnum; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -98,6 +101,66 @@ private List containers = Collections.synchronizedList(new ArrayList()); + + @JsonProperty("restart_policy") + @XmlElement(name = "restart_policy") + private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS; + + /** + * Policy of restart component. + * Including ALWAYS - long-lived components (always restart component instance even if the instance + * exit code = 0
); + * ON_FAILURE (Only restart component instance if instance exit code != + * 0); + * NEVER (Do not restart in any cases) + * @return restartPolicy + **/ + @XmlType(name = "restart_policy") + @XmlEnum + public static enum RestartPolicyEnum { + ALWAYS("ALWAYS"), + + ON_FAILURE("ON_FAILURE"), + + NEVER("NEVER"); + private String value; + + RestartPolicyEnum(String value) { + this.value = value; + } + + @Override + @JsonValue + public String toString() { + return value; + } + } + + public Component restartPolicy(RestartPolicyEnum restartPolicyEnumVal) { + this.restartPolicy = restartPolicyEnumVal; + return this; + } + + /** + * Policy of restart component. + * Including ALWAYS (Always restart component instance even if instance exit + * code = 0); + * ON_FAILURE (Only restart component instance if instance exit code != + * 0); + * NEVER (Do not restart in any cases) + * @return restartPolicy + **/ + @ApiModelProperty(value = "Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)") + public RestartPolicyEnum getRestartPolicy() { + return restartPolicy; + } + + public void setRestartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + } + + /** * Name of the service component (mandatory). **/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java new file mode 100644 index 00000000000..af907a7229a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +/** + * Always restart policy allows for restarts for long live components which + * never terminate. 
+ */ +public class AlwaysRestartPolicy implements ComponentRestartPolicy { + + private static AlwaysRestartPolicy INSTANCE = new AlwaysRestartPolicy(); + + private AlwaysRestartPolicy() {}; + + public static AlwaysRestartPolicy getInstance() { + return INSTANCE; + } + + @Override + public boolean isLongLived() { + return true; + } + + /** + * This is always false since these components never terminate + * @param component + * @return + */ + @Override + public boolean hasCompleted(Component component) { + return false; + } + + + /** + * This is always false since these components never terminate + * @param component + * @return + */ + @Override + public boolean hasCompletedSuccessfully(Component component) { + return false; + } + + @Override + public boolean shouldRelaunchInstance(ComponentInstance componentInstance, + ContainerStatus containerStatus) { + return true; + } + + @Override + public boolean isReadyForDownStream(Component dependentComponent) { + if (dependentComponent.getNumReadyInstances() + < dependentComponent.getNumDesiredInstances()) { + return false; + } + return true; + } + + @Override + public boolean allowUpgrades() { + return true; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 7979c19eb87..aa31ef54e2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.service.component; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import static org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -111,6 +114,13 @@ // The number of containers failed since last reset. This excludes preempted, // disk_failed containers etc. This will be reset to 0 periodically. 
public AtomicInteger currentContainerFailure = new AtomicInteger(0); + + //succeeded and Failed instances are Populated only for RestartPolicyEnum + //.ON_FAILURE/NEVER + private Map succeededInstances = + new ConcurrentHashMap<>(); + private Map failedInstances = + new ConcurrentHashMap<>(); private boolean healthThresholdMonitorEnabled = false; private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); @@ -297,7 +307,7 @@ private void checkAndScheduleHealthThresholdMonitor() { @Override public ComponentState transition(Component component, ComponentEvent event) { - component.setDesiredContainers((int)event.getDesired()); + component.setDesiredContainers((int) event.getDesired()); if (!component.areDependenciesReady()) { LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not" + " satisfied.", component.getName()); @@ -402,44 +412,73 @@ public void transition(Component component, ComponentEvent event) { } } - private static ComponentState checkIfStable(Component component) { - // if desired == running - if (component.componentMetrics.containersReady.value() == component - .getComponentSpec().getNumberOfContainers() && - component.numContainersThatNeedUpgrade.get() == 0) { + @VisibleForTesting + static ComponentState checkIfStable(Component component) { + if ( component.getRestartPolicyHandler().isLongLived()) { + return updateStateForLongRunningComponents(component); + } else { + //NEVER/ON_FAILURE + return updateStateForTerminatingComponents(component); + } + } + + private static ComponentState updateStateForTerminatingComponents( + Component component) { + if (component.getNumRunningInstances() + component + .getNumSucceededInstances() + component.getNumFailedInstances() + < component.getComponentSpec().getNumberOfContainers()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + return FLEXING; + } else{ component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; + } + } + + private static ComponentState updateStateForLongRunningComponents + (Component component) { + // if desired == running + if (component.componentMetrics.containersReady.value() + == component.getComponentSpec().getNumberOfContainers() + && component.numContainersThatNeedUpgrade.get() == 0) { + component.componentSpec.setState(org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + return STABLE; } else if (component.componentMetrics.containersReady.value() != component .getComponentSpec().getNumberOfContainers()) { - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + component.componentSpec.setState(org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); return FLEXING; - } else { + } else{ // component.numContainersThatNeedUpgrade.get() > 0 component.componentSpec.setState(org.apache.hadoop.yarn.service.api. 
records.ComponentState.NEEDS_UPGRADE); return UPGRADING; } } - // This method should be called whenever there is an increment or decrement // of a READY state container of a component - public static synchronized void checkAndUpdateComponentState( + //This should not matter for terminating components + private static synchronized void checkAndUpdateComponentState( Component component, boolean isIncrement) { org.apache.hadoop.yarn.service.api.records.ComponentState curState = component.componentSpec.getState(); - if (isIncrement) { - // check if all containers are in READY state - if (component.numContainersThatNeedUpgrade.get() == 0 && - component.componentMetrics.containersReady.value() == - component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); - if (curState != component.componentSpec.getState()) { - LOG.info("[COMPONENT {}] state changed from {} -> {}", - component.componentSpec.getName(), curState, - component.componentSpec.getState()); + + if (component.getRestartPolicyHandler().isLongLived()) { + if (isIncrement) { + // check if all containers are in READY state + if (component.numContainersThatNeedUpgrade.get() == 0 + && component.componentMetrics.containersReady.value() + == component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + if (curState != component.componentSpec.getState()) { + LOG.info("[COMPONENT {}] state changed from {} -> {}", + component.componentSpec.getName(), curState, + component.componentSpec.getState()); + } + // component state change will trigger re-check of service state + component.context.getServiceManager().checkAndUpdateServiceState(); } // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); @@ -464,25 +503,29 @@ public static synchronized void checkAndUpdateComponentState( // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } - // when the service is stable then the state of component needs to - // transition to stable - component.dispatcher.getEventHandler().handle(new ComponentEvent( - component.getName(), ComponentEventType.CHECK_STABLE)); } private static class ContainerCompletedTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { + component.updateMetrics(event.getStatus()); component.dispatcher.getEventHandler().handle( - new ComponentInstanceEvent(event.getStatus().getContainerId(), - STOP).setStatus(event.getStatus())); - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); - if (component.context.service.getState().equals(ServiceState.STABLE)) { - component.getScheduler().getApp().setState(ServiceState.STARTED); - LOG.info("Service def state changed from {} -> {}", - ServiceState.STABLE, ServiceState.STARTED); + new ComponentInstanceEvent(event.getStatus().getContainerId(), STOP) + .setStatus(event.getStatus())); + + ComponentRestartPolicy restartPolicy = + component.getRestartPolicyHandler(); + + if (restartPolicy.shouldRelaunchInstance(event.getInstance(), event.getStatus())) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + + if (component.context.service.getState().equals(ServiceState.STABLE)) { + 
component.getScheduler().getApp().setState(ServiceState.STARTED); + LOG.info("Service def state changed from {} -> {}", + ServiceState.STABLE, ServiceState.STARTED); + } } } } @@ -750,10 +793,9 @@ private void updateMetrics(ContainerStatus status) { scheduler.getServiceMetrics().containersFailed.incr(); if (Apps.shouldCountTowardsNodeBlacklisting(status.getExitStatus())) { - String host = scheduler.getLiveInstances().get(status.getContainerId()) - .getNodeId().getHost(); - failureTracker.incNodeFailure(host); - currentContainerFailure.getAndIncrement() ; + String host = scheduler.getLiveInstances().get(status.getContainerId()).getNodeId().getHost(); + failureTracker.incNodeFailure(host); + currentContainerFailure.getAndIncrement(); } } @@ -763,25 +805,28 @@ public boolean areDependenciesReady() { return true; } for (String dependency : dependencies) { - Component dependentComponent = - scheduler.getAllComponents().get(dependency); + Component dependentComponent = scheduler.getAllComponents().get( + dependency); if (dependentComponent == null) { LOG.error("Couldn't find dependency {} for {} (should never happen)", dependency, getName()); continue; } - if (dependentComponent.getNumReadyInstances() < dependentComponent - .getNumDesiredInstances()) { + + if (!dependentComponent.isReadyForDownstream()) { LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}" - + " instances are ready.", getName(), dependency, + + " instances are ready or the dependent component has not " + + "completed - {} ", getName(), dependency, dependentComponent.getNumReadyInstances(), - dependentComponent.getNumDesiredInstances()); + dependentComponent.getNumDesiredInstances(), dependentComponent + .getNumRunningInstances() + " instances are still running."); return false; } } return true; } + public Map getDependencyHostIpTokens() { Map tokens = new HashMap<>(); List dependencies = componentSpec.getDependencies(); @@ -955,4 +1000,71 @@ public void setHealthThresholdMonitorEnabled( boolean healthThresholdMonitorEnabled) { this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled; } + + public Collection getSucceededInstances() { + return succeededInstances.values(); + } + + public long getNumSucceededInstances() { + return succeededInstances.size(); + } + + public long getNumFailedInstances() { + return failedInstances.size(); + } + + public Collection getFailedInstances() { + return failedInstances.values(); + } + + public synchronized void markAsSucceeded(ComponentInstance + instance) { + removeFailedInstanceIfExists(instance); + succeededInstances.put(instance.getCompInstanceName(), instance); + } + + public synchronized void markAsFailed + (ComponentInstance instance) { + removeSuccessfulInstanceIfExists(instance); + failedInstances.put(instance.getCompInstanceName(), instance); + } + + public boolean removeFailedInstanceIfExists + (ComponentInstance instance) { + if ( failedInstances.containsKey(instance.getCompInstanceName())) { + failedInstances.remove(instance.getCompInstanceName()); + return true; + } + return false; + } + + public boolean removeSuccessfulInstanceIfExists + (ComponentInstance instance) { + if ( succeededInstances.containsKey(instance.getCompInstanceName())) { + succeededInstances.remove(instance.getCompInstanceName()); + return true; + } + return false; + } + + public boolean isReadyForDownstream() { + return getRestartPolicyHandler().isReadyForDownStream(this); + } + + public static ComponentRestartPolicy getRestartPolicyHandler + (RestartPolicyEnum restartPolicyEnum) { 
+ + if ( RestartPolicyEnum.NEVER == restartPolicyEnum) { + return NeverRestartPolicy.getInstance(); + } else if ( RestartPolicyEnum.ON_FAILURE == restartPolicyEnum) { + return OnFailureRestartPolicy.getInstance(); + } else { + return AlwaysRestartPolicy.getInstance(); + } + } + + public ComponentRestartPolicy getRestartPolicyHandler() { + RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy(); + return getRestartPolicyHandler(restartPolicyEnum); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java new file mode 100644 index 00000000000..deb8638a5eb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +public interface ComponentRestartPolicy { + + boolean isLongLived(); + + boolean hasCompleted(Component component); + + boolean hasCompletedSuccessfully(Component component); + + boolean shouldRelaunchInstance(ComponentInstance componentInstance, + ContainerStatus containerStatus); + + boolean isReadyForDownStream(Component component); + + boolean allowUpgrades(); + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java new file mode 100644 index 00000000000..45e2aad5d3f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +public class NeverRestartPolicy implements ComponentRestartPolicy { + + private static NeverRestartPolicy INSTANCE = new NeverRestartPolicy(); + + private NeverRestartPolicy() {}; + + public static NeverRestartPolicy getInstance() { + return INSTANCE; + } + + @Override + public boolean isLongLived() { + return false; + } + + @Override + public boolean hasCompleted(Component component) { + if (component.getNumSucceededInstances() + component + .getNumFailedInstances() < component.getNumDesiredInstances()) { + return false; + } + return true; + } + + @Override + public boolean hasCompletedSuccessfully(Component component) { + if (component.getNumSucceededInstances() == component + .getNumDesiredInstances()) { + return true; + } + return false; + } + + @Override + public boolean shouldRelaunchInstance(ComponentInstance componentInstance, + ContainerStatus containerStatus) { + return false; + } + + @Override + public boolean isReadyForDownStream(Component component) { + if ( hasCompleted(component) ) { + return true; + } + return false; + } + + @Override + public boolean allowUpgrades() { + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java new file mode 100644 index 00000000000..0d28c6f466b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +public class OnFailureRestartPolicy implements ComponentRestartPolicy { + + private static OnFailureRestartPolicy INSTANCE = new OnFailureRestartPolicy(); + + private OnFailureRestartPolicy() {}; + + public static OnFailureRestartPolicy getInstance() { + return INSTANCE; + } + + @Override + public boolean isLongLived() { + return false; + } + + @Override + public boolean hasCompleted(Component component) { + if ( hasCompletedSuccessfully(component)) { + return true; + } + + return false; + } + + @Override + public boolean hasCompletedSuccessfully(Component component) { + if ( component.getNumSucceededInstances() == component + .getNumDesiredInstances()) { + return true; + } + + return false; + } + + @Override + public boolean shouldRelaunchInstance(ComponentInstance componentInstance, + ContainerStatus containerStatus) { + + if ( ComponentInstance.hasContainerFailed(containerStatus)) { + return true; + } + + return false; + } + + @Override + public boolean isReadyForDownStream(Component component) { + if (hasCompletedSuccessfully(component)) { + return true; + } + + return false; + } + + @Override + public boolean allowUpgrades() { + return false; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 4aca0ea765a..00ef5516f20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.component.instance; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.api.RegistryConstants; @@ -25,9 +26,9 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import 
org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; @@ -58,6 +60,8 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -95,8 +99,10 @@ // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + private static final StateMachineFactory<ComponentInstance, + ComponentInstanceState, ComponentInstanceEventType, + ComponentInstanceEvent> stateMachineFactory = new StateMachineFactory<>(INIT) @@ -220,6 +226,45 @@ public void transition(ComponentInstance compInstance, } } + @VisibleForTesting + static void handleComponentInstanceRelaunch( + ComponentInstance compInstance, ComponentInstanceEvent event) { + Component comp = compInstance.getComponent(); + + // Do we need to relaunch the container instance? + boolean hasContainerFailed = hasContainerFailed(event.getStatus()); + + ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); + + if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) { + comp.reInsertPendingInstance(compInstance); + LOG.info(compInstance.getCompInstanceId() + + ": {} completed. Reinsert back to pending list and requested " + + "a new container." + System.lineSeparator() + + " exitStatus={}, diagnostics={}.", + event.getContainerId(), event.getStatus().getExitStatus(), + event.getStatus().getDiagnostics()); + } else { + // When no relaunch, update component's #succeeded/#failed + // instances. + if (hasContainerFailed) { + comp.markAsFailed(compInstance); + } else { + comp.markAsSucceeded(compInstance); + } + LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? + " succeeded" : + " failed") + " without retry, exitStatus=" + event.getStatus()); + comp.getScheduler().terminateServiceIfAllComponentsFinished(); + } + } + + public static boolean hasContainerFailed(ContainerStatus containerStatus) { + // Mark the container as failed if we can't get its exit status, i.e. it is null. + return containerStatus == null || containerStatus.getExitStatus() != + ContainerExitStatus.SUCCESS; + } + private static class ContainerStoppedTransition extends BaseTransition { // whether the container failed before launched by AM or not. boolean failedBeforeLaunching = false; @@ -249,7 +294,10 @@ public void transition(ComponentInstance compInstance, compInstance.component.decContainersReady(true); } compInstance.component.decRunningContainers(); - boolean shouldExit = false; + // Should we fail (terminate) the service? + boolean shouldFailService = false; + + final ServiceScheduler scheduler = comp.getScheduler(); // Check if it exceeds the failure threshold, but only if health threshold // monitor is not enabled if (!comp.isHealthThresholdMonitorEnabled() @@ -261,10 +309,10 @@ public void transition(ComponentInstance compInstance, comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp); compInstance.diagnostics.append(exitDiag); // append to global diagnostics that will be reported to RM.
- comp.getScheduler().getDiagnostics().append(containerDiag); - comp.getScheduler().getDiagnostics().append(exitDiag); + scheduler.getDiagnostics().append(containerDiag); + scheduler.getDiagnostics().append(exitDiag); LOG.warn(exitDiag); - shouldExit = true; + shouldFailService = true; } if (!failedBeforeLaunching) { @@ -286,25 +334,14 @@ public void transition(ComponentInstance compInstance, } // remove the failed ContainerId -> CompInstance mapping - comp.getScheduler().removeLiveCompInstance(event.getContainerId()); + scheduler.removeLiveCompInstance(event.getContainerId()); - comp.reInsertPendingInstance(compInstance); + // According to component restart policy, handle container restart + // or finish the service (if all components finished) + handleComponentInstanceRelaunch(compInstance, event); - LOG.info(compInstance.getCompInstanceId() - + ": {} completed. Reinsert back to pending list and requested " + - "a new container." + System.lineSeparator() + - " exitStatus={}, diagnostics={}.", - event.getContainerId(), event.getStatus().getExitStatus(), - event.getStatus().getDiagnostics()); - if (shouldExit) { - // Sleep for 5 seconds in hope that the state can be recorded in ATS. - // in case there's a client polling the comp state, it can be notified. - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - LOG.error("Interrupted on sleep while exiting.", e); - } - ExitUtil.terminate(-1); + if (shouldFailService) { + scheduler.getTerminationHandler().terminate(-1); } } } @@ -603,4 +640,9 @@ public int compareTo(ComponentInstance to) { >>> 32)); return result; } + + @VisibleForTesting + public org.apache.hadoop.yarn.service.api.records.Container getContainerSpec() { + return containerSpec; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java index 915b8361343..d76346a8a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; @@ -571,4 +572,17 @@ public static String getLocalHostName(@Nullable Configuration conf) // Fallback to querying the default hostname as we did before. return InetAddress.getLocalHost().getCanonicalHostName(); } + + public static class ProcessTerminationHandler { + public void terminate(int exitCode) { + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // in case there's a client polling the comp state, it can be notified. 
+ try { + Thread.sleep(5000); + } catch (InterruptedException e) { + log.info("Interrupted on sleep while exiting.", e); + } + ExitUtil.terminate(exitCode); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 599b8a7a0ae..05bee0a2dd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -57,6 +57,8 @@ import java.io.OutputStream; import java.net.URL; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; @@ -99,8 +101,28 @@ public static Service createExampleApplication() { return exampleApp; } + // Example service definition + // 2 components, each of which has 2 containers. + public static Service createTerminatingJobExample(String serviceName) { + Service exampleApp = new Service(); + exampleApp.setName(serviceName); + exampleApp.setVersion("v1"); + exampleApp.addComponent(createComponent("terminating-comp1", 2, "sleep " + + "1000", + Component.RestartPolicyEnum.NEVER, null)); + exampleApp.addComponent(createComponent("terminating-comp2", 2 , + "sleep 1000", Component.RestartPolicyEnum.ON_FAILURE, + new ArrayList() {{ add("terminating-comp1"); }})); + exampleApp.addComponent(createComponent("terminating-comp3", 2 , + "sleep 1000", Component.RestartPolicyEnum.ON_FAILURE, new + ArrayList() {{ add("terminating-comp2"); }})); + + return exampleApp; + } + public static Component createComponent(String name) { - return createComponent(name, 2L, "sleep 1000"); + return createComponent(name, 2L, + "sleep 1000", Component.RestartPolicyEnum.ALWAYS, null); } protected static Component createComponent(String name, long numContainers, @@ -116,6 +138,17 @@ protected static Component createComponent(String name, long numContainers, return comp1; } + protected static Component createComponent(String name, long numContainers, + String command, Component.RestartPolicyEnum restartPolicyEnum, List dependencies) { + Component comp = createComponent(name, numContainers, command); + comp.setRestartPolicy(restartPolicyEnum); + + if ( dependencies != null) { + comp.dependencies(dependencies); + } + return comp; + } + public static SliderFileSystem initMockFs() throws IOException { return initMockFs(null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index 56a0c71efd2..fc509f19420 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -227,14 +227,16 @@ protected YarnRegistryViewForProviders createYarnRegistryOperations( } public static Service createBaseDef(String name) { + return createDef(name, ServiceTestUtils.createExampleApplication()); + } + + public static Service createDef(String name, Service serviceDef) { ApplicationId applicationId = ApplicationId.newInstance( System.currentTimeMillis(), 1); - Service serviceDef = ServiceTestUtils.createExampleApplication(); serviceDef.setId(applicationId.toString()); serviceDef.setName(name); serviceDef.setState(ServiceState.STARTED); Artifact artifact = createTestArtifact("1"); - serviceDef.getComponents().forEach(component -> component.setArtifact(artifact)); return serviceDef; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 600e438e010..000d46bb1f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -34,12 +34,16 @@ import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.TestServiceManager; import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; + import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; +import org.apache.hadoop.yarn.service.monitor.ServiceMonitor; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -50,6 +54,8 @@ import java.util.Map; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; + +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -60,6 +66,9 @@ */ public class TestComponent { + private static final int WAIT_MS_PER_LOOP = 1000; + static final Logger LOG = Logger.getLogger(TestComponent.class); + @Rule public ServiceTestUtils.ServiceFSWatcher rule = new ServiceTestUtils.ServiceFSWatcher(); @@ -158,6 +167,53 @@ public void testContainerCompletedWhenUpgrading() throws Exception { comp.getComponentSpec().getConfiguration().getEnv("key1")); } + @Test + public void testComponentStateUpdatesWithTerminatingComponents() throws + Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingComponents"; + + Service testService = ServiceTestUtils.createTerminatingJobExample(serviceName); + 
TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = createTestContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()){ + + Iterator instanceIter = comp. + getAllComponentInstances().iterator(); + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + Assert.assertEquals(0, comp.getNumSucceededInstances()); + Assert.assertEquals(0, comp.getNumFailedInstances()); + Assert.assertEquals(2, comp.getNumRunningInstances()); + Assert.assertEquals(2, comp.getNumReadyInstances()); + Assert.assertEquals(0, comp.getPendingInstances().size()); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer().getId(), + ComponentInstanceEventType.STOP).setStatus(containerStatus)); + + Assert.assertEquals(1, comp.getNumSucceededInstances()); + Assert.assertEquals(0, comp.getNumFailedInstances()); + Assert.assertEquals(1, comp.getNumRunningInstances()); + Assert.assertEquals(1, comp.getNumReadyInstances()); + Assert.assertEquals(0, comp.getPendingInstances().size()); + + org.apache.hadoop.yarn.service.component.ComponentState componentState = + Component.checkIfStable(comp); + Assert.assertEquals(org.apache.hadoop.yarn.service.component.ComponentState.STABLE, + componentState); + } + } + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { @@ -171,8 +227,15 @@ public void testContainerCompletedWhenUpgrading() throws Exception { public static ServiceContext createTestContext( ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName) throws Exception { + return createTestContext(fsWatcher, TestServiceManager.createBaseDef + (serviceName)); + } + + public static ServiceContext createTestContext( + ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef) + throws Exception { ServiceContext context = new ServiceContext(); - context.service = TestServiceManager.createBaseDef(serviceName); + context.service = serviceDef; context.fs = fsWatcher.getFs(); ContainerLaunchService mockLaunchService = mock( @@ -223,6 +286,8 @@ private static void stabilizeComponents(ServiceContext context) { context.attemptId = attemptId; Map componentState = context.scheduler.getAllComponents(); + + int counter = 0; for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : context.service.getComponents()) { Component component = new org.apache.hadoop.yarn.service.component. 
@@ -230,9 +295,12 @@ private static void stabilizeComponents(ServiceContext context) { componentState.put(component.getName(), component); component.handle(new ComponentEvent(component.getName(), ComponentEventType.FLEX)); + for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { - assignNewContainer(attemptId, i + 1, context, component); + counter++; + assignNewContainer(attemptId, counter, context, component); } + component.handle(new ComponentEvent(component.getName(), ComponentEventType.CHECK_STABLE)); } @@ -241,6 +309,8 @@ private static void stabilizeComponents(ServiceContext context) { private static void assignNewContainer( ApplicationAttemptId attemptId, long containerNum, ServiceContext context, Component component) { + + Container container = org.apache.hadoop.yarn.api.records.Container .newInstance(ContainerId.newContainerId(attemptId, containerNum), NODE_ID, "localhost", null, null, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java new file mode 100644 index 00000000000..25c2d333139 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestComponentRestartPolicy { + + @Test + public void testAlwaysRestartPolicy() throws Exception { + + AlwaysRestartPolicy alwaysRestartPolicy = AlwaysRestartPolicy.getInstance(); + + Component component = mock(Component.class); + when(component.getNumReadyInstances()).thenReturn(1); + when(component.getNumDesiredInstances()).thenReturn(2); + + ComponentInstance instance = mock(ComponentInstance.class); + when(instance.getComponent()).thenReturn(component); + + ContainerStatus containerStatus = mock(ContainerStatus.class); + + assertEquals(true, alwaysRestartPolicy.isLongLived()); + assertEquals(true, alwaysRestartPolicy.allowUpgrades()); + assertEquals(false, alwaysRestartPolicy.hasCompleted(component)); + assertEquals(false, + alwaysRestartPolicy.hasCompletedSuccessfully(component)); + + assertEquals(true, + alwaysRestartPolicy.shouldRelaunchInstance(instance, containerStatus)); + + assertEquals(false, alwaysRestartPolicy.isReadyForDownStream(component)); + } + + @Test + public void testNeverRestartPolicy() throws Exception { + + NeverRestartPolicy restartPolicy = NeverRestartPolicy.getInstance(); + + Component component = mock(Component.class); + when(component.getNumSucceededInstances()).thenReturn(new Long(1)); + when(component.getNumFailedInstances()).thenReturn(new Long(2)); + when(component.getNumDesiredInstances()).thenReturn(3); + + ComponentInstance instance = mock(ComponentInstance.class); + when(instance.getComponent()).thenReturn(component); + + ContainerStatus containerStatus = mock(ContainerStatus.class); + + assertEquals(false, restartPolicy.isLongLived()); + assertEquals(false, restartPolicy.allowUpgrades()); + assertEquals(true, restartPolicy.hasCompleted(component)); + assertEquals(false, + restartPolicy.hasCompletedSuccessfully(component)); + + assertEquals(false, + restartPolicy.shouldRelaunchInstance(instance, containerStatus)); + + assertEquals(true, restartPolicy.isReadyForDownStream(component)); + } + + @Test + public void testOnFailureRestartPolicy() throws Exception { + + OnFailureRestartPolicy restartPolicy = OnFailureRestartPolicy.getInstance(); + + Component component = mock(Component.class); + when(component.getNumSucceededInstances()).thenReturn(new Long(3)); + when(component.getNumFailedInstances()).thenReturn(new Long(0)); + when(component.getNumDesiredInstances()).thenReturn(3); + + ComponentInstance instance = mock(ComponentInstance.class); + when(instance.getComponent()).thenReturn(component); + + ContainerStatus containerStatus = mock(ContainerStatus.class); + when(containerStatus.getExitStatus()).thenReturn(0); + + assertEquals(false, restartPolicy.isLongLived()); + assertEquals(false, restartPolicy.allowUpgrades()); + assertEquals(true, restartPolicy.hasCompleted(component)); + assertEquals(true, + restartPolicy.hasCompletedSuccessfully(component)); + + assertEquals(false, + restartPolicy.shouldRelaunchInstance(instance, containerStatus)); + + assertEquals(true, restartPolicy.isReadyForDownStream(component)); + + + when(component.getNumSucceededInstances()).thenReturn(new Long(2)); + when(component.getNumFailedInstances()).thenReturn(new Long(1)); + 
when(component.getNumDesiredInstances()).thenReturn(3); + + assertEquals(false, restartPolicy.hasCompleted(component)); + assertEquals(false, + restartPolicy.hasCompletedSuccessfully(component)); + + when(containerStatus.getExitStatus()).thenReturn(-1000); + + assertEquals(true, + restartPolicy.shouldRelaunchInstance(instance, containerStatus)); + + assertEquals(false, restartPolicy.isReadyForDownStream(component)); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index 0b56d7ef19c..f94b0f63770 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -18,17 +18,42 @@ package org.apache.hadoop.yarn.service.component.instance; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.service.ServiceContext; +import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.api.records.Container; import org.apache.hadoop.yarn.service.api.records.ContainerState; +import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.component.TestComponent; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests for {@link ComponentInstance}. 
@@ -43,31 +68,33 @@ public void testContainerUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerUpgrade"); - Component component = context.scheduler.getAllComponents().entrySet() - .iterator().next().getValue(); + Component component = + context.scheduler.getAllComponents().entrySet().iterator().next() + .getValue(); upgradeComponent(component); - ComponentInstance instance = component.getAllComponentInstances() - .iterator().next(); + ComponentInstance instance = + component.getAllComponentInstances().iterator().next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); instance.handle(instanceEvent); Container containerSpec = component.getComponentSpec().getContainer( instance.getContainer().getId().toString()); - Assert.assertEquals("instance not upgrading", - ContainerState.UPGRADING, containerSpec.getState()); + Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING, + containerSpec.getState()); } @Test public void testContainerReadyAfterUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, "testContainerStarted"); - Component component = context.scheduler.getAllComponents().entrySet() - .iterator().next().getValue(); + Component component = + context.scheduler.getAllComponents().entrySet().iterator().next() + .getValue(); upgradeComponent(component); - ComponentInstance instance = component.getAllComponentInstances() - .iterator().next(); + ComponentInstance instance = + component.getAllComponentInstances().iterator().next(); ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent( instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); @@ -75,14 +102,416 @@ public void testContainerReadyAfterUpgrade() throws Exception { instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); - Assert.assertEquals("instance not ready", - ContainerState.READY, instance.getCompSpec().getContainer( - instance.getContainer().getId().toString()).getState()); + Assert.assertEquals("instance not ready", ContainerState.READY, + instance.getCompSpec() + .getContainer(instance.getContainer().getId().toString()) + .getState()); } private void upgradeComponent(Component component) { component.handle(new ComponentEvent(component.getName(), - ComponentEventType.UPGRADE) - .setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2")); + ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec()) + .setUpgradeVersion("v2")); + } + + private Component createComponent( + ServiceScheduler scheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, + int nSucceededInstances, int nFailedInstances, int totalAsk, int + componentId) { + + assert (nSucceededInstances + nFailedInstances) <= totalAsk; + + Component comp = mock(Component.class); + org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( + org.apache.hadoop.yarn.service.api.records.Component.class); + when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + when(comp.getRestartPolicyHandler()).thenReturn(Component + .getRestartPolicyHandler(restartPolicy)); + when(componentSpec.getNumberOfContainers()).thenReturn( + Long.valueOf(totalAsk)); + when(comp.getComponentSpec()).thenReturn(componentSpec); + when(comp.getScheduler()).thenReturn(scheduler); + + Map succeeded = new ConcurrentHashMap<>(); + 
Map failed = new ConcurrentHashMap<>(); + scheduler.getAllComponents().put("comp" + componentId, comp); + + Map componentInstances = new HashMap<>(); + + for (int i = 0; i < nSucceededInstances; i++) { + ComponentInstance componentInstance = createComponentInstance(comp, i); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + succeeded.put(componentInstance.getCompInstanceName(), componentInstance); + } + + for (int i = 0; i < nFailedInstances; i++) { + ComponentInstance componentInstance = createComponentInstance(comp, + i + nSucceededInstances); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + failed.put(componentInstance.getCompInstanceName(), componentInstance); + } + + int delta = totalAsk - nFailedInstances - nSucceededInstances; + + for (int i = 0; i < delta; i++) { + ComponentInstance componentInstance = createComponentInstance(comp, + i + nSucceededInstances + nFailedInstances); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + } + + when(comp.getAllComponentInstances()).thenReturn( + componentInstances.values()); + when(comp.getSucceededInstances()).thenReturn(succeeded.values()); + when(comp.getFailedInstances()).thenReturn(failed.values()); + return comp; + } + + private Component createComponent( + ServiceScheduler scheduler, + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, + int totalAsk, int + componentId) { + + Component comp = mock(Component.class); + org.apache.hadoop.yarn.service.api.records.Component componentSpec = mock( + org.apache.hadoop.yarn.service.api.records.Component.class); + when(componentSpec.getRestartPolicy()).thenReturn(restartPolicy); + when(comp.getRestartPolicyHandler()).thenReturn(Component + .getRestartPolicyHandler(restartPolicy)); + when(componentSpec.getNumberOfContainers()).thenReturn( + Long.valueOf(totalAsk)); + when(comp.getComponentSpec()).thenReturn(componentSpec); + when(comp.getScheduler()).thenReturn(scheduler); + + scheduler.getAllComponents().put("comp" + componentId, comp); + + Map componentInstances = new HashMap<>(); + + for (int i = 0; i < totalAsk; i++) { + ComponentInstance componentInstance = createComponentInstance(comp, i); + componentInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + } + + when(comp.getAllComponentInstances()).thenReturn( + componentInstances.values()); + return comp; + } + + private ComponentInstance createComponentInstance(Component component, + int instanceId) { + + ComponentInstance componentInstance = mock(ComponentInstance.class); + when(componentInstance.getComponent()).thenReturn(component); + when(componentInstance.getCompInstanceName()).thenReturn("compInstance" + + instanceId); + + ServiceUtils.ProcessTerminationHandler terminationHandler = mock + (ServiceUtils.ProcessTerminationHandler.class); + when(component.getScheduler().getTerminationHandler()).thenReturn + (terminationHandler); + + return componentInstance; + } + + @Test + public void testComponentRestartPolicy() { + + Map allComponents = new HashMap<>(); + Service mockService = mock(Service.class); + ServiceContext serviceContext = mock(ServiceContext.class); + when(serviceContext.getService()).thenReturn(mockService); + ServiceScheduler serviceSchedulerInstance = new ServiceScheduler( + serviceContext); + ServiceScheduler serviceScheduler = spy(serviceSchedulerInstance); + when(serviceScheduler.getAllComponents()).thenReturn(allComponents); + 
Mockito.doNothing().when(serviceScheduler).setGracefulStop( + any(FinalApplicationStatus.class)); + + ComponentInstanceEvent componentInstanceEvent = mock( + ComponentInstanceEvent.class); + ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId + .newInstance(ApplicationId.newInstance(1234L, 1), 1), 1); + ContainerStatus containerStatus = ContainerStatus.newInstance(containerId, + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, "hello", 0); + + when(componentInstanceEvent.getStatus()).thenReturn(containerStatus); + + // Test case1: one component, one instance, restart policy = ALWAYS, exit=0 + Component comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ALWAYS, + 1, 0, 1, 0); + ComponentInstance componentInstance = + comp.getAllComponentInstances().iterator().next(); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), never()).terminate( + anyInt()); + + // Test case2: one component, one instance, restart policy = ALWAYS, exit=1 + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ALWAYS, + 0, 1, 1, 0); + componentInstance = comp.getAllComponentInstances().iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), never()).terminate( + anyInt()); + + // Test case3: one component, one instance, restart policy = NEVER, exit=0 + // Should exit with code=0 + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 1, 0, 1, 0); + componentInstance = comp.getAllComponentInstances().iterator().next(); + containerStatus.setExitStatus(0); + + Map succeededInstances = new HashMap<>(); + succeededInstances.put(componentInstance.getCompInstanceName(), + componentInstance); + when(comp.getSucceededInstances()).thenReturn(succeededInstances.values()); + + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, times(1)).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(eq(0)); + + // Test case4: one component, one instance, restart policy = NEVER, exit=1 + // Should exit with code=-1 + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 0, 1, 1, 0); + componentInstance = comp.getAllComponentInstances().iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, 
times(1)).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), times(1)).terminate( + eq(-1)); + + // Test case5: one component, one instance, restart policy = ON_FAILURE, + // exit=1 + // Should continue run. + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 0, 1, 1, 0); + componentInstance = comp.getAllComponentInstances().iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), times(0)).terminate( + anyInt()); + + // Test case6: one component, 3 instances, restart policy = NEVER, exit=1 + // 2 of the instances not completed, it should continue run. + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 0, 1, 3, 0); + componentInstance = comp.getAllComponentInstances().iterator().next(); + containerStatus.setExitStatus(1); + ComponentInstance.handleComponentInstanceRelaunch(componentInstance, + componentInstanceEvent); + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, times(1)).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), times(0)).terminate( + anyInt()); + + // Test case7: one component, 3 instances, restart policy = ON_FAILURE, + // exit=1 + // 2 of the instances completed, it should continue run. + + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 0, 1, 3, 0); + + Iterator iter = + comp.getAllComponentInstances().iterator(); + + containerStatus.setExitStatus(1); + ComponentInstance commponentInstance = iter.next(); + ComponentInstance.handleComponentInstanceRelaunch(commponentInstance, + componentInstanceEvent); + + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(1)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), times(0)).terminate( + anyInt()); + + // Test case8: 2 components, 2 instances for each + // comp2 already finished. 
+ // comp1 has a new instance finish, we should terminate the service + + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 0); + Collection component1Instances = + comp.getAllComponentInstances(); + + containerStatus.setExitStatus(1); + + Component comp2 = createComponent( + componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 1); + + Collection component2Instances = + comp2.getAllComponentInstances(); + + Map failed2Instances = new HashMap<>(); + + for (ComponentInstance component2Instance : component2Instances) { + failed2Instances.put(component2Instance.getCompInstanceName(), + component2Instance); + when(component2Instance.getComponent().getFailedInstances()).thenReturn( + failed2Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component2Instance, + componentInstanceEvent); + } + + Map failed1Instances = new HashMap<>(); + + // 2nd component, already finished. + for (ComponentInstance component1Instance : component1Instances) { + failed1Instances.put(component1Instance.getCompInstanceName(), + component1Instance); + when(component1Instance.getComponent().getFailedInstances()).thenReturn( + failed1Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component1Instance, + componentInstanceEvent); + } + + verify(comp, never()).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, times(2)).markAsFailed(any(ComponentInstance.class)); + verify(comp, times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + + verify(serviceScheduler.getTerminationHandler(), times(1)).terminate( + eq(-1)); + + // Test case9: 2 components, 2 instances for each + // comp2 already finished. + // comp1 has a new instance finish, we should terminate the service + // All instance finish with 0, service should exit with 0 as well. + containerStatus.setExitStatus(0); + + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 2, 0); + component1Instances = comp.getAllComponentInstances(); + + comp2 = createComponent(componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.ON_FAILURE, + 2, 1); + + component2Instances = comp2.getAllComponentInstances(); + + Map succeeded2Instances = new HashMap<>(); + + for (ComponentInstance component2Instance : component2Instances) { + succeeded2Instances.put(component2Instance.getCompInstanceName(), + component2Instance); + when(component2Instance.getComponent().getSucceededInstances()).thenReturn( + succeeded2Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component2Instance, + componentInstanceEvent); + } + + Map succeeded1Instances = new HashMap<>(); + // 2nd component, already finished. 
+ for (ComponentInstance component1Instance : component1Instances) { + succeeded1Instances.put(component1Instance.getCompInstanceName(), + component1Instance); + when(component1Instance.getComponent().getSucceededInstances()).thenReturn( + succeeded1Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component1Instance, + componentInstanceEvent); + } + + verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), times(1)).terminate(eq(0)); + + // Test case10: 2 components, 2 instances for each + // comp2 hasn't finished + // comp1 has finished. + // The service should continue to run. + + comp = createComponent(serviceScheduler, + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 0); + component1Instances = comp.getAllComponentInstances(); + + comp2 = createComponent(componentInstance.getComponent().getScheduler(), + org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum.NEVER, + 2, 1); + + component2Instances = comp2.getAllComponentInstances(); + + for (ComponentInstance component2Instance : component2Instances) { + ComponentInstance.handleComponentInstanceRelaunch(component2Instance, + componentInstanceEvent); + } + + succeeded1Instances = new HashMap<>(); + // The 1st component's instances all succeed; the 2nd is still running, so the service must not terminate. + for (ComponentInstance component1Instance : component1Instances) { + succeeded1Instances.put(component1Instance.getCompInstanceName(), + component1Instance); + when(component1Instance.getComponent().getSucceededInstances()).thenReturn( + succeeded1Instances.values()); + ComponentInstance.handleComponentInstanceRelaunch(component1Instance, + componentInstanceEvent); + } + + verify(comp, times(2)).markAsSucceeded(any(ComponentInstance.class)); + verify(comp, never()).markAsFailed(any(ComponentInstance.class)); + verify(componentInstance.getComponent(), times(0)).reInsertPendingInstance( + any(ComponentInstance.class)); + verify(serviceScheduler.getTerminationHandler(), never()).terminate(eq(0)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md index c648046b827..2b567b0057d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md @@ -233,6 +233,7 @@ One or more components of the service. If the service is HBase say, then the com |placement_policy|Advanced scheduling and placement policies for all containers of this component.|false|PlacementPolicy|| |configuration|Config properties for this component.|false|Configuration|| |quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array|| +|restart_policy|Restart policy of the component. ALWAYS (always restart the component instance, even if it exits with code 0); ON_FAILURE (restart only if the instance exits with a non-zero code); NEVER (never restart). Flexing is not supported for components with restart_policy ON_FAILURE or NEVER.|false|string|ALWAYS| ### ComponentState
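For illustration, here is a minimal, non-authoritative sketch of how a client could set the new restart_policy field documented above when building a service spec through the Java records API this patch extends. The component and service names are hypothetical, and the fluent restartPolicy(...) setter on org.apache.hadoop.yarn.service.api.records.Component is assumed to be the one introduced alongside RestartPolicyEnum.

```java
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;

public class RestartPolicySpecSketch {

  /**
   * Builds a spec for a batch-style component whose containers should not be
   * relaunched once they exit (restart_policy = NEVER).
   */
  public static Service buildSpec() {
    Component indexer = new Component()   // "indexer" is a made-up name
        .name("indexer")
        .numberOfContainers(2L)
        // Assumed fluent setter for the new field; ALWAYS is the default
        // when the field is omitted.
        .restartPolicy(Component.RestartPolicyEnum.NEVER);

    Service service = new Service();
    service.setName("batch-indexer");     // hypothetical service name
    service.addComponent(indexer);
    return service;
  }
}
```

With such a spec, the AM is expected to mark the component as succeeded once both containers exit with code 0 and then stop the service, and flex requests against the component should be rejected because its restart policy is NEVER.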
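The mock-heavy test cases in testComponentRestartPolicy above can be hard to follow on their own, so below is a rough, non-authoritative reconstruction of the decision flow they exercise. It is not the actual ComponentInstance.handleComponentInstanceRelaunch implementation (which also handles upgrades, failure thresholds, and diagnostics); it only strings together the methods that the test mocks and verifies, under that stated assumption.

```java
package org.apache.hadoop.yarn.service.component;

import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;

/** Rough reconstruction of the behaviour asserted by the test cases above. */
public final class RelaunchDecisionSketch {

  static void onInstanceCompleted(ComponentInstance instance,
      ContainerStatus status) {
    Component comp = instance.getComponent();
    ComponentRestartPolicy policy = comp.getRestartPolicyHandler();

    if (policy.shouldRelaunchInstance(instance, status)) {
      // ALWAYS, or ON_FAILURE with a non-zero exit code: ask for a new
      // container (test cases 1, 2, 5 and 7).
      comp.reInsertPendingInstance(instance);
      return;
    }

    // Terminal outcome for this instance (NEVER, or ON_FAILURE with exit 0).
    if (status.getExitStatus() == 0) {
      comp.markAsSucceeded(instance);
    } else {
      comp.markAsFailed(instance);
    }

    // When every component has reached a terminal state, stop the AM:
    // exit 0 if everything succeeded (cases 3 and 9), -1 otherwise
    // (cases 4 and 8); keep running while any component is unfinished
    // (cases 6 and 10).
    boolean allCompleted = true;
    boolean allSucceeded = true;
    for (Object o : comp.getScheduler().getAllComponents().values()) {
      Component c = (Component) o;
      ComponentRestartPolicy p = c.getRestartPolicyHandler();
      allCompleted &= p.hasCompleted(c);
      allSucceeded &= p.hasCompletedSuccessfully(c);
    }
    if (allCompleted) {
      comp.getScheduler().getTerminationHandler()
          .terminate(allSucceeded ? 0 : -1);
    }
  }
}
```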