diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 932945b..ed64b54 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -46,6 +47,7 @@ *
 *   <li>Optional, application-specific binary service data.</li>
 *   <li>Environment variables for the launched process.</li>
 *   <li>Command to launch the container.</li>
+ *   <li>Retry strategy when container fails to run.</li>
 * </ul>
  • * * * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) @@ -61,6 +63,18 @@ public static ContainerLaunchContext newInstance( Map environment, List commands, Map serviceData, ByteBuffer tokens, Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + + @Public + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, + ContainerRetryContext containerRetryContext) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -69,6 +83,7 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerRetryContext(containerRetryContext); return container; } @@ -195,4 +210,22 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainerRetryContext to relaunch container. + * @return ContainerRetryContext to relaunch container. + */ + @Public + @Unstable + public abstract ContainerRetryContext getContainerRetryContext(); + + /** + * Set the ContainerRetryContext to relaunch container. + * @param containerRetryContext ContainerRetryContext to + * relaunch container. + */ + @Public + @Unstable + public abstract void setContainerRetryContext( + ContainerRetryContext containerRetryContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java new file mode 100644 index 0000000..f257967 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Set; + +/** + * {@code ContainerRetryContext} indicates how container retry after it fails + * to run. + *

+ * It provides details such as:
+ * <ul>
+ *   <li>
+ *     {@link ContainerRetryPolicy} :
+ *     - NEVER_RETRY(DEFAULT value): no matter what error code is when container
+ *       fails to run, just do not retry.
+ *     - ALWAYS_RETRY: no matter what error code is, when container fails to
+ *       run, just retry.
+ *     - RETRY_ON_SPECIFIC_ERROR_CODE: when container fails to run, do retry if
+ *       the error code is one of errorCodes, otherwise do not retry.
+ *
+ *     Note: if error code is 137(SIGKILL) or 143(SIGTERM), it will not retry
+ *     because it is usually killed on purpose.
+ *   </li>
+ *   <li>
+ *     maxRetries specifies how many times to retry if need to retry.
+ *     If the value is -1, it means retry forever.
+ *   </li>
+ *   <li>retryInterval specifies delaying some time before relaunch
+ *   container, the unit is millisecond.</li>
+ * </ul>
    + */ +@Public +@Unstable +public abstract class ContainerRetryContext { + public static final int RETRY_FOREVER = -1; + public static final int RETRY_INVALID = -1000; + public static final ContainerRetryContext NEVER_RETRY_CONTEXT = + newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0); + + @Private + @Unstable + public static ContainerRetryContext newInstance( + ContainerRetryPolicy retryPolicy, Set errorCodes, + int maxRetries, int retryInterval) { + ContainerRetryContext containerRetryContext = + Records.newRecord(ContainerRetryContext.class); + containerRetryContext.setRetryPolicy(retryPolicy); + containerRetryContext.setErrorCodes(errorCodes); + containerRetryContext.setMaxRetries(maxRetries); + containerRetryContext.setRetryInterval(retryInterval); + return containerRetryContext; + } + + public abstract ContainerRetryPolicy getRetryPolicy(); + public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); + public abstract Set getErrorCodes(); + public abstract void setErrorCodes(Set errorCodes); + public abstract int getMaxRetries(); + public abstract void setMaxRetries(int maxRetries); + public abstract int getRetryInterval(); + public abstract void setRetryInterval(int retryInterval); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java new file mode 100644 index 0000000..a3816c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * Retry policy for relaunching a Container.
    + */ +@Public +@Unstable +public enum ContainerRetryPolicy { + /** Never retry. */ + NEVER_RETRY, + /** Always retry. */ + ALWAYS_RETRY, + /** Retry for specific error codes. */ + RETRY_ON_SPECIFIC_ERROR_CODE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index d122f5a..ace442d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -487,6 +487,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional ContainerRetryContextProto container_retry_context = 7; } message ContainerStatusProto { @@ -510,6 +511,19 @@ message ContainerResourceChangeRequestProto { optional ResourceProto capability = 2; } +message ContainerRetryContextProto { + optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; + repeated int32 error_codes = 2; + optional int32 max_retries = 3 [default = 0]; + optional int32 retry_interval = 4 [default = 0]; +} + +enum ContainerRetryPolicyProto { + NEVER_RETRY = 0; + ALWAYS_RETRY = 1; + RETRY_ON_SPECIFIC_ERROR_CODE = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index f410c43..f61059c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -253,6 +256,13 @@ // File length needed for local resource private long shellScriptPathLen = 0; + // Container retry options + private ContainerRetryPolicy containerRetryPolicy = + ContainerRetryPolicy.NEVER_RETRY; + private Set errorCodes = null; + private int maxRetries = 0; + private int retryInterval = 0; + // Timeline domain ID private String domainId = null; @@ -371,6 +381,17 @@ public boolean init(String[] args) throws 
ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("max_retries", true, "If container could retry, it specifies" + + " max retires"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is milliseconds"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + + containerRetryPolicy = ContainerRetryPolicy.values()[ + Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))]; + if (cliParser.hasOption("error_codes")) { + errorCodes = new HashSet<>(); + for (String errorCode : + cliParser.getOptionValue("error_codes").split(",")) { + errorCodes.add(Integer.parseInt(errorCode)); + } + } + maxRetries = Integer.parseInt(cliParser.getOptionValue("max_retries", "0")); + retryInterval = Integer.parseInt(cliParser.getOptionValue( + "retry_interval", "0")); return true; } @@ -1062,9 +1096,11 @@ public void run() { // "hadoop dfs" command inside the distributed shell. Map myShellEnv = new HashMap(shellEnv); myShellEnv.put(YARN_SHELL_ID, shellId); + ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance( + containerRetryPolicy, errorCodes, maxRetries, retryInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), - null); + null, containerRetryContext); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 68d2bde..15df60b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -168,6 +168,8 @@ private long attemptFailuresValidityInterval = -1; + private Vector containerRetryOptions = new Vector<>(5); + // Debug flag boolean debugFlag = false; @@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); + opts.addOption("error_codes", 
true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " + + "is specified with this option, e.g. --error_codes 1,2,3"); + opts.addOption("max_retries", true, "If container could retry, it specifies" + + " max retries"); + opts.addOption("retry_interval", true, "interval between each retry, unit " + + "is milliseconds"); } /** @@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException { } } + // Get container retry options + if (cliParser.hasOption("retry_policy")) { + containerRetryOptions.add("--retry_policy " + + cliParser.getOptionValue("retry_policy")); + } + if (cliParser.hasOption("error_codes")) { + containerRetryOptions.add("--error_codes " + + cliParser.getOptionValue("error_codes")); + } + if (cliParser.hasOption("max_retries")) { + containerRetryOptions.add("--max_retries " + + cliParser.getOptionValue("max_retries")); + } + if (cliParser.hasOption("retry_interval")) { + containerRetryOptions.add("--retry_interval " + + cliParser.getOptionValue("retry_interval")); + } + return true; } @@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException { vargs.add("--debug"); } + vargs.addAll(containerRetryOptions); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 12dcfcd..37eea46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; @@ -56,7 +58,8 @@ private Map environment = null; private List commands = null; private Map applicationACLS = null; - + private ContainerRetryContext containerRetryContext = null; + public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); } @@ -120,6 +123,10 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.containerRetryContext != null) { + builder.setContainerRetryContext( + convertToProtoFormat(this.containerRetryContext)); + } } private void mergeLocalToProto() { @@ -456,6 +463,27 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + public ContainerRetryContext 
getContainerRetryContext() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerRetryContext != null) { + return this.containerRetryContext; + } + if (!p.hasContainerRetryContext()) { + return null; + } + this.containerRetryContext = convertFromProtoFormat( + p.getContainerRetryContext()); + return this.containerRetryContext; + } + + public void setContainerRetryContext(ContainerRetryContext retryContext) { + maybeInitBuilder(); + if (retryContext == null) { + builder.clearContainerRetryContext(); + } + this.containerRetryContext = retryContext; + } + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -463,4 +491,14 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} + + private ContainerRetryContextPBImpl convertFromProtoFormat( + ContainerRetryContextProto p) { + return new ContainerRetryContextPBImpl(p); + } + + private ContainerRetryContextProto convertToProtoFormat( + ContainerRetryContext t) { + return ((ContainerRetryContextPBImpl)t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java new file mode 100644 index 0000000..a5ef70d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder; + +import java.util.HashSet; +import java.util.Set; + +/** + * Implementation of ContainerRetryContext. 
+ */ +public class ContainerRetryContextPBImpl extends ContainerRetryContext { + private ContainerRetryContextProto proto = + ContainerRetryContextProto.getDefaultInstance(); + private ContainerRetryContextProto.Builder builder = null; + private boolean viaProto = false; + + private Set errorCodes = null; + + public ContainerRetryContextPBImpl() { + builder = ContainerRetryContextProto.newBuilder(); + } + + public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerRetryContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.errorCodes != null) { + builder.clearErrorCodes(); + builder.addAllErrorCodes(this.errorCodes); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerRetryContextProto.newBuilder(proto); + } + viaProto = false; + } + + public ContainerRetryPolicy getRetryPolicy() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryPolicy()) { + return ContainerRetryPolicy.NEVER_RETRY; + } + return convertFromProtoFormat(p.getRetryPolicy()); + } + + public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) { + maybeInitBuilder(); + if (containerRetryPolicy == null) { + builder.clearRetryPolicy(); + return; + } + builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy)); + } + + private void initErrorCodes() { + if (this.errorCodes != null) { + return; + } + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + this.errorCodes = new HashSet<>(); + this.errorCodes.addAll(p.getErrorCodesList()); + } + + public Set getErrorCodes() { + initErrorCodes(); + return this.errorCodes; + } + + public void setErrorCodes(Set errCodes) { + maybeInitBuilder(); + if (errCodes == null || errCodes.isEmpty()) { + builder.clearErrorCodes(); + } + this.errorCodes = errCodes; + } + + public int getMaxRetries() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaxRetries()) { + return 0; + } + return p.getMaxRetries(); + } + + public void setMaxRetries(int maxRetries) { + maybeInitBuilder(); + builder.setMaxRetries(maxRetries); + } + + public int getRetryInterval() { + ContainerRetryContextProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasRetryInterval()) { + return 0; + } + return p.getRetryInterval(); + } + + public void setRetryInterval(int retryInterval) { + maybeInitBuilder(); + builder.setRetryInterval(retryInterval); + } + + private ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy containerRetryPolicy) { + return ProtoUtils.convertToProtoFormat(containerRetryPolicy); + } + + private ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto containerRetryPolicyProto) { + return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 29ed0f3..d3d5948 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; @@ -294,4 +296,17 @@ public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) { public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) { return ExecutionType.valueOf(e.name()); } + + /* + * ContainerRetryPolicy + */ + public static ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy e) { + return ContainerRetryPolicyProto.valueOf(e.name()); + } + + public static ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto e) { + return ContainerRetryPolicy.valueOf(e.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index b7f5ff7..96df10b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -159,6 +160,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -192,6 +194,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -345,7 +348,7 @@ private static Object genTypeValue(Type type) { return rand.nextBoolean(); } else if (type.equals(byte.class)) { return bytes[rand.nextInt(4)]; - } else if (type.equals(int.class)) { + } else if (type.equals(int.class) || type.equals(Integer.class)) { return rand.nextInt(1000000); } else if (type.equals(long.class)) { return Long.valueOf(rand.nextInt(1000000)); @@ -469,6 +472,7 @@ public static void setup() throws Exception { generateByNewInstance(ApplicationResourceUsageReport.class); generateByNewInstance(ApplicationReport.class); generateByNewInstance(Container.class); + generateByNewInstance(ContainerRetryContext.class); generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ContainerReport.class); @@ -957,6 +961,12 @@ public void testContainerIdPBImpl() throws Exception { } @Test + public void testContainerRetryPBImpl() throws Exception { + validatePBImplRecord(ContainerRetryContextPBImpl.class, + ContainerRetryContextProto.class); + } + + @Test public void testContainerLaunchContextPBImpl() throws Exception { validatePBImplRecord(ContainerLaunchContextPBImpl.class, ContainerLaunchContextProto.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 8c74bf5..d08ee67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out, } public enum ExitCode { + SUCCESS(0), FORCE_KILLED(137), TERMINATED(143), LOST(154); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 5cc4e19..976df7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -551,6 +551,10 @@ public Path getLocalPathForWrite(String pathStr, long size, checkWrite); } + public Path getLocalPathForRead(String pathStr) throws IOException { + return getPathToRead(pathStr, getLocalDirsForRead()); + } + public Path getLogPathForWrite(String pathStr, boolean checkWrite) throws IOException { return logDirsAllocator.getLocalPathForWrite(pathStr, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index f44de59..fb37a06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs) Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); + rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), + rcs.getRemainingRetryAttempts()); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 1d2ec56..310d8df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -55,6 +55,10 @@ NMContainerStatus getNMContainerStatus(); + boolean shouldRetry(int errorCode); + + boolean isRelaunch(); + String toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index e16ea93..60da9f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -100,6 +103,10 @@ private long containerLocalizationStartTime; private long containerLaunchStartTime; private static Clock clock = new SystemClock(); + private final ContainerRetryContext containerRetryContext; + // remaining retries to relaunch container if needed + private int remainingRetryAttempts; + private boolean isRelaunch = false; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -134,6 +141,13 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.dispatcher = dispatcher; this.stateStore = stateStore; this.launchContext = launchContext; + if (launchContext != null + && launchContext.getContainerRetryContext() != null) { + this.containerRetryContext = launchContext.getContainerRetryContext(); + } else { + this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT; + } + this.remainingRetryAttempts = containerRetryContext.getMaxRetries(); this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); this.resource = containerTokenIdentifier.getResource(); @@ -154,7 +168,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability) { + String diagnostics, boolean wasKilled, Resource recoveredCapability, + int remainingRetryAttempts) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, containerTokenIdentifier); this.recoveredStatus = recoveredStatus; @@ -167,6 +182,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.resource = Resource.newInstance(recoveredCapability.getMemory(), recoveredCapability.getVirtualCores()); } + if (remainingRetryAttempts != 
ContainerRetryContext.RETRY_INVALID) { + this.remainingRetryAttempts = remainingRetryAttempts; + } } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -246,9 +264,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, - ContainerState.EXITED_WITH_FAILURE, + EnumSet.of(ContainerState.EXITED_WITH_FAILURE, + ContainerState.LOCALIZED), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition(true)) + new RetryFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -854,6 +873,86 @@ public void transition(ContainerImpl container, ContainerEvent event) { } /** + * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * CONTAINER_EXITED_WITH_FAILURE state. + **/ + @SuppressWarnings("unchecked") // dispatcher not typed + static class RetryFailureTransition implements + MultipleArcTransition { + + @Override + public ContainerState transition(final ContainerImpl container, + ContainerEvent event) { + ContainerExitEvent exitEvent = (ContainerExitEvent) event; + container.exitCode = exitEvent.getExitCode(); + if (exitEvent.getDiagnosticInfo() != null) { + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); + } + + if (container.shouldRetry(container.exitCode)) { + if (container.remainingRetryAttempts > 0) { + container.remainingRetryAttempts--; + try { + container.stateStore.storeContainerRemainingRetryAttempts( + container.getContainerId(), container.remainingRetryAttempts); + } catch (IOException e) { + LOG.warn("Unable to update remainingRetryAttempts in state store for " + + container.getContainerId(), e); + } + } + LOG.info("Relaunch Container " + container.getContainerId() + + ". Remain retry attempts : " + container.remainingRetryAttempts + + ". 
Retry interval is " + + container.containerRetryContext.getRetryInterval() + "ms"); + container.isRelaunch = true; + container.wasLaunched = false; + container.metrics.endRunningContainer(); + // wait for some time, then relaunch + new Thread() { + @Override + public void run() { + try { + Thread.sleep(container.containerRetryContext.getRetryInterval()); + container.sendLaunchEvent(); + } catch (InterruptedException e) { + return; + } + } + }.start(); + return ContainerState.LOCALIZED; + } else { + new ExitedWithFailureTransition(true).transition(container, event); + return ContainerState.EXITED_WITH_FAILURE; + } + } + } + + @Override + public boolean shouldRetry(int errorCode) { + if (errorCode == ExitCode.SUCCESS.getExitCode() + || errorCode == ExitCode.FORCE_KILLED.getExitCode() + || errorCode == ExitCode.TERMINATED.getExitCode()) { + return false; + } + + ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy(); + if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY + || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE + && containerRetryContext.getErrorCodes() != null + && containerRetryContext.getErrorCodes().contains(errorCode))) { + return remainingRetryAttempts > 0 + || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; + } + + return false; + } + + @Override + public boolean isRelaunch() { + return isRelaunch; + } + + /** * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST */ static class KilledExternallyTransition extends ExitedWithFailureTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 6371b21..0b48abd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -177,6 +178,10 @@ public Integer call() { Path containerLogDir; try { + // Select the working directory for the container + Path containerWorkDir = getContainerWorkDir(container); + cleanupContainerFilesForRelaunch(container, containerWorkDir); + localResources = container.getLocalizedResources(); if (localResources == null) { throw RPCUtil.getRemoteException( @@ -191,8 +196,7 @@ public Integer call() { String appIdStr = app.getAppId().toString(); String relativeContainerLogDir = ContainerLaunch .getRelativeContainerLogDir(appIdStr, containerIdStr); - containerLogDir = - dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); + containerLogDir = getContainerLogDir(container); for (String str : command) { // TODO: Should we instead work via symlinks without this grammar? 
newCmds.add(expandEnvironment(str, containerLogDir)); @@ -226,14 +230,6 @@ public Integer call() { DataOutputStream containerScriptOutStream = null; DataOutputStream tokensOutStream = null; - // Select the working directory for the container - Path containerWorkDir = - dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE - + Path.SEPARATOR + user + Path.SEPARATOR - + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr - + Path.SEPARATOR + containerIdStr, - LocalDirAllocator.SIZE_UNKNOWN, false); - String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); // pid file should be in nm private dir so that it is not @@ -329,7 +325,9 @@ public Integer call() { completed.set(true); exec.deactivateContainer(containerID); try { - context.getNMStateStore().storeContainerCompleted(containerID, ret); + if (!container.shouldRetry(ret)) { + context.getNMStateStore().storeContainerCompleted(containerID, ret); + } } catch (IOException e) { LOG.error("Unable to set exit code for container " + containerID); } @@ -369,6 +367,120 @@ public Integer call() { } /** + * Get container's working directory. + * If container is in relaunch and we find its previous working directory + * could be read/write it will be reused as current working directory. + * We apply a simple heuristic to find its previous working directory: + * if a good work dir with the container tokens file already exists, + * it means it is container's previous working directory. + */ + private Path getContainerWorkDir(Container container1) throws IOException { + Path containerWorkDir = null; + String relativeContainerWorkDir = ContainerLocalizer.USERCACHE + + Path.SEPARATOR + container1.getUser() + Path.SEPARATOR + + ContainerLocalizer.APPCACHE + Path.SEPARATOR + + app.getAppId().toString() + Path.SEPARATOR + + ConverterUtils.toString(container1.getContainerId()); + + if (container1.isRelaunch()) { + try { + // container's tokens file's parent is container's previous working dir + containerWorkDir = dirsHandler.getLocalPathForRead( + relativeContainerWorkDir + Path.SEPARATOR + + ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).getParent(); + // check whether containerWorDir could be read/write, it will throw + // IOException if not. + DiskChecker.checkDir( + new File(containerWorkDir.toUri().getPath())); + } catch (IOException e) { + containerWorkDir = null; + } + } + if (containerWorkDir == null) { + containerWorkDir = + dirsHandler.getLocalPathForWrite(relativeContainerWorkDir, + LocalDirAllocator.SIZE_UNKNOWN, false); + } + + return containerWorkDir; + } + + /** + * Get container's log directory. + * If container is in relaunch and we find its previous log directory + * could be read/write it will be reused as current log directory. + * We apply a simple heuristic to find its previous working directory: + * if a good work dir with file 'stdout' already exists, it means it + * is container's previous working directory. We assume there is 'stdout' + * in most containers's log directory. + */ + private Path getContainerLogDir(Container container1) throws IOException { + Path containerLogDir = null; + String relativeContainerLogDir = ContainerLaunch + .getRelativeContainerLogDir(app.getAppId().toString(), + ConverterUtils.toString(container1.getContainerId())); + + if (container1.isRelaunch()) { + try { + containerLogDir = dirsHandler.getLocalPathForRead( + relativeContainerLogDir + Path.SEPARATOR + + "stdout").getParent(); + // check whether containerWorDir could be read/write, it will throw + // IOException if not. 
+ DiskChecker.checkDir( + new File(containerLogDir.toUri().getPath())); + } catch (IOException e) { + containerLogDir = null; + } + } + if (containerLogDir == null) { + containerLogDir = + dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); + } + + return containerLogDir; + } + + /** + * Clean up container's previous files for container relaunch. + */ + private void cleanupContainerFilesForRelaunch(Container container1, + Path containerWorkDir) { + if (!container1.isRelaunch()) { + return; + } + + try { + FileContext lfs = FileContext.getLocalFSFileContext(); + // delete ContainerScriptPath + lfs.delete(new Path(containerWorkDir, CONTAINER_SCRIPT), false); + // delete TokensPath + lfs.delete( + new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE), false); + } catch (IOException e) { + LOG.error("Failed to delete container script file and token file", e); + } + + String appIdStr = app.getAppId().toString(); + String containerIdStr = + ConverterUtils.toString(container1.getContainerId()); + // delete pid file and pid exit code file + String pidPath = getPidFileSubpath(appIdStr, containerIdStr); + deleteLocalPath(pidPath); + deleteLocalPath(getExitCodeFile(pidPath)); + } + + private void deleteLocalPath(String relativePath) { + try { + FileContext lfs = FileContext.getLocalFSFileContext(); + Path path = dirsHandler.getLocalPathForRead(relativePath); + lfs.delete(path, false); + } catch (IOException e) { + LOG.error("Failed to delete " + relativePath, e); + } + } + + /** * Tries to tail and fetch TAIL_SIZE_IN_BYTES of data from the error log. * ErrorLog filename is not fixed and depends upon app, hence file name * pattern is used. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 89c71bb..db84485 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -106,6 +106,8 @@ "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = + "/remainingRetryAttempts"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; @@ -238,6 +240,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { rcs.capability = new ResourcePBImpl( ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { + rcs.setRemainingRetryAttempts(Integer.parseInt(asString(entry.getValue()))); } else { throw new IOException("Unexpected container state key: " + key); } @@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws 
IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index d5dce9b..68f4e87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode) } @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + } + + @Override public void removeContainer(ContainerId containerId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index e8ccf54..b733ffe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -76,6 +77,7 @@ public NMStateStoreService(String name) { String diagnostics = ""; StartContainerRequest startRequest; Resource capability; + private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; public RecoveredContainerStatus getStatus() { return status; @@ -100,6 +102,14 @@ public StartContainerRequest getStartRequest() { public Resource getCapability() { return capability; } + + public int getRemainingRetryAttempts() { + return remainingRetryAttempts; + } + + public void setRemainingRetryAttempts(int retryAttempts) { + this.remainingRetryAttempts = retryAttempts; + } } public static class LocalResourceTrackerState { @@ -325,6 +335,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException; /** + * Record remaining retry 
attempts for a container. + * @param containerId the container ID + * @param remainingRetryAttempts the remain retry times when container fails to run + * @throws IOException + */ + public abstract void storeContainerRemainingRetryAttempts( + ContainerId containerId, int remainingRetryAttempts) throws IOException; + + /** * Remove records corresponding to a container * @param containerId the container ID * @throws IOException diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 2ab9842..7a26df5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -645,6 +647,66 @@ public void testLaunchAfterKillRequest() throws Exception { } } + @Test + public void testContainerRetry() throws Exception{ + ContainerRetryContext containerRetryContext1 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.NEVER_RETRY, null, 3, 0); + testContainerRetry(containerRetryContext1, 2, 0); + + ContainerRetryContext containerRetryContext2 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0); + testContainerRetry(containerRetryContext2, 2, 3); + + ContainerRetryContext containerRetryContext3 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0); + // If exit code is 0, it will not retry + testContainerRetry(containerRetryContext3, 0, 0); + + ContainerRetryContext containerRetryContext4 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, null, 3, 0); + testContainerRetry(containerRetryContext4, 2, 0); + + HashSet errorCodes = new HashSet<>(); + errorCodes.add(2); + errorCodes.add(6); + ContainerRetryContext containerRetryContext5 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes, 3, 0); + testContainerRetry(containerRetryContext5, 2, 3); + + HashSet errorCodes2 = new HashSet<>(); + errorCodes.add(143); + ContainerRetryContext containerRetryContext6 = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes2, 3, 0); + // If exit code is 143(SIGTERM), it will not retry even it is in errorCodes. 
+ testContainerRetry(containerRetryContext6, 143, 0); + } + + private void testContainerRetry(ContainerRetryContext containerRetryContext, int exitCode, + int expectedRetries) throws Exception { + WrappedContainer wc = null; + try { + int retryTimes = 0; + wc = new WrappedContainer(24, 314159265358979L, 4344, "yak", + containerRetryContext); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + while (true) { + wc.containerFailed(exitCode); + if (wc.c.getContainerState() == ContainerState.RUNNING) { + retryTimes++; + } else { + break; + } + } + Assert.assertEquals(expectedRetries, retryTimes); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -782,12 +844,23 @@ public boolean matches(Object o) { WrappedContainer(int appId, long timestamp, int id, String user) throws IOException { - this(appId, timestamp, id, user, true, false); + this(appId, timestamp, id, user, null); + } + + WrappedContainer(int appId, long timestamp, int id, String user, + ContainerRetryContext containerRetryContext) throws IOException { + this(appId, timestamp, id, user, true, false, containerRetryContext); } - @SuppressWarnings("rawtypes") WrappedContainer(int appId, long timestamp, int id, String user, boolean withLocalRes, boolean withServiceData) throws IOException { + this(appId, timestamp, id, user, withLocalRes, withServiceData, null); + } + + @SuppressWarnings("rawtypes") + WrappedContainer(int appId, long timestamp, int id, String user, + boolean withLocalRes, boolean withServiceData, + ContainerRetryContext containerRetryContext) throws IOException { dispatcher = new DrainDispatcher(); dispatcher.init(new Configuration()); @@ -864,6 +937,8 @@ public boolean matches(Object o) { } when(ctxt.getServiceData()).thenReturn(serviceData); + when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext); + c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(), ctxt, null, metrics, identifier); dispatcher.register(ContainerEventType.class, @@ -984,6 +1059,10 @@ public void containerFailed(int exitCode) { assert containerStatus.getDiagnostics().contains(diagnosticMsg); assert containerStatus.getExitStatus() == exitCode; drainDispatcherEvents(); + // If container needs retry, relaunch it + if (c.getContainerState() == ContainerState.LOCALIZED) { + launchContainer(); + } } public void killContainer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index a1c95ab..81ecf65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -124,6 +124,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; +
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); result.add(rcsCopy); } return result; @@ -177,6 +178,13 @@ public synchronized void storeContainerCompleted(ContainerId containerId, } @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setRemainingRetryAttempts(remainingRetryAttempts); + } + + @Override public synchronized void removeContainer(ContainerId containerId) throws IOException { containerStates.remove(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 08b49e7..fe70f10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -337,6 +337,13 @@ public void testContainerStorage() throws IOException { assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + // store remainRetries + stateStore.storeContainerRemainingRetryAttempts(containerId, 6); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + assertEquals(6, recoveredContainers.get(0).getRemainingRetryAttempts()); + // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 394a92c..ba52a97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -144,4 +144,14 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { public NMContainerStatus getNMContainerStatus() { return null; } + + @Override + public boolean shouldRetry(int errorCode) { + return false; + } + + @Override + public boolean isRelaunch() { + return false; + } } diff --git a/patch b/patch new file mode 100644 index 0000000..7eea87c --- /dev/null +++ b/patch @@ -0,0 +1,1382 @@ +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +index 932945b..ed64b54 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java ++++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +@@ -24,6 +24,7 @@ + + import org.apache.hadoop.classification.InterfaceAudience.Public; + import org.apache.hadoop.classification.InterfaceStability.Stable; ++import org.apache.hadoop.classification.InterfaceStability.Unstable; + import org.apache.hadoop.yarn.api.ContainerManagementProtocol; + import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; + import org.apache.hadoop.yarn.server.api.AuxiliaryService; +@@ -46,6 +47,7 @@ + *
  • Optional, application-specific binary service data.
  • + *
  • Environment variables for the launched process.
  • + *
  • Command to launch the container.
  • ++ *
  • Retry strategy when container fails to run.
  • + * + * + * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) +@@ -61,6 +63,18 @@ public static ContainerLaunchContext newInstance( + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls) { ++ return newInstance(localResources, environment, commands, serviceData, ++ tokens, acls, null); ++ } ++ ++ @Public ++ @Unstable ++ public static ContainerLaunchContext newInstance( ++ Map localResources, ++ Map environment, List commands, ++ Map serviceData, ByteBuffer tokens, ++ Map acls, ++ ContainerRetryContext containerRetryContext) { + ContainerLaunchContext container = + Records.newRecord(ContainerLaunchContext.class); + container.setLocalResources(localResources); +@@ -69,6 +83,7 @@ public static ContainerLaunchContext newInstance( + container.setServiceData(serviceData); + container.setTokens(tokens); + container.setApplicationACLs(acls); ++ container.setContainerRetryContext(containerRetryContext); + return container; + } + +@@ -195,4 +210,22 @@ public static ContainerLaunchContext newInstance( + @Public + @Stable + public abstract void setApplicationACLs(Map acls); ++ ++ /** ++ * Get the ContainerRetryContext to relaunch container. ++ * @return ContainerRetryContext to relaunch container. ++ */ ++ @Public ++ @Unstable ++ public abstract ContainerRetryContext getContainerRetryContext(); ++ ++ /** ++ * Set the ContainerRetryContext to relaunch container. ++ * @param containerRetryContext ContainerRetryContext to ++ * relaunch container. ++ */ ++ @Public ++ @Unstable ++ public abstract void setContainerRetryContext( ++ ContainerRetryContext containerRetryContext); + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java +new file mode 100644 +index 0000000..f257967 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java +@@ -0,0 +1,84 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.yarn.api.records; ++ ++import org.apache.hadoop.classification.InterfaceAudience.Private; ++import org.apache.hadoop.classification.InterfaceAudience.Public; ++import org.apache.hadoop.classification.InterfaceStability.Unstable; ++import org.apache.hadoop.yarn.util.Records; ++ ++import java.util.Set; ++ ++/** ++ * {@code ContainerRetryContext} indicates how container retry after it fails ++ * to run. ++ *

    ++ * It provides details such as: ++ *

      ++ *
    • ++ * {@link ContainerRetryPolicy} : ++ * - NEVER_RETRY(DEFAULT value): no matter what error code is when container ++ * fails to run, just do not retry. ++ * - ALWAYS_RETRY: no matter what error code is, when container fails to ++ * run, just retry. ++ * - RETRY_ON_SPECIFIC_ERROR_CODE: when container fails to run, do retry if ++ * the error code is one of errorCodes, otherwise do not retry. ++ * ++ * Note: if error code is 137(SIGKILL) or 143(SIGTERM), it will not retry ++ * because it is usually killed on purpose. ++ *
    • ++ *
    • ++ * maxRetries specifies how many times to retry if need to retry. ++ * If the value is -1, it means retry forever. ++ *
    • ++ *
    • retryInterval specifies delaying some time before relaunch ++ * container, the unit is millisecond.
    • ++ *
    ++ */ ++@Public ++@Unstable ++public abstract class ContainerRetryContext { ++ public static final int RETRY_FOREVER = -1; ++ public static final int RETRY_INVALID = -1000; ++ public static final ContainerRetryContext NEVER_RETRY_CONTEXT = ++ newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0); ++ ++ @Private ++ @Unstable ++ public static ContainerRetryContext newInstance( ++ ContainerRetryPolicy retryPolicy, Set errorCodes, ++ int maxRetries, int retryInterval) { ++ ContainerRetryContext containerRetryContext = ++ Records.newRecord(ContainerRetryContext.class); ++ containerRetryContext.setRetryPolicy(retryPolicy); ++ containerRetryContext.setErrorCodes(errorCodes); ++ containerRetryContext.setMaxRetries(maxRetries); ++ containerRetryContext.setRetryInterval(retryInterval); ++ return containerRetryContext; ++ } ++ ++ public abstract ContainerRetryPolicy getRetryPolicy(); ++ public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); ++ public abstract Set getErrorCodes(); ++ public abstract void setErrorCodes(Set errorCodes); ++ public abstract int getMaxRetries(); ++ public abstract void setMaxRetries(int maxRetries); ++ public abstract int getRetryInterval(); ++ public abstract void setRetryInterval(int retryInterval); ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java +new file mode 100644 +index 0000000..a3816c1 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java +@@ -0,0 +1,35 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.yarn.api.records; ++ ++import org.apache.hadoop.classification.InterfaceAudience.Public; ++import org.apache.hadoop.classification.InterfaceStability.Unstable; ++ ++/** ++ *

    Retry policy for relaunching a Container.

    ++ */ ++@Public ++@Unstable ++public enum ContainerRetryPolicy { ++ /** Never retry. */ ++ NEVER_RETRY, ++ /** Always retry. */ ++ ALWAYS_RETRY, ++ /** Retry for specific error codes. */ ++ RETRY_ON_SPECIFIC_ERROR_CODE ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +index d122f5a..ace442d 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +@@ -487,6 +487,7 @@ message ContainerLaunchContextProto { + repeated StringStringMapProto environment = 4; + repeated string command = 5; + repeated ApplicationACLMapProto application_ACLs = 6; ++ optional ContainerRetryContextProto container_retry_context = 7; + } + + message ContainerStatusProto { +@@ -510,6 +511,19 @@ message ContainerResourceChangeRequestProto { + optional ResourceProto capability = 2; + } + ++message ContainerRetryContextProto { ++ optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; ++ repeated int32 error_codes = 2; ++ optional int32 max_retries = 3 [default = 0]; ++ optional int32 retry_interval = 4 [default = 0]; ++} ++ ++enum ContainerRetryPolicyProto { ++ NEVER_RETRY = 0; ++ ALWAYS_RETRY = 1; ++ RETRY_ON_SPECIFIC_ERROR_CODE = 2; ++} ++ + //////////////////////////////////////////////////////////////////////// + ////// From common////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////// +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +index f410c43..f61059c 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +@@ -32,6 +32,7 @@ + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; ++import java.util.HashSet; + import java.util.Iterator; + import java.util.List; + import java.util.Map; +@@ -76,6 +77,8 @@ + import org.apache.hadoop.yarn.api.records.ContainerExitStatus; + import org.apache.hadoop.yarn.api.records.ContainerId; + import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; ++import org.apache.hadoop.yarn.api.records.ContainerRetryContext; ++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; + import org.apache.hadoop.yarn.api.records.ContainerState; + import org.apache.hadoop.yarn.api.records.ContainerStatus; + import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +@@ -253,6 +256,13 @@ + // File length needed for local resource + private long shellScriptPathLen = 0; + ++ // Container retry options ++ private ContainerRetryPolicy containerRetryPolicy = ++ ContainerRetryPolicy.NEVER_RETRY; ++ private Set errorCodes = null; ++ private int maxRetries = 0; ++ private int retryInterval = 0; ++ + // Timeline domain ID + 
private String domainId = null; + +@@ -371,6 +381,17 @@ public boolean init(String[] args) throws ParseException, IOException { + opts.addOption("num_containers", true, + "No. of containers on which the shell command needs to be executed"); + opts.addOption("priority", true, "Application Priority. Default 0"); ++ opts.addOption("retry_policy", true, ++ "Retry policy when container fails to run, " ++ + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " ++ + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); ++ opts.addOption("error_codes", true, ++ "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " ++ + "is specified with this option, e.g. --error_codes 1,2,3"); ++ opts.addOption("max_retries", true, "If container could retry, it specifies" ++ + " max retires"); ++ opts.addOption("retry_interval", true, "interval between each retry, unit " ++ + "is milliseconds"); + opts.addOption("debug", false, "Dump out debug information"); + + opts.addOption("help", false, "Print usage"); +@@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException { + } + requestPriority = Integer.parseInt(cliParser + .getOptionValue("priority", "0")); ++ ++ containerRetryPolicy = ContainerRetryPolicy.values()[ ++ Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))]; ++ if (cliParser.hasOption("error_codes")) { ++ errorCodes = new HashSet<>(); ++ for (String errorCode : ++ cliParser.getOptionValue("error_codes").split(",")) { ++ errorCodes.add(Integer.parseInt(errorCode)); ++ } ++ } ++ maxRetries = Integer.parseInt(cliParser.getOptionValue("max_retries", "0")); ++ retryInterval = Integer.parseInt(cliParser.getOptionValue( ++ "retry_interval", "0")); + return true; + } + +@@ -1062,9 +1096,11 @@ public void run() { + // "hadoop dfs" command inside the distributed shell. 
+ Map myShellEnv = new HashMap(shellEnv); + myShellEnv.put(YARN_SHELL_ID, shellId); ++ ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance( ++ containerRetryPolicy, errorCodes, maxRetries, retryInterval); + ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( + localResources, myShellEnv, commands, null, allTokens.duplicate(), +- null); ++ null, containerRetryContext); + containerListener.addContainer(container.getId(), container); + nmClientAsync.startContainerAsync(container, ctx); + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +index 68d2bde..15df60b 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +@@ -168,6 +168,8 @@ + + private long attemptFailuresValidityInterval = -1; + ++ private Vector containerRetryOptions = new Vector<>(5); ++ + // Debug flag + boolean debugFlag = false; + +@@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception { + + " will be allocated, \"\" means containers" + + " can be allocated anywhere, if you don't specify the option," + + " default node_label_expression of queue will be used."); ++ opts.addOption("retry_policy", true, ++ "Retry policy when container fails to run, " ++ + "0: NEVER_RETRY, 1: ALWAYS_RETRY, " ++ + "2: RETRY_ON_SPECIFIC_ERROR_CODE"); ++ opts.addOption("error_codes", true, ++ "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes " ++ + "is specified with this option, e.g. 
--error_codes 1,2,3"); ++ opts.addOption("max_retries", true, "If container could retry, it specifies" ++ + " max retries"); ++ opts.addOption("retry_interval", true, "interval between each retry, unit " ++ + "is milliseconds"); + } + + /** +@@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException { + } + } + ++ // Get container retry options ++ if (cliParser.hasOption("retry_policy")) { ++ containerRetryOptions.add("--retry_policy " ++ + cliParser.getOptionValue("retry_policy")); ++ } ++ if (cliParser.hasOption("error_codes")) { ++ containerRetryOptions.add("--error_codes " ++ + cliParser.getOptionValue("error_codes")); ++ } ++ if (cliParser.hasOption("max_retries")) { ++ containerRetryOptions.add("--max_retries " ++ + cliParser.getOptionValue("max_retries")); ++ } ++ if (cliParser.hasOption("retry_interval")) { ++ containerRetryOptions.add("--retry_interval " ++ + cliParser.getOptionValue("retry_interval")); ++ } ++ + return true; + } + +@@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException { + vargs.add("--debug"); + } + ++ vargs.addAll(containerRetryOptions); ++ + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); + +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +index 12dcfcd..37eea46 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +@@ -29,10 +29,12 @@ + import org.apache.hadoop.classification.InterfaceStability.Unstable; + import org.apache.hadoop.yarn.api.records.ApplicationAccessType; + import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; ++import org.apache.hadoop.yarn.api.records.ContainerRetryContext; + import org.apache.hadoop.yarn.api.records.LocalResource; + import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; ++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; + import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; + import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; + import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; +@@ -56,7 +58,8 @@ + private Map environment = null; + private List commands = null; + private Map applicationACLS = null; +- ++ private ContainerRetryContext containerRetryContext = null; ++ + public ContainerLaunchContextPBImpl() { + builder = ContainerLaunchContextProto.newBuilder(); + } +@@ -120,6 +123,10 @@ private void mergeLocalToBuilder() { + if (this.applicationACLS != null) { + addApplicationACLs(); + } ++ if (this.containerRetryContext != null) { ++ builder.setContainerRetryContext( ++ convertToProtoFormat(this.containerRetryContext)); ++ } + } + + private void mergeLocalToProto() { +@@ -456,6 +463,27 @@ public void setApplicationACLs( + this.applicationACLS.putAll(appACLs); + } + ++ public ContainerRetryContext 
getContainerRetryContext() { ++ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; ++ if (this.containerRetryContext != null) { ++ return this.containerRetryContext; ++ } ++ if (!p.hasContainerRetryContext()) { ++ return null; ++ } ++ this.containerRetryContext = convertFromProtoFormat( ++ p.getContainerRetryContext()); ++ return this.containerRetryContext; ++ } ++ ++ public void setContainerRetryContext(ContainerRetryContext retryContext) { ++ maybeInitBuilder(); ++ if (retryContext == null) { ++ builder.clearContainerRetryContext(); ++ } ++ this.containerRetryContext = retryContext; ++ } ++ + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { + return new LocalResourcePBImpl(p); + } +@@ -463,4 +491,14 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { + private LocalResourceProto convertToProtoFormat(LocalResource t) { + return ((LocalResourcePBImpl)t).getProto(); + } +-} ++ ++ private ContainerRetryContextPBImpl convertFromProtoFormat( ++ ContainerRetryContextProto p) { ++ return new ContainerRetryContextPBImpl(p); ++ } ++ ++ private ContainerRetryContextProto convertToProtoFormat( ++ ContainerRetryContext t) { ++ return ((ContainerRetryContextPBImpl)t).getProto(); ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java +new file mode 100644 +index 0000000..a5ef70d +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java +@@ -0,0 +1,177 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.yarn.api.records.impl.pb; ++ ++ ++import com.google.protobuf.TextFormat; ++import org.apache.hadoop.yarn.api.records.ContainerRetryContext; ++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; ++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; ++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; ++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder; ++ ++import java.util.HashSet; ++import java.util.Set; ++ ++/** ++ * Implementation of ContainerRetryContext. 
++ */ ++public class ContainerRetryContextPBImpl extends ContainerRetryContext { ++ private ContainerRetryContextProto proto = ++ ContainerRetryContextProto.getDefaultInstance(); ++ private ContainerRetryContextProto.Builder builder = null; ++ private boolean viaProto = false; ++ ++ private Set errorCodes = null; ++ ++ public ContainerRetryContextPBImpl() { ++ builder = ContainerRetryContextProto.newBuilder(); ++ } ++ ++ public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) { ++ this.proto = proto; ++ viaProto = true; ++ } ++ ++ public ContainerRetryContextProto getProto() { ++ mergeLocalToProto(); ++ proto = viaProto ? proto : builder.build(); ++ viaProto = true; ++ return proto; ++ } ++ ++ @Override ++ public int hashCode() { ++ return getProto().hashCode(); ++ } ++ ++ @Override ++ public boolean equals(Object other) { ++ if (other == null) { ++ return false; ++ } ++ if (other.getClass().isAssignableFrom(this.getClass())) { ++ return this.getProto().equals(this.getClass().cast(other).getProto()); ++ } ++ return false; ++ } ++ ++ @Override ++ public String toString() { ++ return TextFormat.shortDebugString(getProto()); ++ } ++ ++ private void mergeLocalToBuilder() { ++ if (this.errorCodes != null) { ++ builder.clearErrorCodes(); ++ builder.addAllErrorCodes(this.errorCodes); ++ } ++ } ++ ++ private void mergeLocalToProto() { ++ if (viaProto) { ++ maybeInitBuilder(); ++ } ++ mergeLocalToBuilder(); ++ proto = builder.build(); ++ viaProto = true; ++ } ++ ++ private void maybeInitBuilder() { ++ if (viaProto || builder == null) { ++ builder = ContainerRetryContextProto.newBuilder(proto); ++ } ++ viaProto = false; ++ } ++ ++ public ContainerRetryPolicy getRetryPolicy() { ++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; ++ if (!p.hasRetryPolicy()) { ++ return ContainerRetryPolicy.NEVER_RETRY; ++ } ++ return convertFromProtoFormat(p.getRetryPolicy()); ++ } ++ ++ public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) { ++ maybeInitBuilder(); ++ if (containerRetryPolicy == null) { ++ builder.clearRetryPolicy(); ++ return; ++ } ++ builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy)); ++ } ++ ++ private void initErrorCodes() { ++ if (this.errorCodes != null) { ++ return; ++ } ++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; ++ this.errorCodes = new HashSet<>(); ++ this.errorCodes.addAll(p.getErrorCodesList()); ++ } ++ ++ public Set getErrorCodes() { ++ initErrorCodes(); ++ return this.errorCodes; ++ } ++ ++ public void setErrorCodes(Set errCodes) { ++ maybeInitBuilder(); ++ if (errCodes == null || errCodes.isEmpty()) { ++ builder.clearErrorCodes(); ++ } ++ this.errorCodes = errCodes; ++ } ++ ++ public int getMaxRetries() { ++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; ++ if (!p.hasMaxRetries()) { ++ return 0; ++ } ++ return p.getMaxRetries(); ++ } ++ ++ public void setMaxRetries(int maxRetries) { ++ maybeInitBuilder(); ++ builder.setMaxRetries(maxRetries); ++ } ++ ++ public int getRetryInterval() { ++ ContainerRetryContextProtoOrBuilder p = viaProto ? 
proto : builder; ++ if (!p.hasRetryInterval()) { ++ return 0; ++ } ++ return p.getRetryInterval(); ++ } ++ ++ public void setRetryInterval(int retryInterval) { ++ maybeInitBuilder(); ++ builder.setRetryInterval(retryInterval); ++ } ++ ++ private ContainerRetryPolicyProto convertToProtoFormat( ++ ContainerRetryPolicy containerRetryPolicy) { ++ return ProtoUtils.convertToProtoFormat(containerRetryPolicy); ++ } ++ ++ private ContainerRetryPolicy convertFromProtoFormat( ++ ContainerRetryPolicyProto containerRetryPolicyProto) { ++ return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto); ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +index 29ed0f3..d3d5948 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +@@ -26,6 +26,7 @@ + import org.apache.hadoop.yarn.api.records.AMCommand; + import org.apache.hadoop.yarn.api.records.ApplicationAccessType; + import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; ++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; + import org.apache.hadoop.yarn.api.records.ContainerState; + import org.apache.hadoop.yarn.api.records.ExecutionType; + import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +@@ -54,6 +55,7 @@ + import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; + import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; + import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; ++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; + import org.apache.hadoop.yarn.proto.YarnServiceProtos; +@@ -294,4 +296,17 @@ public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) { + public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) { + return ExecutionType.valueOf(e.name()); + } ++ ++ /* ++ * ContainerRetryPolicy ++ */ ++ public static ContainerRetryPolicyProto convertToProtoFormat( ++ ContainerRetryPolicy e) { ++ return ContainerRetryPolicyProto.valueOf(e.name()); ++ } ++ ++ public static ContainerRetryPolicy convertFromProtoFormat( ++ ContainerRetryPolicyProto e) { ++ return ContainerRetryPolicy.valueOf(e.name()); ++ } + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +index b7f5ff7..96df10b 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +@@ -118,6 +118,7 @@ + import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; + import org.apache.hadoop.yarn.api.records.ContainerReport; + import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; ++import 
org.apache.hadoop.yarn.api.records.ContainerRetryContext; + import org.apache.hadoop.yarn.api.records.ContainerStatus; + import org.apache.hadoop.yarn.api.records.LocalResource; + import org.apache.hadoop.yarn.api.records.LogAggregationContext; +@@ -159,6 +160,7 @@ + import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; + import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; + import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; ++import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl; + import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; + import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; + import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; +@@ -192,6 +194,7 @@ + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; ++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; + import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; + import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; + import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +@@ -345,7 +348,7 @@ private static Object genTypeValue(Type type) { + return rand.nextBoolean(); + } else if (type.equals(byte.class)) { + return bytes[rand.nextInt(4)]; +- } else if (type.equals(int.class)) { ++ } else if (type.equals(int.class) || type.equals(Integer.class)) { + return rand.nextInt(1000000); + } else if (type.equals(long.class)) { + return Long.valueOf(rand.nextInt(1000000)); +@@ -469,6 +472,7 @@ public static void setup() throws Exception { + generateByNewInstance(ApplicationResourceUsageReport.class); + generateByNewInstance(ApplicationReport.class); + generateByNewInstance(Container.class); ++ generateByNewInstance(ContainerRetryContext.class); + generateByNewInstance(ContainerLaunchContext.class); + generateByNewInstance(ApplicationSubmissionContext.class); + generateByNewInstance(ContainerReport.class); +@@ -957,6 +961,12 @@ public void testContainerIdPBImpl() throws Exception { + } + + @Test ++ public void testContainerRetryPBImpl() throws Exception { ++ validatePBImplRecord(ContainerRetryContextPBImpl.class, ++ ContainerRetryContextProto.class); ++ } ++ ++ @Test + public void testContainerLaunchContextPBImpl() throws Exception { + validatePBImplRecord(ContainerLaunchContextPBImpl.class, + ContainerLaunchContextProto.class); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +index 8c74bf5..d08ee67 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +@@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out, + } + + public enum ExitCode { ++ SUCCESS(0), + FORCE_KILLED(137), + TERMINATED(143), + LOST(154); +diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +index 5cc4e19..976df7c 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +@@ -551,6 +551,10 @@ public Path getLocalPathForWrite(String pathStr, long size, + checkWrite); + } + ++ public Path getLocalPathForRead(String pathStr) throws IOException { ++ return getPathToRead(pathStr, getLocalDirsForRead()); ++ } ++ + public Path getLogPathForWrite(String pathStr, boolean checkWrite) + throws IOException { + return logDirsAllocator.getLocalPathForWrite(pathStr, +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +index f44de59..fb37a06 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +@@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs) + Container container = new ContainerImpl(getConfig(), dispatcher, + context.getNMStateStore(), req.getContainerLaunchContext(), + credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), +- rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); ++ rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), ++ rcs.getRemainingRetryAttempts()); + context.getContainers().put(containerId, container); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container)); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +index 1d2ec56..310d8df 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +@@ -55,6 +55,10 @@ + + NMContainerStatus getNMContainerStatus(); + ++ boolean shouldRetry(int errorCode); ++ ++ boolean isRelaunch(); ++ + String toString(); + + } +diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +index e16ea93..60da9f3 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +@@ -41,6 +41,8 @@ + import org.apache.hadoop.yarn.api.records.ContainerExitStatus; + import org.apache.hadoop.yarn.api.records.ContainerId; + import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; ++import org.apache.hadoop.yarn.api.records.ContainerRetryContext; ++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; + import org.apache.hadoop.yarn.api.records.ContainerStatus; + import org.apache.hadoop.yarn.api.records.LocalResource; + import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +@@ -50,6 +52,7 @@ + import org.apache.hadoop.yarn.event.EventHandler; + import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; + import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; ++import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; + import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; + import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; + import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; +@@ -100,6 +103,10 @@ + private long containerLocalizationStartTime; + private long containerLaunchStartTime; + private static Clock clock = new SystemClock(); ++ private final ContainerRetryContext containerRetryContext; ++ // remaining retries to relaunch container if needed ++ private int remainingRetryAttempts; ++ private boolean isRelaunch = false; + + /** The NM-wide configuration - not specific to this container */ + private final Configuration daemonConf; +@@ -134,6 +141,13 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, + this.dispatcher = dispatcher; + this.stateStore = stateStore; + this.launchContext = launchContext; ++ if (launchContext != null ++ && launchContext.getContainerRetryContext() != null) { ++ this.containerRetryContext = launchContext.getContainerRetryContext(); ++ } else { ++ this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT; ++ } ++ this.remainingRetryAttempts = containerRetryContext.getMaxRetries(); + this.containerTokenIdentifier = containerTokenIdentifier; + this.containerId = containerTokenIdentifier.getContainerID(); + this.resource = containerTokenIdentifier.getResource(); +@@ -154,7 +168,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, + Credentials creds, NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, + RecoveredContainerStatus recoveredStatus, int exitCode, +- String diagnostics, boolean wasKilled, Resource recoveredCapability) { ++ String diagnostics, boolean wasKilled, Resource recoveredCapability, ++ int remainingRetryAttempts) { + this(conf, dispatcher, stateStore, launchContext, creds, metrics, + containerTokenIdentifier); + this.recoveredStatus = 
recoveredStatus; +@@ -167,6 +182,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, + this.resource = Resource.newInstance(recoveredCapability.getMemory(), + recoveredCapability.getVirtualCores()); + } ++ if (remainingRetryAttempts != ContainerRetryContext.RETRY_INVALID) { ++ this.remainingRetryAttempts = remainingRetryAttempts; ++ } + } + + private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = +@@ -246,9 +264,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, + new ExitedWithSuccessTransition(true)) + .addTransition(ContainerState.RUNNING, +- ContainerState.EXITED_WITH_FAILURE, ++ EnumSet.of(ContainerState.EXITED_WITH_FAILURE, ++ ContainerState.LOCALIZED), + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, +- new ExitedWithFailureTransition(true)) ++ new RetryFailureTransition()) + .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) +@@ -854,6 +873,86 @@ public void transition(ContainerImpl container, ContainerEvent event) { + } + + /** ++ * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon ++ * CONTAINER_EXITED_WITH_FAILURE state. ++ **/ ++ @SuppressWarnings("unchecked") // dispatcher not typed ++ static class RetryFailureTransition implements ++ MultipleArcTransition { ++ ++ @Override ++ public ContainerState transition(final ContainerImpl container, ++ ContainerEvent event) { ++ ContainerExitEvent exitEvent = (ContainerExitEvent) event; ++ container.exitCode = exitEvent.getExitCode(); ++ if (exitEvent.getDiagnosticInfo() != null) { ++ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); ++ } ++ ++ if (container.shouldRetry(container.exitCode)) { ++ if (container.remainingRetryAttempts > 0) { ++ container.remainingRetryAttempts--; ++ try { ++ container.stateStore.storeContainerRemainingRetryAttempts( ++ container.getContainerId(), container.remainingRetryAttempts); ++ } catch (IOException e) { ++ LOG.warn("Unable to update remainingRetryAttempts in state store for " ++ + container.getContainerId(), e); ++ } ++ } ++ LOG.info("Relaunch Container " + container.getContainerId() ++ + ". Remain retry attempts : " + container.remainingRetryAttempts ++ + ". 
Retry interval is " ++ + container.containerRetryContext.getRetryInterval() + "ms"); ++ container.isRelaunch = true; ++ container.wasLaunched = false; ++ container.metrics.endRunningContainer(); ++ // wait for some time, then relaunch ++ new Thread() { ++ @Override ++ public void run() { ++ try { ++ Thread.sleep(container.containerRetryContext.getRetryInterval()); ++ container.sendLaunchEvent(); ++ } catch (InterruptedException e) { ++ return; ++ } ++ } ++ }.start(); ++ return ContainerState.LOCALIZED; ++ } else { ++ new ExitedWithFailureTransition(true).transition(container, event); ++ return ContainerState.EXITED_WITH_FAILURE; ++ } ++ } ++ } ++ ++ @Override ++ public boolean shouldRetry(int errorCode) { ++ if (errorCode == ExitCode.SUCCESS.getExitCode() ++ || errorCode == ExitCode.FORCE_KILLED.getExitCode() ++ || errorCode == ExitCode.TERMINATED.getExitCode()) { ++ return false; ++ } ++ ++ ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy(); ++ if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY ++ || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE ++ && containerRetryContext.getErrorCodes() != null ++ && containerRetryContext.getErrorCodes().contains(errorCode))) { ++ return remainingRetryAttempts > 0 ++ || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; ++ } ++ ++ return false; ++ } ++ ++ @Override ++ public boolean isRelaunch() { ++ return isRelaunch; ++ } ++ ++ /** + * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST + */ + static class KilledExternallyTransition extends ExitedWithFailureTransition { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +index 6371b21..0b48abd 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +@@ -48,6 +48,7 @@ + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.io.IOUtils; + import org.apache.hadoop.security.Credentials; ++import org.apache.hadoop.util.DiskChecker; + import org.apache.hadoop.util.Shell; + import org.apache.hadoop.util.StringUtils; + import org.apache.hadoop.yarn.api.ApplicationConstants; +@@ -177,6 +178,10 @@ public Integer call() { + + Path containerLogDir; + try { ++ // Select the working directory for the container ++ Path containerWorkDir = getContainerWorkDir(container); ++ cleanupContainerFilesForRelaunch(container, containerWorkDir); ++ + localResources = container.getLocalizedResources(); + if (localResources == null) { + throw RPCUtil.getRemoteException( +@@ -191,8 +196,7 @@ public Integer call() { + String appIdStr = app.getAppId().toString(); + String relativeContainerLogDir = ContainerLaunch + .getRelativeContainerLogDir(appIdStr, containerIdStr); +- containerLogDir = +- dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); ++ containerLogDir = getContainerLogDir(container); + for (String str : command) { + // TODO: Should we instead work via symlinks without this 
grammar? + newCmds.add(expandEnvironment(str, containerLogDir)); +@@ -226,14 +230,6 @@ public Integer call() { + DataOutputStream containerScriptOutStream = null; + DataOutputStream tokensOutStream = null; + +- // Select the working directory for the container +- Path containerWorkDir = +- dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE +- + Path.SEPARATOR + user + Path.SEPARATOR +- + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr +- + Path.SEPARATOR + containerIdStr, +- LocalDirAllocator.SIZE_UNKNOWN, false); +- + String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); + + // pid file should be in nm private dir so that it is not +@@ -329,7 +325,9 @@ public Integer call() { + completed.set(true); + exec.deactivateContainer(containerID); + try { +- context.getNMStateStore().storeContainerCompleted(containerID, ret); ++ if (!container.shouldRetry(ret)) { ++ context.getNMStateStore().storeContainerCompleted(containerID, ret); ++ } + } catch (IOException e) { + LOG.error("Unable to set exit code for container " + containerID); + } +@@ -369,6 +367,120 @@ public Integer call() { + } + + /** ++ * Get container's working directory. ++ * If container is in relaunch and we find its previous working directory ++ * could be read/write it will be reused as current working directory. ++ * We apply a simple heuristic to find its previous working directory: ++ * if a good work dir with the container tokens file already exists, ++ * it means it is container's previous working directory. ++ */ ++ private Path getContainerWorkDir(Container container1) throws IOException { ++ Path containerWorkDir = null; ++ String relativeContainerWorkDir = ContainerLocalizer.USERCACHE ++ + Path.SEPARATOR + container1.getUser() + Path.SEPARATOR ++ + ContainerLocalizer.APPCACHE + Path.SEPARATOR ++ + app.getAppId().toString() + Path.SEPARATOR ++ + ConverterUtils.toString(container1.getContainerId()); ++ ++ if (container1.isRelaunch()) { ++ try { ++ // container's tokens file's parent is container's previous working dir ++ containerWorkDir = dirsHandler.getLocalPathForRead( ++ relativeContainerWorkDir + Path.SEPARATOR ++ + ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).getParent(); ++ // check whether containerWorDir could be read/write, it will throw ++ // IOException if not. ++ DiskChecker.checkDir( ++ new File(containerWorkDir.toUri().getPath())); ++ } catch (IOException e) { ++ containerWorkDir = null; ++ } ++ } ++ if (containerWorkDir == null) { ++ containerWorkDir = ++ dirsHandler.getLocalPathForWrite(relativeContainerWorkDir, ++ LocalDirAllocator.SIZE_UNKNOWN, false); ++ } ++ ++ return containerWorkDir; ++ } ++ ++ /** ++ * Get container's log directory. ++ * If container is in relaunch and we find its previous log directory ++ * could be read/write it will be reused as current log directory. ++ * We apply a simple heuristic to find its previous working directory: ++ * if a good work dir with file 'stdout' already exists, it means it ++ * is container's previous working directory. We assume there is 'stdout' ++ * in most containers's log directory. 
++ */ ++ private Path getContainerLogDir(Container container1) throws IOException { ++ Path containerLogDir = null; ++ String relativeContainerLogDir = ContainerLaunch ++ .getRelativeContainerLogDir(app.getAppId().toString(), ++ ConverterUtils.toString(container1.getContainerId())); ++ ++ if (container1.isRelaunch()) { ++ try { ++ containerLogDir = dirsHandler.getLocalPathForRead( ++ relativeContainerLogDir + Path.SEPARATOR ++ + "stdout").getParent(); ++ // check whether containerWorDir could be read/write, it will throw ++ // IOException if not. ++ DiskChecker.checkDir( ++ new File(containerLogDir.toUri().getPath())); ++ } catch (IOException e) { ++ containerLogDir = null; ++ } ++ } ++ if (containerLogDir == null) { ++ containerLogDir = ++ dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); ++ } ++ ++ return containerLogDir; ++ } ++ ++ /** ++ * Clean up container's previous files for container relaunch. ++ */ ++ private void cleanupContainerFilesForRelaunch(Container container1, ++ Path containerWorkDir) { ++ if (!container1.isRelaunch()) { ++ return; ++ } ++ ++ try { ++ FileContext lfs = FileContext.getLocalFSFileContext(); ++ // delete ContainerScriptPath ++ lfs.delete(new Path(containerWorkDir, CONTAINER_SCRIPT), false); ++ // delete TokensPath ++ lfs.delete( ++ new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE), false); ++ } catch (IOException e) { ++ LOG.error("Failed to delete container script file and token file", e); ++ } ++ ++ String appIdStr = app.getAppId().toString(); ++ String containerIdStr = ++ ConverterUtils.toString(container1.getContainerId()); ++ // delete pid file and pid exit code file ++ String pidPath = getPidFileSubpath(appIdStr, containerIdStr); ++ deleteLocalPath(pidPath); ++ deleteLocalPath(getExitCodeFile(pidPath)); ++ } ++ ++ private void deleteLocalPath(String relativePath) { ++ try { ++ FileContext lfs = FileContext.getLocalFSFileContext(); ++ Path path = dirsHandler.getLocalPathForRead(relativePath); ++ lfs.delete(path, false); ++ } catch (IOException e) { ++ LOG.error("Failed to delete " + relativePath, e); ++ } ++ } ++ ++ /** + * Tries to tail and fetch TAIL_SIZE_IN_BYTES of data from the error log. + * ErrorLog filename is not fixed and depends upon app, hence file name + * pattern is used. 
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+index 89c71bb..db84485 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+@@ -106,6 +106,8 @@
+       "/resourceChanged";
+   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
+   private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
++  private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
++      "/remainingRetryAttempts";
+ 
+   private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
+   private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
+@@ -238,6 +240,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
+       } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
+         rcs.capability = new ResourcePBImpl(
+             ResourceProto.parseFrom(entry.getValue()));
++      } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
++        rcs.setRemainingRetryAttempts(Integer.parseInt(asString(entry.getValue())));
+       } else {
+         throw new IOException("Unexpected container state key: " + key);
+       }
+@@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId,
+   }
+ 
+   @Override
++  public void storeContainerRemainingRetryAttempts(ContainerId containerId,
++      int remainingRetryAttempts) throws IOException {
++    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
++        + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX;
++    try {
++      db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
++    } catch (DBException e) {
++      throw new IOException(e);
++    }
++  }
++
++  @Override
+   public void removeContainer(ContainerId containerId)
+       throws IOException {
+     String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString();
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+index d5dce9b..68f4e87 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+@@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode)
+   }
+ 
+   @Override
++  public void storeContainerRemainingRetryAttempts(ContainerId containerId,
++      int remainingRetryAttempts) throws IOException {
++  }
++
++  @Override
+   public void removeContainer(ContainerId containerId) throws IOException {
+   }
+ 
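The leveldb store above persists the remaining retry count under one key per container, formed by appending the new "/remainingRetryAttempts" suffix to the per-container key prefix, with the count serialized as a decimal string and parsed back during recovery. The sketch below mimics that key scheme with an in-memory map so it can run standalone; the prefix value and class name are stand-ins, not the actual NMLeveldbStateStoreService internals.

import java.util.HashMap;
import java.util.Map;

/**
 * Toy illustration of the retry-count key scheme. A HashMap stands in for
 * leveldb; the key prefix shown here is an assumed example value.
 */
public class RetryCountStoreSketch {
  private static final String CONTAINERS_KEY_PREFIX = "ContainerManager/containers/";
  private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
      "/remainingRetryAttempts";

  private final Map<String, String> db = new HashMap<>();

  public void storeContainerRemainingRetryAttempts(String containerId,
      int remainingRetryAttempts) {
    String key = CONTAINERS_KEY_PREFIX + containerId
        + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX;
    db.put(key, Integer.toString(remainingRetryAttempts));
  }

  public int loadContainerRemainingRetryAttempts(String containerId,
      int defaultValue) {
    String key = CONTAINERS_KEY_PREFIX + containerId
        + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX;
    String value = db.get(key);
    // Recovery falls back to a sentinel default when no key was written.
    return value == null ? defaultValue : Integer.parseInt(value);
  }

  public static void main(String[] args) {
    RetryCountStoreSketch store = new RetryCountStoreSketch();
    store.storeContainerRemainingRetryAttempts("container_1_0001_01_000002", 2);
    System.out.println(
        store.loadContainerRemainingRetryAttempts("container_1_0001_01_000002", -1));
    System.out.println(
        store.loadContainerRemainingRetryAttempts("container_1_0001_01_000003", -1));
  }
}
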
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+index e8ccf54..b733ffe 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+@@ -34,6 +34,7 @@
+ import org.apache.hadoop.yarn.api.records.ApplicationId;
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+ import org.apache.hadoop.yarn.api.records.Resource;
+ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+@@ -76,6 +77,7 @@ public NMStateStoreService(String name) {
+     String diagnostics = "";
+     StartContainerRequest startRequest;
+     Resource capability;
++    private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
+ 
+     public RecoveredContainerStatus getStatus() {
+       return status;
+@@ -100,6 +102,14 @@ public StartContainerRequest getStartRequest() {
+     public Resource getCapability() {
+       return capability;
+     }
++
++    public int getRemainingRetryAttempts() {
++      return remainingRetryAttempts;
++    }
++
++    public void setRemainingRetryAttempts(int retryAttempts) {
++      this.remainingRetryAttempts = retryAttempts;
++    }
+   }
+ 
+   public static class LocalResourceTrackerState {
+@@ -325,6 +335,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId,
+       StringBuilder diagnostics) throws IOException;
+ 
+   /**
++   * Record the remaining retry attempts for a container.
++   * @param containerId the container ID
++   * @param remainingRetryAttempts the number of retry attempts left when the container fails to run
++   * @throws IOException
++   */
++  public abstract void storeContainerRemainingRetryAttempts(
++      ContainerId containerId, int remainingRetryAttempts) throws IOException;
++
++  /**
+    * Remove records corresponding to a container
+    * @param containerId the container ID
+    * @throws IOException
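The tests that follow pin down the retry semantics: a clean exit (0) is never retried, exit 143 (SIGTERM) is never retried even when it is listed, NEVER_RETRY never retries, ALWAYS_RETRY retries until the maximum number of attempts is used, and RETRY_ON_SPECIFIC_ERROR_CODE retries only for listed codes. A minimal decision helper consistent with those expectations might look like the sketch below; it only models the behaviour the tests assert and is not the actual ContainerImpl.shouldRetry implementation.

import java.util.Collections;
import java.util.Set;

/** Retry policies mirrored from the patch; the decision logic is a sketch. */
enum RetryPolicy { NEVER_RETRY, ALWAYS_RETRY, RETRY_ON_SPECIFIC_ERROR_CODE }

public final class RetryDecisionSketch {
  private static final int EXIT_SUCCESS = 0;
  private static final int EXIT_SIGTERM = 143;

  private RetryDecisionSketch() {
  }

  public static boolean shouldRetry(RetryPolicy policy, Set<Integer> errorCodes,
      int remainingAttempts, int exitCode) {
    // A clean exit, a SIGTERM-style exit, or exhausted attempts: never retry.
    if (exitCode == EXIT_SUCCESS || exitCode == EXIT_SIGTERM
        || remainingAttempts <= 0) {
      return false;
    }
    switch (policy) {
    case ALWAYS_RETRY:
      return true;
    case RETRY_ON_SPECIFIC_ERROR_CODE:
      return errorCodes != null && errorCodes.contains(exitCode);
    default:
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(shouldRetry(RetryPolicy.ALWAYS_RETRY,
        Collections.<Integer>emptySet(), 3, 2));   // true
    System.out.println(shouldRetry(RetryPolicy.ALWAYS_RETRY,
        Collections.<Integer>emptySet(), 3, 0));   // false: clean exit
    System.out.println(shouldRetry(RetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE,
        Collections.singleton(143), 3, 143));      // false: SIGTERM
  }
}
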
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+index 2ab9842..7a26df5 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+@@ -54,6 +54,8 @@
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.api.records.LocalResourceType;
+@@ -645,6 +647,66 @@ public void testLaunchAfterKillRequest() throws Exception {
+     }
+   }
+ 
++  @Test
++  public void testContainerRetry() throws Exception {
++    ContainerRetryContext containerRetryContext1 = ContainerRetryContext.newInstance(
++        ContainerRetryPolicy.NEVER_RETRY, null, 3, 0);
++    testContainerRetry(containerRetryContext1, 2, 0);
++
++    ContainerRetryContext containerRetryContext2 = ContainerRetryContext.newInstance(
++        ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0);
++    testContainerRetry(containerRetryContext2, 2, 3);
++
++    ContainerRetryContext containerRetryContext3 = ContainerRetryContext.newInstance(
++        ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0);
++    // If the exit code is 0, the container will not be retried
++    testContainerRetry(containerRetryContext3, 0, 0);
++
++    ContainerRetryContext containerRetryContext4 = ContainerRetryContext.newInstance(
++        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, null, 3, 0);
++    testContainerRetry(containerRetryContext4, 2, 0);
++
++    HashSet<Integer> errorCodes = new HashSet<>();
++    errorCodes.add(2);
++    errorCodes.add(6);
++    ContainerRetryContext containerRetryContext5 = ContainerRetryContext.newInstance(
++        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes, 3, 0);
++    testContainerRetry(containerRetryContext5, 2, 3);
++
++    HashSet<Integer> errorCodes2 = new HashSet<>();
++    errorCodes2.add(143);
++    ContainerRetryContext containerRetryContext6 = ContainerRetryContext.newInstance(
++        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes2, 3, 0);
++    // If exit code is 143 (SIGTERM), it will not retry even if 143 is in errorCodes.
++    testContainerRetry(containerRetryContext6, 143, 0);
++  }
++
++  private void testContainerRetry(ContainerRetryContext containerRetryContext, int exitCode,
++      int expectedRetries) throws Exception {
++    WrappedContainer wc = null;
++    try {
++      int retryTimes = 0;
++      wc = new WrappedContainer(24, 314159265358979L, 4344, "yak",
++          containerRetryContext);
++      wc.initContainer();
++      wc.localizeResources();
++      wc.launchContainer();
++      while (true) {
++        wc.containerFailed(exitCode);
++        if (wc.c.getContainerState() == ContainerState.RUNNING) {
++          retryTimes++;
++        } else {
++          break;
++        }
++      }
++      Assert.assertEquals(expectedRetries, retryTimes);
++    } finally {
++      if (wc != null) {
++        wc.finished();
++      }
++    }
++  }
++
+   private void verifyCleanupCall(WrappedContainer wc) throws Exception {
+     ResourcesReleasedMatcher matchesReq =
+         new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
+@@ -782,12 +844,23 @@ public boolean matches(Object o) {
+ 
+     WrappedContainer(int appId, long timestamp, int id, String user)
+         throws IOException {
+-      this(appId, timestamp, id, user, true, false);
++      this(appId, timestamp, id, user, null);
++    }
++
++    WrappedContainer(int appId, long timestamp, int id, String user,
++        ContainerRetryContext containerRetryContext) throws IOException {
++      this(appId, timestamp, id, user, true, false, containerRetryContext);
+     }
+ 
+-    @SuppressWarnings("rawtypes")
+     WrappedContainer(int appId, long timestamp, int id, String user,
+         boolean withLocalRes, boolean withServiceData) throws IOException {
++      this(appId, timestamp, id, user, withLocalRes, withServiceData, null);
++    }
++
++    @SuppressWarnings("rawtypes")
++    WrappedContainer(int appId, long timestamp, int id, String user,
++        boolean withLocalRes, boolean withServiceData,
++        ContainerRetryContext containerRetryContext) throws IOException {
+       dispatcher = new DrainDispatcher();
+       dispatcher.init(new Configuration());
+ 
+@@ -864,6 +937,8 @@ public boolean matches(Object o) {
+       }
+       when(ctxt.getServiceData()).thenReturn(serviceData);
+ 
++      when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
++
+       c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
+           ctxt, null, metrics, identifier);
+       dispatcher.register(ContainerEventType.class,
+@@ -984,6 +1059,10 @@ public void containerFailed(int exitCode) {
+       assert containerStatus.getDiagnostics().contains(diagnosticMsg);
+       assert containerStatus.getExitStatus() == exitCode;
+       drainDispatcherEvents();
++      // If the container needs a retry, relaunch it
++      if (c.getContainerState() == ContainerState.LOCALIZED) {
++        launchContainer();
++      }
+     }
+ 
+     public void killContainer() {
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hado
\ No newline at end of file