commit 4d38189dc2ca3d009cd2db9522890b44b9f920ff Author: Wangda Tan Date: Tue Mar 27 10:09:09 2018 -0700 [TO-file-YARN-JIRA] Support restart policy of containers Change-Id: I385050f6af606404195e115dc265ae4a651b582f diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index 17723bcff10..097711f2757 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -356,6 +356,14 @@ definitions: items: type: string description: A list of quicklink keys defined at the service level, and to be resolved by this component. + restartPolicy: + type: string + description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + enum: + - ALWAYS + - ON_FAILURE + - NEVER + default: ALWAYS ReadinessCheck: description: A custom command or a pluggable helper container to determine the readiness of a container of a component. Readiness for every service is different. Hence the need for a simple interface, with scope to support advanced usecases. required: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java index 75cc9c59a20..f5e7aaca1c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.ctc.wstx.util.StringUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; import org.apache.commons.lang3.StringUtils; @@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException; import org.apache.hadoop.yarn.service.monitor.ServiceMonitor; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; @@ -59,8 +61,10 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index ce0e0cfde8c..84288236acb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.service.api.records; +import com.fasterxml.jackson.annotation.JsonValue; +import com.google.gson.annotations.SerializedName; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -29,7 +31,9 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlEnum; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -98,6 +102,52 @@ private List containers = Collections.synchronizedList(new ArrayList()); + @SerializedName("restartPolicy") + private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS; + + /** + * Artifact Type. DOCKER, TARBALL or SERVICE + **/ + @XmlType(name = "restart_policy") + @XmlEnum + public static enum RestartPolicyEnum { + ALWAYS("ALWAYS"), + + ON_FAILURE("ON_FAILURE"), + + NEVER("NEVER"); + private String value; + + RestartPolicyEnum(String value) { + this.value = value; + } + + @Override + @JsonValue + public String toString() { + return value; + } + } + + public Component restartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + return this; + } + + /** + * Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + * @return restartPolicy + **/ + @ApiModelProperty(value = "Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases)") + public RestartPolicyEnum getRestartPolicy() { + return restartPolicy; + } + + public void setRestartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + } + + /** * Name of the service component (mandatory). **/ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 0cd7e2c2084..250817baf4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -96,6 +96,9 @@ // disk_failed containers etc. This will be reset to 0 periodically. public AtomicInteger currentContainerFailure = new AtomicInteger(0); + private AtomicInteger suceededInstances = new AtomicInteger(0); + private AtomicInteger failedInstances = new AtomicInteger(0); + private StateMachine stateMachine; private AsyncDispatcher dispatcher; @@ -661,4 +664,12 @@ public ServiceContext getContext() { public List getPendingInstances() { return pendingInstances; } + + public AtomicInteger getSuceededInstances() { + return suceededInstances; + } + + public AtomicInteger getFailedInstances() { + return failedInstances; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 0e3e11bc72e..3e17c369fa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -26,6 +26,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -54,6 +55,8 @@ import java.io.IOException; import java.text.MessageFormat; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -201,6 +204,64 @@ public void transition(ComponentInstance compInstance, } } + /* + * Check if all components of the scheduler finalized. + * If all components finalized + * (which #failed-instances + #suceeded-instances = #total-n-containers) + * The service will be terminated. + */ + private static void terminateServiceIfAllComponentsFinalized( + ServiceScheduler scheduler) { + boolean shouldTerminate = true; + + // Succeeded comps and failed comps, for logging purposes. + Set succeededComponents = new HashSet<>(); + Set failedComponents = new HashSet<>(); + + for (Component comp : scheduler.getAllComponents().values()) { + int nSucceeded = comp.getSuceededInstances().get(); + int nFailed = comp.getFailedInstances().get(); + if (nSucceeded + nFailed < comp.getComponentSpec() + .getNumberOfContainers()) { + shouldTerminate = false; + break; + } + if (nFailed > 0) { + failedComponents.add(comp.getName()); + } else { + succeededComponents.add(comp.getName()); + } + } + + if (shouldTerminate) { + LOG.info("All component finalized, exiting Service Master... " + + ", final status=" + (failedComponents.isEmpty() ? + "Succeeded" : + "Failed")); + LOG.info( + "Succeeded components: [" + org.apache.commons.lang3.StringUtils + .join(succeededComponents, ",") + + "]"); + LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils + .join(failedComponents, ",") + + "]"); + + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // in case there's a client polling the comp state, it can be notified. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.info("Interrupted on sleep while exiting.", e); + } + + if (!failedComponents.isEmpty()) { + ExitUtil.terminate(-1); + } else { + ExitUtil.terminate(0); + } + } + } + private static class ContainerStoppedTransition extends BaseTransition { // whether the container failed before launched by AM or not. boolean failedBeforeLaunching = false; @@ -264,14 +325,46 @@ public void transition(ComponentInstance compInstance, // remove the failed ContainerId -> CompInstance mapping comp.getScheduler().removeLiveCompInstance(event.getContainerId()); - comp.reInsertPendingInstance(compInstance); + boolean shouldRelaunch = false; + boolean succeeded = false; + if (event.getStatus() != null + && event.getStatus().getExitStatus() == ContainerExitStatus.SUCCESS) { + succeeded = true; + } + + if (comp.getComponentSpec().getRestartPolicy() + == org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ALWAYS) { + // Unconditional relaunch if restart_policy == always. + shouldRelaunch = true; + } else if (comp.getComponentSpec().getRestartPolicy() + == org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum.ON_FAILURE + && (!succeeded)) { + // Relaunch for non-success exit code if restart_policy == on_failure. + shouldRelaunch = true; + } + + if (shouldRelaunch) { + comp.reInsertPendingInstance(compInstance); + LOG.info(compInstance.getCompInstanceId() + + ": {} completed. Reinsert back to pending list and requested " + + "a new container." + System.lineSeparator() + + " exitStatus={}, diagnostics={}.", + event.getContainerId(), event.getStatus().getExitStatus(), + event.getStatus().getDiagnostics()); + } else { + // When no relaunch, update component's #suceeded/#failed + // instances. + if (succeeded) { + comp.getSuceededInstances().incrementAndGet(); + } else { + comp.getFailedInstances().incrementAndGet(); + } + LOG.info(compInstance.getCompInstanceId() + (succeeded ? + " succeeded" : + " failed") + " without retry, exitStatus=" + event.getStatus()); + terminateServiceIfAllComponentsFinalized(comp.getScheduler()); + } - LOG.info(compInstance.getCompInstanceId() - + ": {} completed. Reinsert back to pending list and requested " + - "a new container." + System.lineSeparator() + - " exitStatus={}, diagnostics={}.", - event.getContainerId(), event.getStatus().getExitStatus(), - event.getStatus().getDiagnostics()); if (shouldExit) { // Sleep for 5 seconds in hope that the state can be recorded in ATS. // in case there's a client polling the comp state, it can be notified. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md index 63e0af56fa8..f4df3433bf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md @@ -231,6 +231,7 @@ One or more components of the service. If the service is HBase say, then the com |placement_policy|Advanced scheduling and placement policies for all containers of this component (optional). If not specified, the service level placement_policy takes effect. Refer to the description at the global level for more details.|false|PlacementPolicy|| |configuration|Config properties for this component.|false|Configuration|| |quicklinks|A list of quicklink keys defined at the service level, and to be resolved by this component.|false|string array|| +|restart_policy|Restart policy of this component, could be NEVER/ALWAYS/ON_FAILURE|false|string|ALWAYS| ### ComponentState