diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
index 932945b..ed64b54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
@@ -46,6 +47,7 @@
 *   </li>
 *   <li>Optional, application-specific binary service data.</li>
 *   <li>Environment variables for the launched process.</li>
 *   <li>Command to launch the container.</li>
+ *   <li>Retry strategy when container fails to run.</li>
 * </ul>
 *
* @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
@@ -61,6 +63,18 @@ public static ContainerLaunchContext newInstance(
Map<String, String> environment, List<String> commands,
Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
Map<ApplicationAccessType, String> acls) {
+ return newInstance(localResources, environment, commands, serviceData,
+ tokens, acls, null);
+ }
+
+ @Public
+ @Unstable
+ public static ContainerLaunchContext newInstance(
+ Map<String, LocalResource> localResources,
+ Map<String, String> environment, List<String> commands,
+ Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
+ Map<ApplicationAccessType, String> acls,
+ ContainerRetryContext containerRetryContext) {
ContainerLaunchContext container =
Records.newRecord(ContainerLaunchContext.class);
container.setLocalResources(localResources);
@@ -69,6 +83,7 @@ public static ContainerLaunchContext newInstance(
container.setServiceData(serviceData);
container.setTokens(tokens);
container.setApplicationACLs(acls);
+ container.setContainerRetryContext(containerRetryContext);
return container;
}
@@ -195,4 +210,22 @@ public static ContainerLaunchContext newInstance(
@Public
@Stable
public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
+
+ /**
+ * Get the <code>ContainerRetryContext</code> to relaunch the container.
+ * @return <code>ContainerRetryContext</code> to relaunch the container.
+ */
+ @Public
+ @Unstable
+ public abstract ContainerRetryContext getContainerRetryContext();
+
+ /**
+ * Set the <code>ContainerRetryContext</code> to relaunch the container.
+ * @param containerRetryContext <code>ContainerRetryContext</code> to
+ * relaunch the container.
+ */
+ @Public
+ @Unstable
+ public abstract void setContainerRetryContext(
+ ContainerRetryContext containerRetryContext);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
new file mode 100644
index 0000000..f257967
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Set;
+
+/**
+ * {@code ContainerRetryContext} indicates how a container should be retried
+ * after it fails to run.
+ * <p>
+ * It provides details such as:
+ * <ul>
+ *   <li>
+ *     {@link ContainerRetryPolicy}:
+ *     NEVER_RETRY (the default value): never retry, no matter which error
+ *     code the container exits with.
+ *     ALWAYS_RETRY: always retry, no matter which error code the container
+ *     exits with.
+ *     RETRY_ON_SPECIFIC_ERROR_CODE: retry only if the error code is one of
+ *     the configured errorCodes; otherwise do not retry.
+ *     Note: if the error code is 137 (SIGKILL) or 143 (SIGTERM), the
+ *     container is not retried, because it was usually killed on purpose.
+ *   </li>
+ *   <li>
+ *     maxRetries specifies how many times to retry when a retry is needed.
+ *     A value of -1 means retry forever.
+ *   </li>
+ *   <li>
+ *     retryInterval specifies how long to wait, in milliseconds, before
+ *     relaunching the container.
+ *   </li>
+ * </ul>
+ */
+@Public
+@Unstable
+public abstract class ContainerRetryContext {
+ public static final int RETRY_FOREVER = -1;
+ public static final int RETRY_INVALID = -1000;
+ public static final ContainerRetryContext NEVER_RETRY_CONTEXT =
+ newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0);
+
+ @Private
+ @Unstable
+ public static ContainerRetryContext newInstance(
+ ContainerRetryPolicy retryPolicy, Set<Integer> errorCodes,
+ int maxRetries, int retryInterval) {
+ ContainerRetryContext containerRetryContext =
+ Records.newRecord(ContainerRetryContext.class);
+ containerRetryContext.setRetryPolicy(retryPolicy);
+ containerRetryContext.setErrorCodes(errorCodes);
+ containerRetryContext.setMaxRetries(maxRetries);
+ containerRetryContext.setRetryInterval(retryInterval);
+ return containerRetryContext;
+ }
+
+ public abstract ContainerRetryPolicy getRetryPolicy();
+ public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy);
+ public abstract Set<Integer> getErrorCodes();
+ public abstract void setErrorCodes(Set<Integer> errorCodes);
+ public abstract int getMaxRetries();
+ public abstract void setMaxRetries(int maxRetries);
+ public abstract int getRetryInterval();
+ public abstract void setRetryInterval(int retryInterval);
+}
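
The two records above come together through the new seven-argument ContainerLaunchContext.newInstance overload. A minimal sketch of that wiring follows (not part of the patch: the class name, exit codes, and retry interval are invented for illustration, and ContainerRetryContext.newInstance is marked @Private/@Unstable here even though the bundled distributed shell calls it directly):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.LocalResource;

public class RetryLaunchSketch {
  /**
   * Build a launch context that retries up to 3 times, 5 seconds apart,
   * but only when the process exits with code 10 or 11 (values chosen
   * purely for illustration).
   */
  static ContainerLaunchContext buildLaunchContext(
      Map<String, LocalResource> localResources,
      Map<String, String> environment, List<String> commands,
      ByteBuffer tokens, Map<ApplicationAccessType, String> acls) {
    Set<Integer> retryableExitCodes = new HashSet<>(Arrays.asList(10, 11));
    ContainerRetryContext retryContext = ContainerRetryContext.newInstance(
        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE,
        retryableExitCodes,
        3,      // maxRetries; RETRY_FOREVER (-1) removes the bound
        5000);  // retryInterval, in milliseconds
    // The retry context rides along in the same ContainerLaunchContext
    // that already carries resources, environment and commands.
    return ContainerLaunchContext.newInstance(localResources, environment,
        commands, null, tokens, acls, retryContext);
  }
}
```

Callers of the existing six-argument overload are unaffected: it delegates with a null retry context, which the NodeManager treats as NEVER_RETRY_CONTEXT.
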
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java
new file mode 100644
index 0000000..a3816c1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Retry policy for relaunching a Container.
+ */
+@Public
+@Unstable
+public enum ContainerRetryPolicy {
+ /** Never retry. */
+ NEVER_RETRY,
+ /** Always retry. */
+ ALWAYS_RETRY,
+ /** Retry for specific error codes. */
+ RETRY_ON_SPECIFIC_ERROR_CODE
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index d122f5a..ace442d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -487,6 +487,7 @@ message ContainerLaunchContextProto {
repeated StringStringMapProto environment = 4;
repeated string command = 5;
repeated ApplicationACLMapProto application_ACLs = 6;
+ optional ContainerRetryContextProto container_retry_context = 7;
}
message ContainerStatusProto {
@@ -510,6 +511,19 @@ message ContainerResourceChangeRequestProto {
optional ResourceProto capability = 2;
}
+message ContainerRetryContextProto {
+ optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY];
+ repeated int32 error_codes = 2;
+ optional int32 max_retries = 3 [default = 0];
+ optional int32 retry_interval = 4 [default = 0];
+}
+
+enum ContainerRetryPolicyProto {
+ NEVER_RETRY = 0;
+ ALWAYS_RETRY = 1;
+ RETRY_ON_SPECIFIC_ERROR_CODE = 2;
+}
+
////////////////////////////////////////////////////////////////////////
////// From common//////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index f410c43..f61059c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -76,6 +77,8 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -253,6 +256,13 @@
// File length needed for local resource
private long shellScriptPathLen = 0;
+ // Container retry options
+ private ContainerRetryPolicy containerRetryPolicy =
+ ContainerRetryPolicy.NEVER_RETRY;
+ private Set<Integer> errorCodes = null;
+ private int maxRetries = 0;
+ private int retryInterval = 0;
+
// Timeline domain ID
private String domainId = null;
@@ -371,6 +381,17 @@ public boolean init(String[] args) throws ParseException, IOException {
opts.addOption("num_containers", true,
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
+ opts.addOption("retry_policy", true,
+ "Retry policy when container fails to run, "
+ + "0: NEVER_RETRY, 1: ALWAYS_RETRY, "
+ + "2: RETRY_ON_SPECIFIC_ERROR_CODE");
+ opts.addOption("error_codes", true,
+ "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes "
+ + "is specified with this option, e.g. --error_codes 1,2,3");
+ opts.addOption("max_retries", true, "If container could retry, it specifies"
+ + " max retires");
+ opts.addOption("retry_interval", true, "interval between each retry, unit "
+ + "is milliseconds");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException {
}
requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0"));
+
+ containerRetryPolicy = ContainerRetryPolicy.values()[
+ Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))];
+ if (cliParser.hasOption("error_codes")) {
+ errorCodes = new HashSet<>();
+ for (String errorCode :
+ cliParser.getOptionValue("error_codes").split(",")) {
+ errorCodes.add(Integer.parseInt(errorCode));
+ }
+ }
+ maxRetries = Integer.parseInt(cliParser.getOptionValue("max_retries", "0"));
+ retryInterval = Integer.parseInt(cliParser.getOptionValue(
+ "retry_interval", "0"));
return true;
}
@@ -1062,9 +1096,11 @@ public void run() {
// "hadoop dfs" command inside the distributed shell.
Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
myShellEnv.put(YARN_SHELL_ID, shellId);
+ ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance(
+ containerRetryPolicy, errorCodes, maxRetries, retryInterval);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
- null);
+ null, containerRetryContext);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);
}
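
For reference, the distributed-shell option handling above reduces to the following standalone sketch (not part of the patch; the class and method names are invented, and sample values such as -retry_policy 2 -error_codes 10,11 -max_retries 3 -retry_interval 1000 are purely illustrative):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;

public class RetryOptionSketch {
  /** Mirrors the ApplicationMaster parsing above, with string option values. */
  static ContainerRetryContext fromOptions(String retryPolicy,
      String errorCodes, String maxRetries, String retryInterval) {
    // 0: NEVER_RETRY, 1: ALWAYS_RETRY, 2: RETRY_ON_SPECIFIC_ERROR_CODE
    ContainerRetryPolicy policy =
        ContainerRetryPolicy.values()[Integer.parseInt(retryPolicy)];
    Set<Integer> codes = null;
    if (errorCodes != null) {
      codes = new HashSet<>();
      for (String code : errorCodes.split(",")) {
        codes.add(Integer.parseInt(code));
      }
    }
    return ContainerRetryContext.newInstance(policy, codes,
        Integer.parseInt(maxRetries), Integer.parseInt(retryInterval));
  }
}
```

The Client below does not interpret these options itself; it simply forwards them verbatim on the ApplicationMaster's command line.
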
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 68d2bde..15df60b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -168,6 +168,8 @@
private long attemptFailuresValidityInterval = -1;
+ private Vector<CharSequence> containerRetryOptions = new Vector<>(5);
+
// Debug flag
boolean debugFlag = false;
@@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception {
+ " will be allocated, \"\" means containers"
+ " can be allocated anywhere, if you don't specify the option,"
+ " default node_label_expression of queue will be used.");
+ opts.addOption("retry_policy", true,
+ "Retry policy when container fails to run, "
+ + "0: NEVER_RETRY, 1: ALWAYS_RETRY, "
+ + "2: RETRY_ON_SPECIFIC_ERROR_CODE");
+ opts.addOption("error_codes", true,
+ "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes "
+ + "is specified with this option, e.g. --error_codes 1,2,3");
+ opts.addOption("max_retries", true, "If container could retry, it specifies"
+ + " max retries");
+ opts.addOption("retry_interval", true, "interval between each retry, unit "
+ + "is milliseconds");
}
/**
@@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException {
}
}
+ // Get container retry options
+ if (cliParser.hasOption("retry_policy")) {
+ containerRetryOptions.add("--retry_policy "
+ + cliParser.getOptionValue("retry_policy"));
+ }
+ if (cliParser.hasOption("error_codes")) {
+ containerRetryOptions.add("--error_codes "
+ + cliParser.getOptionValue("error_codes"));
+ }
+ if (cliParser.hasOption("max_retries")) {
+ containerRetryOptions.add("--max_retries "
+ + cliParser.getOptionValue("max_retries"));
+ }
+ if (cliParser.hasOption("retry_interval")) {
+ containerRetryOptions.add("--retry_interval "
+ + cliParser.getOptionValue("retry_interval"));
+ }
+
return true;
}
@@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException {
vargs.add("--debug");
}
+ vargs.addAll(containerRetryOptions);
+
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
index 12dcfcd..37eea46 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
@@ -29,10 +29,12 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
@@ -56,7 +58,8 @@
private Map<String, String> environment = null;
private List<String> commands = null;
private Map<ApplicationAccessType, String> applicationACLS = null;
-
+ private ContainerRetryContext containerRetryContext = null;
+
public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder();
}
@@ -120,6 +123,10 @@ private void mergeLocalToBuilder() {
if (this.applicationACLS != null) {
addApplicationACLs();
}
+ if (this.containerRetryContext != null) {
+ builder.setContainerRetryContext(
+ convertToProtoFormat(this.containerRetryContext));
+ }
}
private void mergeLocalToProto() {
@@ -456,6 +463,27 @@ public void setApplicationACLs(
this.applicationACLS.putAll(appACLs);
}
+ public ContainerRetryContext getContainerRetryContext() {
+ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.containerRetryContext != null) {
+ return this.containerRetryContext;
+ }
+ if (!p.hasContainerRetryContext()) {
+ return null;
+ }
+ this.containerRetryContext = convertFromProtoFormat(
+ p.getContainerRetryContext());
+ return this.containerRetryContext;
+ }
+
+ public void setContainerRetryContext(ContainerRetryContext retryContext) {
+ maybeInitBuilder();
+ if (retryContext == null) {
+ builder.clearContainerRetryContext();
+ }
+ this.containerRetryContext = retryContext;
+ }
+
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
return new LocalResourcePBImpl(p);
}
@@ -463,4 +491,14 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
private LocalResourceProto convertToProtoFormat(LocalResource t) {
return ((LocalResourcePBImpl)t).getProto();
}
-}
+
+ private ContainerRetryContextPBImpl convertFromProtoFormat(
+ ContainerRetryContextProto p) {
+ return new ContainerRetryContextPBImpl(p);
+ }
+
+ private ContainerRetryContextProto convertToProtoFormat(
+ ContainerRetryContext t) {
+ return ((ContainerRetryContextPBImpl)t).getProto();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
new file mode 100644
index 0000000..a5ef70d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Implementation of ContainerRetryContext.
+ */
+public class ContainerRetryContextPBImpl extends ContainerRetryContext {
+ private ContainerRetryContextProto proto =
+ ContainerRetryContextProto.getDefaultInstance();
+ private ContainerRetryContextProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ private Set<Integer> errorCodes = null;
+
+ public ContainerRetryContextPBImpl() {
+ builder = ContainerRetryContextProto.newBuilder();
+ }
+
+ public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ContainerRetryContextProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.errorCodes != null) {
+ builder.clearErrorCodes();
+ builder.addAllErrorCodes(this.errorCodes);
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto) {
+ maybeInitBuilder();
+ }
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ContainerRetryContextProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ public ContainerRetryPolicy getRetryPolicy() {
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasRetryPolicy()) {
+ return ContainerRetryPolicy.NEVER_RETRY;
+ }
+ return convertFromProtoFormat(p.getRetryPolicy());
+ }
+
+ public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) {
+ maybeInitBuilder();
+ if (containerRetryPolicy == null) {
+ builder.clearRetryPolicy();
+ return;
+ }
+ builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy));
+ }
+
+ private void initErrorCodes() {
+ if (this.errorCodes != null) {
+ return;
+ }
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ this.errorCodes = new HashSet<>();
+ this.errorCodes.addAll(p.getErrorCodesList());
+ }
+
+ public Set<Integer> getErrorCodes() {
+ initErrorCodes();
+ return this.errorCodes;
+ }
+
+ public void setErrorCodes(Set<Integer> errCodes) {
+ maybeInitBuilder();
+ if (errCodes == null || errCodes.isEmpty()) {
+ builder.clearErrorCodes();
+ }
+ this.errorCodes = errCodes;
+ }
+
+ public int getMaxRetries() {
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasMaxRetries()) {
+ return 0;
+ }
+ return p.getMaxRetries();
+ }
+
+ public void setMaxRetries(int maxRetries) {
+ maybeInitBuilder();
+ builder.setMaxRetries(maxRetries);
+ }
+
+ public int getRetryInterval() {
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasRetryInterval()) {
+ return 0;
+ }
+ return p.getRetryInterval();
+ }
+
+ public void setRetryInterval(int retryInterval) {
+ maybeInitBuilder();
+ builder.setRetryInterval(retryInterval);
+ }
+
+ private ContainerRetryPolicyProto convertToProtoFormat(
+ ContainerRetryPolicy containerRetryPolicy) {
+ return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
+ }
+
+ private ContainerRetryPolicy convertFromProtoFormat(
+ ContainerRetryPolicyProto containerRetryPolicyProto) {
+ return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 29ed0f3..d3d5948 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -54,6 +55,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@@ -294,4 +296,17 @@ public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
return ExecutionType.valueOf(e.name());
}
+
+ /*
+ * ContainerRetryPolicy
+ */
+ public static ContainerRetryPolicyProto convertToProtoFormat(
+ ContainerRetryPolicy e) {
+ return ContainerRetryPolicyProto.valueOf(e.name());
+ }
+
+ public static ContainerRetryPolicy convertFromProtoFormat(
+ ContainerRetryPolicyProto e) {
+ return ContainerRetryPolicy.valueOf(e.name());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index b7f5ff7..96df10b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -118,6 +118,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -159,6 +160,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -192,6 +194,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -345,7 +348,7 @@ private static Object genTypeValue(Type type) {
return rand.nextBoolean();
} else if (type.equals(byte.class)) {
return bytes[rand.nextInt(4)];
- } else if (type.equals(int.class)) {
+ } else if (type.equals(int.class) || type.equals(Integer.class)) {
return rand.nextInt(1000000);
} else if (type.equals(long.class)) {
return Long.valueOf(rand.nextInt(1000000));
@@ -469,6 +472,7 @@ public static void setup() throws Exception {
generateByNewInstance(ApplicationResourceUsageReport.class);
generateByNewInstance(ApplicationReport.class);
generateByNewInstance(Container.class);
+ generateByNewInstance(ContainerRetryContext.class);
generateByNewInstance(ContainerLaunchContext.class);
generateByNewInstance(ApplicationSubmissionContext.class);
generateByNewInstance(ContainerReport.class);
@@ -957,6 +961,12 @@ public void testContainerIdPBImpl() throws Exception {
}
@Test
+ public void testContainerRetryPBImpl() throws Exception {
+ validatePBImplRecord(ContainerRetryContextPBImpl.class,
+ ContainerRetryContextProto.class);
+ }
+
+ @Test
public void testContainerLaunchContextPBImpl() throws Exception {
validatePBImplRecord(ContainerLaunchContextPBImpl.class,
ContainerLaunchContextProto.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 8c74bf5..d08ee67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out,
}
public enum ExitCode {
+ SUCCESS(0),
FORCE_KILLED(137),
TERMINATED(143),
LOST(154);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index 5cc4e19..976df7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -551,6 +551,10 @@ public Path getLocalPathForWrite(String pathStr, long size,
checkWrite);
}
+ public Path getLocalPathForRead(String pathStr) throws IOException {
+ return getPathToRead(pathStr, getLocalDirsForRead());
+ }
+
public Path getLogPathForWrite(String pathStr, boolean checkWrite)
throws IOException {
return logDirsAllocator.getLocalPathForWrite(pathStr,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index f44de59..fb37a06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs)
Container container = new ContainerImpl(getConfig(), dispatcher,
context.getNMStateStore(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
- rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
+ rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(),
+ rcs.getRemainingRetryAttempts());
context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 1d2ec56..310d8df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -55,6 +55,10 @@
NMContainerStatus getNMContainerStatus();
+ boolean shouldRetry(int errorCode);
+
+ boolean isRelaunch();
+
String toString();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index e16ea93..60da9f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -41,6 +41,8 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -50,6 +52,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -100,6 +103,10 @@
private long containerLocalizationStartTime;
private long containerLaunchStartTime;
private static Clock clock = new SystemClock();
+ private final ContainerRetryContext containerRetryContext;
+ // remaining retries to relaunch container if needed
+ private int remainingRetryAttempts;
+ private boolean isRelaunch = false;
/** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf;
@@ -134,6 +141,13 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.dispatcher = dispatcher;
this.stateStore = stateStore;
this.launchContext = launchContext;
+ if (launchContext != null
+ && launchContext.getContainerRetryContext() != null) {
+ this.containerRetryContext = launchContext.getContainerRetryContext();
+ } else {
+ this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
+ }
+ this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID();
this.resource = containerTokenIdentifier.getResource();
@@ -154,7 +168,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
Credentials creds, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier,
RecoveredContainerStatus recoveredStatus, int exitCode,
- String diagnostics, boolean wasKilled, Resource recoveredCapability) {
+ String diagnostics, boolean wasKilled, Resource recoveredCapability,
+ int remainingRetryAttempts) {
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
containerTokenIdentifier);
this.recoveredStatus = recoveredStatus;
@@ -167,6 +182,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.resource = Resource.newInstance(recoveredCapability.getMemory(),
recoveredCapability.getVirtualCores());
}
+ if (remainingRetryAttempts != ContainerRetryContext.RETRY_INVALID) {
+ this.remainingRetryAttempts = remainingRetryAttempts;
+ }
}
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
@@ -246,9 +264,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING,
- ContainerState.EXITED_WITH_FAILURE,
+ EnumSet.of(ContainerState.EXITED_WITH_FAILURE,
+ ContainerState.LOCALIZED),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
- new ExitedWithFailureTransition(true))
+ new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
@@ -854,6 +873,86 @@ public void transition(ContainerImpl container, ContainerEvent event) {
}
/**
+ * Transition to EXITED_WITH_FAILURE, or back to LOCALIZED for a relaunch,
+ * upon a CONTAINER_EXITED_WITH_FAILURE event.
+ */
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class RetryFailureTransition implements
+ MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
+
+ @Override
+ public ContainerState transition(final ContainerImpl container,
+ ContainerEvent event) {
+ ContainerExitEvent exitEvent = (ContainerExitEvent) event;
+ container.exitCode = exitEvent.getExitCode();
+ if (exitEvent.getDiagnosticInfo() != null) {
+ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
+ }
+
+ if (container.shouldRetry(container.exitCode)) {
+ if (container.remainingRetryAttempts > 0) {
+ container.remainingRetryAttempts--;
+ try {
+ container.stateStore.storeContainerRemainingRetryAttempts(
+ container.getContainerId(), container.remainingRetryAttempts);
+ } catch (IOException e) {
+ LOG.warn("Unable to update remainingRetryAttempts in state store for "
+ + container.getContainerId(), e);
+ }
+ }
+ LOG.info("Relaunch Container " + container.getContainerId()
+ + ". Remain retry attempts : " + container.remainingRetryAttempts
+ + ". Retry interval is "
+ + container.containerRetryContext.getRetryInterval() + "ms");
+ container.isRelaunch = true;
+ container.wasLaunched = false;
+ container.metrics.endRunningContainer();
+ // wait for some time, then relaunch
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(container.containerRetryContext.getRetryInterval());
+ container.sendLaunchEvent();
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }.start();
+ return ContainerState.LOCALIZED;
+ } else {
+ new ExitedWithFailureTransition(true).transition(container, event);
+ return ContainerState.EXITED_WITH_FAILURE;
+ }
+ }
+ }
+
+ @Override
+ public boolean shouldRetry(int errorCode) {
+ if (errorCode == ExitCode.SUCCESS.getExitCode()
+ || errorCode == ExitCode.FORCE_KILLED.getExitCode()
+ || errorCode == ExitCode.TERMINATED.getExitCode()) {
+ return false;
+ }
+
+ ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy();
+ if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY
+ || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE
+ && containerRetryContext.getErrorCodes() != null
+ && containerRetryContext.getErrorCodes().contains(errorCode))) {
+ return remainingRetryAttempts > 0
+ || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean isRelaunch() {
+ return isRelaunch;
+ }
+
+ /**
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
*/
static class KilledExternallyTransition extends ExitedWithFailureTransition {
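
Taken together, RetryFailureTransition and shouldRetry above implement one decision rule. A hedged, standalone restatement of that rule follows (the class and method names are invented for illustration; the exit-code constants mirror ExitCode and ContainerRetryContext from this patch):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;

public class RetryDecisionSketch {
  // 0 (SUCCESS), 137 (FORCE_KILLED/SIGKILL) and 143 (TERMINATED/SIGTERM)
  // never trigger a relaunch, regardless of the configured policy.
  private static final Set<Integer> NON_RETRYABLE =
      new HashSet<>(Arrays.asList(0, 137, 143));

  static boolean shouldRetry(ContainerRetryContext retryContext,
      int remainingRetryAttempts, int exitCode) {
    if (NON_RETRYABLE.contains(exitCode)) {
      return false;
    }
    ContainerRetryPolicy policy = retryContext.getRetryPolicy();
    boolean policyMatches = policy == ContainerRetryPolicy.ALWAYS_RETRY
        || (policy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE
            && retryContext.getErrorCodes() != null
            && retryContext.getErrorCodes().contains(exitCode));
    // A relaunch also needs retry budget: either attempts remain, or
    // maxRetries was set to RETRY_FOREVER (-1).
    return policyMatches && (remainingRetryAttempts > 0
        || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER);
  }
}
```

On a positive decision, the transition above decrements and persists remainingRetryAttempts, waits retryInterval milliseconds on a helper thread, and sends the container back through the LOCALIZED launch path instead of moving to EXITED_WITH_FAILURE.
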
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 6371b21..0b48abd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -177,6 +178,10 @@ public Integer call() {
Path containerLogDir;
try {
+ // Select the working directory for the container
+ Path containerWorkDir = getContainerWorkDir(container);
+ cleanupContainerFilesForRelaunch(container, containerWorkDir);
+
localResources = container.getLocalizedResources();
if (localResources == null) {
throw RPCUtil.getRemoteException(
@@ -191,8 +196,7 @@ public Integer call() {
String appIdStr = app.getAppId().toString();
String relativeContainerLogDir = ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr);
- containerLogDir =
- dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
+ containerLogDir = getContainerLogDir(container);
for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar?
newCmds.add(expandEnvironment(str, containerLogDir));
@@ -226,14 +230,6 @@ public Integer call() {
DataOutputStream containerScriptOutStream = null;
DataOutputStream tokensOutStream = null;
- // Select the working directory for the container
- Path containerWorkDir =
- dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
- + Path.SEPARATOR + user + Path.SEPARATOR
- + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
- + Path.SEPARATOR + containerIdStr,
- LocalDirAllocator.SIZE_UNKNOWN, false);
-
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
@@ -329,7 +325,9 @@ public Integer call() {
completed.set(true);
exec.deactivateContainer(containerID);
try {
- context.getNMStateStore().storeContainerCompleted(containerID, ret);
+ if (!container.shouldRetry(ret)) {
+ context.getNMStateStore().storeContainerCompleted(containerID, ret);
+ }
} catch (IOException e) {
LOG.error("Unable to set exit code for container " + containerID);
}
@@ -369,6 +367,120 @@ public Integer call() {
}
/**
+ * Get the container's working directory.
+ * If the container is being relaunched and its previous working directory
+ * is still readable and writable, that directory is reused.
+ * A simple heuristic locates the previous working directory: a local dir
+ * that already contains the container tokens file is taken to be the
+ * container's previous working directory.
+ */
+ private Path getContainerWorkDir(Container container1) throws IOException {
+ Path containerWorkDir = null;
+ String relativeContainerWorkDir = ContainerLocalizer.USERCACHE
+ + Path.SEPARATOR + container1.getUser() + Path.SEPARATOR
+ + ContainerLocalizer.APPCACHE + Path.SEPARATOR
+ + app.getAppId().toString() + Path.SEPARATOR
+ + ConverterUtils.toString(container1.getContainerId());
+
+ if (container1.isRelaunch()) {
+ try {
+ // container's tokens file's parent is container's previous working dir
+ containerWorkDir = dirsHandler.getLocalPathForRead(
+ relativeContainerWorkDir + Path.SEPARATOR
+ + ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).getParent();
+ // check whether containerWorkDir is readable and writable;
+ // DiskChecker.checkDir throws an IOException if not.
+ DiskChecker.checkDir(
+ new File(containerWorkDir.toUri().getPath()));
+ } catch (IOException e) {
+ containerWorkDir = null;
+ }
+ }
+ if (containerWorkDir == null) {
+ containerWorkDir =
+ dirsHandler.getLocalPathForWrite(relativeContainerWorkDir,
+ LocalDirAllocator.SIZE_UNKNOWN, false);
+ }
+
+ return containerWorkDir;
+ }
+
+ /**
+ * Get the container's log directory.
+ * If the container is being relaunched and its previous log directory is
+ * still readable and writable, that directory is reused.
+ * A simple heuristic locates the previous log directory: a log dir that
+ * already contains a 'stdout' file is taken to be the container's previous
+ * log directory (most containers write a 'stdout' file there).
+ */
+ private Path getContainerLogDir(Container container1) throws IOException {
+ Path containerLogDir = null;
+ String relativeContainerLogDir = ContainerLaunch
+ .getRelativeContainerLogDir(app.getAppId().toString(),
+ ConverterUtils.toString(container1.getContainerId()));
+
+ if (container1.isRelaunch()) {
+ try {
+ containerLogDir = dirsHandler.getLocalPathForRead(
+ relativeContainerLogDir + Path.SEPARATOR
+ + "stdout").getParent();
+ // check whether containerLogDir is readable and writable;
+ // DiskChecker.checkDir throws an IOException if not.
+ DiskChecker.checkDir(
+ new File(containerLogDir.toUri().getPath()));
+ } catch (IOException e) {
+ containerLogDir = null;
+ }
+ }
+ if (containerLogDir == null) {
+ containerLogDir =
+ dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
+ }
+
+ return containerLogDir;
+ }
+
+ /**
+ * Clean up files left over from the container's previous attempt before
+ * it is relaunched.
+ */
+ private void cleanupContainerFilesForRelaunch(Container container1,
+ Path containerWorkDir) {
+ if (!container1.isRelaunch()) {
+ return;
+ }
+
+ try {
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ // delete ContainerScriptPath
+ lfs.delete(new Path(containerWorkDir, CONTAINER_SCRIPT), false);
+ // delete TokensPath
+ lfs.delete(
+ new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE), false);
+ } catch (IOException e) {
+ LOG.error("Failed to delete container script file and token file", e);
+ }
+
+ String appIdStr = app.getAppId().toString();
+ String containerIdStr =
+ ConverterUtils.toString(container1.getContainerId());
+ // delete pid file and pid exit code file
+ String pidPath = getPidFileSubpath(appIdStr, containerIdStr);
+ deleteLocalPath(pidPath);
+ deleteLocalPath(getExitCodeFile(pidPath));
+ }
+
+ private void deleteLocalPath(String relativePath) {
+ try {
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ Path path = dirsHandler.getLocalPathForRead(relativePath);
+ lfs.delete(path, false);
+ } catch (IOException e) {
+ LOG.error("Failed to delete " + relativePath, e);
+ }
+ }
+
+ /**
* Tries to tail and fetch TAIL_SIZE_IN_BYTES of data from the error log.
* ErrorLog filename is not fixed and depends upon app, hence file name
* pattern is used.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 89c71bb..db84485 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -106,6 +106,8 @@
"/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
+ private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
+ "/remainingRetryAttempts";
private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
@@ -238,6 +240,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
} else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
rcs.capability = new ResourcePBImpl(
ResourceProto.parseFrom(entry.getValue()));
+ } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
+ rcs.setRemainingRetryAttempts(Integer.parseInt(asString(entry.getValue())));
} else {
throw new IOException("Unexpected container state key: " + key);
}
@@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId,
}
@Override
+ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
+ int remainingRetryAttempts) throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX;
+ try {
+ db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public void removeContainer(ContainerId containerId)
throws IOException {
String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index d5dce9b..68f4e87 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode)
}
@Override
+ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
+ int remainingRetryAttempts) throws IOException {
+ }
+
+ @Override
public void removeContainer(ContainerId containerId) throws IOException {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index e8ccf54..b733ffe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -76,6 +77,7 @@ public NMStateStoreService(String name) {
String diagnostics = "";
StartContainerRequest startRequest;
Resource capability;
+ private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
public RecoveredContainerStatus getStatus() {
return status;
@@ -100,6 +102,14 @@ public StartContainerRequest getStartRequest() {
public Resource getCapability() {
return capability;
}
+
+ public int getRemainingRetryAttempts() {
+ return remainingRetryAttempts;
+ }
+
+ public void setRemainingRetryAttempts(int retryAttempts) {
+ this.remainingRetryAttempts = retryAttempts;
+ }
}
public static class LocalResourceTrackerState {
@@ -325,6 +335,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException;
/**
+ * Record remaining retry attempts for a container.
+ * @param containerId the container ID
+ * @param remainingRetryAttempts the number of retry attempts remaining
+ * @throws IOException
+ */
+ public abstract void storeContainerRemainingRetryAttempts(
+ ContainerId containerId, int remainingRetryAttempts) throws IOException;
+
+ /**
* Remove records corresponding to a container
* @param containerId the container ID
* @throws IOException
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 2ab9842..7a26df5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -54,6 +54,8 @@
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -645,6 +647,66 @@ public void testLaunchAfterKillRequest() throws Exception {
}
}
+ @Test
+ public void testContainerRetry() throws Exception {
+ ContainerRetryContext containerRetryContext1 = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.NEVER_RETRY, null, 3, 0);
+ testContainerRetry(containerRetryContext1, 2, 0);
+
+ ContainerRetryContext containerRetryContext2 = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0);
+ testContainerRetry(containerRetryContext2, 2, 3);
+
+ ContainerRetryContext containerRetryContext3 = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0);
+ // If exit code is 0, it will not retry
+ testContainerRetry(containerRetryContext3, 0, 0);
+
+ ContainerRetryContext containerRetryContext4 = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, null, 3, 0);
+ testContainerRetry(containerRetryContext4, 2, 0);
+
+ HashSet<Integer> errorCodes = new HashSet<>();
+ errorCodes.add(2);
+ errorCodes.add(6);
+ ContainerRetryContext containerRetryContext5 = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes, 3, 0);
+ testContainerRetry(containerRetryContext5, 2, 3);
+
+ HashSet<Integer> errorCodes2 = new HashSet<>();
+ errorCodes2.add(143);
+ ContainerRetryContext containerRetryContext6 = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes2, 3, 0);
+ // If exit code is 143 (SIGTERM), it will not retry even if it is in errorCodes.
+ testContainerRetry(containerRetryContext6, 143, 0);
+ }
+
+ private void testContainerRetry(ContainerRetryContext containerRetryContext, int exitCode,
+ int expectedRetries) throws Exception {
+ WrappedContainer wc = null;
+ try {
+ int retryTimes = 0;
+ wc = new WrappedContainer(24, 314159265358979L, 4344, "yak",
+ containerRetryContext);
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ while (true) {
+ wc.containerFailed(exitCode);
+ if (wc.c.getContainerState() == ContainerState.RUNNING) {
+ retryTimes++;
+ } else {
+ break;
+ }
+ }
+ Assert.assertEquals(expectedRetries, retryTimes);
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -782,12 +844,23 @@ public boolean matches(Object o) {
WrappedContainer(int appId, long timestamp, int id, String user)
throws IOException {
- this(appId, timestamp, id, user, true, false);
+ this(appId, timestamp, id, user, null);
+ }
+
+ WrappedContainer(int appId, long timestamp, int id, String user,
+ ContainerRetryContext containerRetryContext) throws IOException {
+ this(appId, timestamp, id, user, true, false, containerRetryContext);
}
- @SuppressWarnings("rawtypes")
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) throws IOException {
+ this(appId, timestamp, id, user, withLocalRes, withServiceData, null);
+ }
+
+ @SuppressWarnings("rawtypes")
+ WrappedContainer(int appId, long timestamp, int id, String user,
+ boolean withLocalRes, boolean withServiceData,
+ ContainerRetryContext containerRetryContext) throws IOException {
dispatcher = new DrainDispatcher();
dispatcher.init(new Configuration());
@@ -864,6 +937,8 @@ public boolean matches(Object o) {
}
when(ctxt.getServiceData()).thenReturn(serviceData);
+ when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
+
c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
ctxt, null, metrics, identifier);
dispatcher.register(ContainerEventType.class,
@@ -984,6 +1059,10 @@ public void containerFailed(int exitCode) {
assert containerStatus.getDiagnostics().contains(diagnosticMsg);
assert containerStatus.getExitStatus() == exitCode;
drainDispatcherEvents();
+ // If the container needs to retry, relaunch it
+ if (c.getContainerState() == ContainerState.LOCALIZED) {
+ launchContainer();
+ }
}
public void killContainer() {
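
For orientation, here is a hedged, standalone sketch of the retry semantics the test above drives through the container state machine; the class and variable names are illustrative only, while the newInstance signature matches the API introduced by this patch:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;

public class RetryContextExample {
  public static ContainerRetryContext buildRetryContext() {
    // Retry only when the container exits with code 2 or 6, at most 3 times,
    // waiting 1000 ms before each relaunch. Exit codes 0, 137 (SIGKILL) and
    // 143 (SIGTERM) never trigger a retry, as testContainerRetry verifies.
    Set<Integer> retriableCodes = new HashSet<>(Arrays.asList(2, 6));
    return ContainerRetryContext.newInstance(
        ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE,
        retriableCodes, 3, 1000);
  }
}

The resulting context is attached to the container's launch context before the container is started, which is how the distributed shell changes earlier in this patch wire it up.
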
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index a1c95ab..81ecf65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -124,6 +124,7 @@ public synchronized void removeApplication(ApplicationId appId)
rcsCopy.diagnostics = rcs.diagnostics;
rcsCopy.startRequest = rcs.startRequest;
rcsCopy.capability = rcs.capability;
+ rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
result.add(rcsCopy);
}
return result;
@@ -177,6 +178,13 @@ public synchronized void storeContainerCompleted(ContainerId containerId,
}
@Override
+ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
+ int remainingRetryAttempts) throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.setRemainingRetryAttempts(remainingRetryAttempts);
+ }
+
+ @Override
public synchronized void removeContainer(ContainerId containerId)
throws IOException {
containerStates.remove(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index 08b49e7..fe70f10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -337,6 +337,13 @@ public void testContainerStorage() throws IOException {
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics());
+ // store remaining retry attempts and verify they survive a restart
+ stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
+ restartStateStore();
+ recoveredContainers = stateStore.loadContainersState();
+ assertEquals(1, recoveredContainers.size());
+ assertEquals(6, recoveredContainers.get(0).getRemainingRetryAttempts());
+
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
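
On the consuming side, ContainerManagerImpl passes the recovered value into ContainerImpl, which only applies it when it differs from the RETRY_INVALID sentinel (the guard sits in the ContainerImpl recovery constructor earlier in this patch). A hedged sketch of that check, with a hypothetical helper name:

import org.apache.hadoop.yarn.api.records.ContainerRetryContext;

public final class RecoveredRetryDefaults {
  private RecoveredRetryDefaults() {
  }

  // RETRY_INVALID means the attribute was never written to the state store,
  // so the caller keeps the default derived from the launch context's
  // ContainerRetryContext.getMaxRetries() instead of the recovered value.
  public static int resolve(int recovered, int defaultFromLaunchContext) {
    return recovered == ContainerRetryContext.RETRY_INVALID
        ? defaultFromLaunchContext : recovered;
  }
}
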
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 394a92c..ba52a97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -144,4 +144,14 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() {
public NMContainerStatus getNMContainerStatus() {
return null;
}
+
+ @Override
+ public boolean shouldRetry(int errorCode) {
+ return false;
+ }
+
+ @Override
+ public boolean isRelaunch() {
+ return false;
+ }
}
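
MockContainer simply opts out of the new Container methods. For completeness, a hedged sketch of the pattern the production code in this patch follows around them: a final exit code is only recorded when the container will not be relaunched. The class, method, and callback names below are hypothetical:

import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;

public final class RetryAwareCompletion {
  private RetryAwareCompletion() {
  }

  /** Hypothetical callback standing in for the NM state store. */
  public interface ExitRecorder {
    void storeCompleted(ContainerId id, int exitCode) throws IOException;
  }

  // Only persist completion when the exit code will not cause a relaunch,
  // so a retried container is not recovered later as already completed.
  public static void recordExitIfFinal(Container container, int exitCode,
      ExitRecorder recorder) throws IOException {
    if (!container.shouldRetry(exitCode)) {
      recorder.storeCompleted(container.getContainerId(), exitCode);
    }
  }
}
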
diff --git a/patch b/patch
new file mode 100644
index 0000000..7eea87c
--- /dev/null
+++ b/patch
@@ -0,0 +1,1382 @@
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
+index 932945b..ed64b54 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java
+@@ -24,6 +24,7 @@
+
+ import org.apache.hadoop.classification.InterfaceAudience.Public;
+ import org.apache.hadoop.classification.InterfaceStability.Stable;
++import org.apache.hadoop.classification.InterfaceStability.Unstable;
+ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+ import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+@@ -46,6 +47,7 @@
+ * Optional, application-specific binary service data.
+ * Environment variables for the launched process.
+ * Command to launch the container.
++ * Retry strategy when container fails to run.
+ *
+ *
+ * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
+@@ -61,6 +63,18 @@ public static ContainerLaunchContext newInstance(
+ Map environment, List commands,
+ Map serviceData, ByteBuffer tokens,
+ Map acls) {
++ return newInstance(localResources, environment, commands, serviceData,
++ tokens, acls, null);
++ }
++
++ @Public
++ @Unstable
++ public static ContainerLaunchContext newInstance(
++ Map localResources,
++ Map environment, List commands,
++ Map serviceData, ByteBuffer tokens,
++ Map acls,
++ ContainerRetryContext containerRetryContext) {
+ ContainerLaunchContext container =
+ Records.newRecord(ContainerLaunchContext.class);
+ container.setLocalResources(localResources);
+@@ -69,6 +83,7 @@ public static ContainerLaunchContext newInstance(
+ container.setServiceData(serviceData);
+ container.setTokens(tokens);
+ container.setApplicationACLs(acls);
++ container.setContainerRetryContext(containerRetryContext);
+ return container;
+ }
+
+@@ -195,4 +210,22 @@ public static ContainerLaunchContext newInstance(
+ @Public
+ @Stable
+ public abstract void setApplicationACLs(Map acls);
++
++ /**
++ * Get the ContainerRetryContext to relaunch container.
++ * @return ContainerRetryContext to relaunch container.
++ */
++ @Public
++ @Unstable
++ public abstract ContainerRetryContext getContainerRetryContext();
++
++ /**
++ * Set the ContainerRetryContext to relaunch container.
++ * @param containerRetryContext ContainerRetryContext to
++ * relaunch container.
++ */
++ @Public
++ @Unstable
++ public abstract void setContainerRetryContext(
++ ContainerRetryContext containerRetryContext);
+ }
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
+new file mode 100644
+index 0000000..f257967
+--- /dev/null
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
+@@ -0,0 +1,84 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.yarn.api.records;
++
++import org.apache.hadoop.classification.InterfaceAudience.Private;
++import org.apache.hadoop.classification.InterfaceAudience.Public;
++import org.apache.hadoop.classification.InterfaceStability.Unstable;
++import org.apache.hadoop.yarn.util.Records;
++
++import java.util.Set;
++
++/**
++ * {@code ContainerRetryContext} indicates how container retry after it fails
++ * to run.
++ *
++ * It provides details such as:
++ *
++ * -
++ * {@link ContainerRetryPolicy} :
++ * - NEVER_RETRY(DEFAULT value): no matter what error code is when container
++ * fails to run, just do not retry.
++ * - ALWAYS_RETRY: no matter what error code is, when container fails to
++ * run, just retry.
++ * - RETRY_ON_SPECIFIC_ERROR_CODE: when container fails to run, do retry if
++ * the error code is one of errorCodes, otherwise do not retry.
++ *
++ * Note: if error code is 137(SIGKILL) or 143(SIGTERM), it will not retry
++ * because it is usually killed on purpose.
++ *
++ * -
++ * maxRetries specifies how many times to retry if need to retry.
++ * If the value is -1, it means retry forever.
++ *
++ * - retryInterval specifies delaying some time before relaunch
++ * container, the unit is millisecond.
++ *
++ */
++@Public
++@Unstable
++public abstract class ContainerRetryContext {
++ public static final int RETRY_FOREVER = -1;
++ public static final int RETRY_INVALID = -1000;
++ public static final ContainerRetryContext NEVER_RETRY_CONTEXT =
++ newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0);
++
++ @Private
++ @Unstable
++ public static ContainerRetryContext newInstance(
++ ContainerRetryPolicy retryPolicy, Set errorCodes,
++ int maxRetries, int retryInterval) {
++ ContainerRetryContext containerRetryContext =
++ Records.newRecord(ContainerRetryContext.class);
++ containerRetryContext.setRetryPolicy(retryPolicy);
++ containerRetryContext.setErrorCodes(errorCodes);
++ containerRetryContext.setMaxRetries(maxRetries);
++ containerRetryContext.setRetryInterval(retryInterval);
++ return containerRetryContext;
++ }
++
++ public abstract ContainerRetryPolicy getRetryPolicy();
++ public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy);
++ public abstract Set getErrorCodes();
++ public abstract void setErrorCodes(Set errorCodes);
++ public abstract int getMaxRetries();
++ public abstract void setMaxRetries(int maxRetries);
++ public abstract int getRetryInterval();
++ public abstract void setRetryInterval(int retryInterval);
++}
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java
+new file mode 100644
+index 0000000..a3816c1
+--- /dev/null
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java
+@@ -0,0 +1,35 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.yarn.api.records;
++
++import org.apache.hadoop.classification.InterfaceAudience.Public;
++import org.apache.hadoop.classification.InterfaceStability.Unstable;
++
++/**
++ * Retry policy for relaunching a Container.
++ */
++@Public
++@Unstable
++public enum ContainerRetryPolicy {
++ /** Never retry. */
++ NEVER_RETRY,
++ /** Always retry. */
++ ALWAYS_RETRY,
++ /** Retry for specific error codes. */
++ RETRY_ON_SPECIFIC_ERROR_CODE
++}
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+index d122f5a..ace442d 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+@@ -487,6 +487,7 @@ message ContainerLaunchContextProto {
+ repeated StringStringMapProto environment = 4;
+ repeated string command = 5;
+ repeated ApplicationACLMapProto application_ACLs = 6;
++ optional ContainerRetryContextProto container_retry_context = 7;
+ }
+
+ message ContainerStatusProto {
+@@ -510,6 +511,19 @@ message ContainerResourceChangeRequestProto {
+ optional ResourceProto capability = 2;
+ }
+
++message ContainerRetryContextProto {
++ optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY];
++ repeated int32 error_codes = 2;
++ optional int32 max_retries = 3 [default = 0];
++ optional int32 retry_interval = 4 [default = 0];
++}
++
++enum ContainerRetryPolicyProto {
++ NEVER_RETRY = 0;
++ ALWAYS_RETRY = 1;
++ RETRY_ON_SPECIFIC_ERROR_CODE = 2;
++}
++
+ ////////////////////////////////////////////////////////////////////////
+ ////// From common//////////////////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+index f410c43..f61059c 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+@@ -32,6 +32,7 @@
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+@@ -76,6 +77,8 @@
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+ import org.apache.hadoop.yarn.api.records.ContainerState;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+@@ -253,6 +256,13 @@
+ // File length needed for local resource
+ private long shellScriptPathLen = 0;
+
++ // Container retry options
++ private ContainerRetryPolicy containerRetryPolicy =
++ ContainerRetryPolicy.NEVER_RETRY;
++ private Set errorCodes = null;
++ private int maxRetries = 0;
++ private int retryInterval = 0;
++
+ // Timeline domain ID
+ private String domainId = null;
+
+@@ -371,6 +381,17 @@ public boolean init(String[] args) throws ParseException, IOException {
+ opts.addOption("num_containers", true,
+ "No. of containers on which the shell command needs to be executed");
+ opts.addOption("priority", true, "Application Priority. Default 0");
++ opts.addOption("retry_policy", true,
++ "Retry policy when container fails to run, "
++ + "0: NEVER_RETRY, 1: ALWAYS_RETRY, "
++ + "2: RETRY_ON_SPECIFIC_ERROR_CODE");
++ opts.addOption("error_codes", true,
++ "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes "
++ + "is specified with this option, e.g. --error_codes 1,2,3");
++ opts.addOption("max_retries", true, "If container could retry, it specifies"
++ + " max retires");
++ opts.addOption("retry_interval", true, "interval between each retry, unit "
++ + "is milliseconds");
+ opts.addOption("debug", false, "Dump out debug information");
+
+ opts.addOption("help", false, "Print usage");
+@@ -508,6 +529,19 @@ public boolean init(String[] args) throws ParseException, IOException {
+ }
+ requestPriority = Integer.parseInt(cliParser
+ .getOptionValue("priority", "0"));
++
++ containerRetryPolicy = ContainerRetryPolicy.values()[
++ Integer.parseInt(cliParser.getOptionValue("retry_policy", "0"))];
++ if (cliParser.hasOption("error_codes")) {
++ errorCodes = new HashSet<>();
++ for (String errorCode :
++ cliParser.getOptionValue("error_codes").split(",")) {
++ errorCodes.add(Integer.parseInt(errorCode));
++ }
++ }
++ maxRetries = Integer.parseInt(cliParser.getOptionValue("max_retries", "0"));
++ retryInterval = Integer.parseInt(cliParser.getOptionValue(
++ "retry_interval", "0"));
+ return true;
+ }
+
+@@ -1062,9 +1096,11 @@ public void run() {
+ // "hadoop dfs" command inside the distributed shell.
+ Map myShellEnv = new HashMap(shellEnv);
+ myShellEnv.put(YARN_SHELL_ID, shellId);
++ ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance(
++ containerRetryPolicy, errorCodes, maxRetries, retryInterval);
+ ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+ localResources, myShellEnv, commands, null, allTokens.duplicate(),
+- null);
++ null, containerRetryContext);
+ containerListener.addContainer(container.getId(), container);
+ nmClientAsync.startContainerAsync(container, ctx);
+ }
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+index 68d2bde..15df60b 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+@@ -168,6 +168,8 @@
+
+ private long attemptFailuresValidityInterval = -1;
+
++ private Vector containerRetryOptions = new Vector<>(5);
++
+ // Debug flag
+ boolean debugFlag = false;
+
+@@ -287,6 +289,17 @@ public Client(Configuration conf) throws Exception {
+ + " will be allocated, \"\" means containers"
+ + " can be allocated anywhere, if you don't specify the option,"
+ + " default node_label_expression of queue will be used.");
++ opts.addOption("retry_policy", true,
++ "Retry policy when container fails to run, "
++ + "0: NEVER_RETRY, 1: ALWAYS_RETRY, "
++ + "2: RETRY_ON_SPECIFIC_ERROR_CODE");
++ opts.addOption("error_codes", true,
++ "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODE, error codes "
++ + "is specified with this option, e.g. --error_codes 1,2,3");
++ opts.addOption("max_retries", true, "If container could retry, it specifies"
++ + " max retries");
++ opts.addOption("retry_interval", true, "interval between each retry, unit "
++ + "is milliseconds");
+ }
+
+ /**
+@@ -429,6 +442,24 @@ public boolean init(String[] args) throws ParseException {
+ }
+ }
+
++ // Get container retry options
++ if (cliParser.hasOption("retry_policy")) {
++ containerRetryOptions.add("--retry_policy "
++ + cliParser.getOptionValue("retry_policy"));
++ }
++ if (cliParser.hasOption("error_codes")) {
++ containerRetryOptions.add("--error_codes "
++ + cliParser.getOptionValue("error_codes"));
++ }
++ if (cliParser.hasOption("max_retries")) {
++ containerRetryOptions.add("--max_retries "
++ + cliParser.getOptionValue("max_retries"));
++ }
++ if (cliParser.hasOption("retry_interval")) {
++ containerRetryOptions.add("--retry_interval "
++ + cliParser.getOptionValue("retry_interval"));
++ }
++
+ return true;
+ }
+
+@@ -638,6 +669,8 @@ public boolean run() throws IOException, YarnException {
+ vargs.add("--debug");
+ }
+
++ vargs.addAll(containerRetryOptions);
++
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+index 12dcfcd..37eea46 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java
+@@ -29,10 +29,12 @@
+ import org.apache.hadoop.classification.InterfaceStability.Unstable;
+ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
+@@ -56,7 +58,8 @@
+ private Map environment = null;
+ private List commands = null;
+ private Map applicationACLS = null;
+-
++ private ContainerRetryContext containerRetryContext = null;
++
+ public ContainerLaunchContextPBImpl() {
+ builder = ContainerLaunchContextProto.newBuilder();
+ }
+@@ -120,6 +123,10 @@ private void mergeLocalToBuilder() {
+ if (this.applicationACLS != null) {
+ addApplicationACLs();
+ }
++ if (this.containerRetryContext != null) {
++ builder.setContainerRetryContext(
++ convertToProtoFormat(this.containerRetryContext));
++ }
+ }
+
+ private void mergeLocalToProto() {
+@@ -456,6 +463,27 @@ public void setApplicationACLs(
+ this.applicationACLS.putAll(appACLs);
+ }
+
++ public ContainerRetryContext getContainerRetryContext() {
++ ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
++ if (this.containerRetryContext != null) {
++ return this.containerRetryContext;
++ }
++ if (!p.hasContainerRetryContext()) {
++ return null;
++ }
++ this.containerRetryContext = convertFromProtoFormat(
++ p.getContainerRetryContext());
++ return this.containerRetryContext;
++ }
++
++ public void setContainerRetryContext(ContainerRetryContext retryContext) {
++ maybeInitBuilder();
++ if (retryContext == null) {
++ builder.clearContainerRetryContext();
++ }
++ this.containerRetryContext = retryContext;
++ }
++
+ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
+ return new LocalResourcePBImpl(p);
+ }
+@@ -463,4 +491,14 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
+ private LocalResourceProto convertToProtoFormat(LocalResource t) {
+ return ((LocalResourcePBImpl)t).getProto();
+ }
+-}
++
++ private ContainerRetryContextPBImpl convertFromProtoFormat(
++ ContainerRetryContextProto p) {
++ return new ContainerRetryContextPBImpl(p);
++ }
++
++ private ContainerRetryContextProto convertToProtoFormat(
++ ContainerRetryContext t) {
++ return ((ContainerRetryContextPBImpl)t).getProto();
++ }
++}
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
+new file mode 100644
+index 0000000..a5ef70d
+--- /dev/null
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
+@@ -0,0 +1,177 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.yarn.api.records.impl.pb;
++
++
++import com.google.protobuf.TextFormat;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder;
++
++import java.util.HashSet;
++import java.util.Set;
++
++/**
++ * Implementation of ContainerRetryContext.
++ */
++public class ContainerRetryContextPBImpl extends ContainerRetryContext {
++ private ContainerRetryContextProto proto =
++ ContainerRetryContextProto.getDefaultInstance();
++ private ContainerRetryContextProto.Builder builder = null;
++ private boolean viaProto = false;
++
++ private Set errorCodes = null;
++
++ public ContainerRetryContextPBImpl() {
++ builder = ContainerRetryContextProto.newBuilder();
++ }
++
++ public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) {
++ this.proto = proto;
++ viaProto = true;
++ }
++
++ public ContainerRetryContextProto getProto() {
++ mergeLocalToProto();
++ proto = viaProto ? proto : builder.build();
++ viaProto = true;
++ return proto;
++ }
++
++ @Override
++ public int hashCode() {
++ return getProto().hashCode();
++ }
++
++ @Override
++ public boolean equals(Object other) {
++ if (other == null) {
++ return false;
++ }
++ if (other.getClass().isAssignableFrom(this.getClass())) {
++ return this.getProto().equals(this.getClass().cast(other).getProto());
++ }
++ return false;
++ }
++
++ @Override
++ public String toString() {
++ return TextFormat.shortDebugString(getProto());
++ }
++
++ private void mergeLocalToBuilder() {
++ if (this.errorCodes != null) {
++ builder.clearErrorCodes();
++ builder.addAllErrorCodes(this.errorCodes);
++ }
++ }
++
++ private void mergeLocalToProto() {
++ if (viaProto) {
++ maybeInitBuilder();
++ }
++ mergeLocalToBuilder();
++ proto = builder.build();
++ viaProto = true;
++ }
++
++ private void maybeInitBuilder() {
++ if (viaProto || builder == null) {
++ builder = ContainerRetryContextProto.newBuilder(proto);
++ }
++ viaProto = false;
++ }
++
++ public ContainerRetryPolicy getRetryPolicy() {
++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
++ if (!p.hasRetryPolicy()) {
++ return ContainerRetryPolicy.NEVER_RETRY;
++ }
++ return convertFromProtoFormat(p.getRetryPolicy());
++ }
++
++ public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) {
++ maybeInitBuilder();
++ if (containerRetryPolicy == null) {
++ builder.clearRetryPolicy();
++ return;
++ }
++ builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy));
++ }
++
++ private void initErrorCodes() {
++ if (this.errorCodes != null) {
++ return;
++ }
++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
++ this.errorCodes = new HashSet<>();
++ this.errorCodes.addAll(p.getErrorCodesList());
++ }
++
++ public Set getErrorCodes() {
++ initErrorCodes();
++ return this.errorCodes;
++ }
++
++ public void setErrorCodes(Set errCodes) {
++ maybeInitBuilder();
++ if (errCodes == null || errCodes.isEmpty()) {
++ builder.clearErrorCodes();
++ }
++ this.errorCodes = errCodes;
++ }
++
++ public int getMaxRetries() {
++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
++ if (!p.hasMaxRetries()) {
++ return 0;
++ }
++ return p.getMaxRetries();
++ }
++
++ public void setMaxRetries(int maxRetries) {
++ maybeInitBuilder();
++ builder.setMaxRetries(maxRetries);
++ }
++
++ public int getRetryInterval() {
++ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
++ if (!p.hasRetryInterval()) {
++ return 0;
++ }
++ return p.getRetryInterval();
++ }
++
++ public void setRetryInterval(int retryInterval) {
++ maybeInitBuilder();
++ builder.setRetryInterval(retryInterval);
++ }
++
++ private ContainerRetryPolicyProto convertToProtoFormat(
++ ContainerRetryPolicy containerRetryPolicy) {
++ return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
++ }
++
++ private ContainerRetryPolicy convertFromProtoFormat(
++ ContainerRetryPolicyProto containerRetryPolicyProto) {
++ return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto);
++ }
++}
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+index 29ed0f3..d3d5948 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+@@ -26,6 +26,7 @@
+ import org.apache.hadoop.yarn.api.records.AMCommand;
+ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+ import org.apache.hadoop.yarn.api.records.ContainerState;
+ import org.apache.hadoop.yarn.api.records.ExecutionType;
+ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+@@ -54,6 +55,7 @@
+ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+ import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+@@ -294,4 +296,17 @@ public static ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
+ public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
+ return ExecutionType.valueOf(e.name());
+ }
++
++ /*
++ * ContainerRetryPolicy
++ */
++ public static ContainerRetryPolicyProto convertToProtoFormat(
++ ContainerRetryPolicy e) {
++ return ContainerRetryPolicyProto.valueOf(e.name());
++ }
++
++ public static ContainerRetryPolicy convertFromProtoFormat(
++ ContainerRetryPolicyProto e) {
++ return ContainerRetryPolicy.valueOf(e.name());
++ }
+ }
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+index b7f5ff7..96df10b 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+@@ -118,6 +118,7 @@
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+ import org.apache.hadoop.yarn.api.records.ContainerReport;
+ import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+@@ -159,6 +160,7 @@
+ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl;
+ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl;
++import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl;
+ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+ import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+ import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
+@@ -192,6 +194,7 @@
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto;
++import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+ import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+@@ -345,7 +348,7 @@ private static Object genTypeValue(Type type) {
+ return rand.nextBoolean();
+ } else if (type.equals(byte.class)) {
+ return bytes[rand.nextInt(4)];
+- } else if (type.equals(int.class)) {
++ } else if (type.equals(int.class) || type.equals(Integer.class)) {
+ return rand.nextInt(1000000);
+ } else if (type.equals(long.class)) {
+ return Long.valueOf(rand.nextInt(1000000));
+@@ -469,6 +472,7 @@ public static void setup() throws Exception {
+ generateByNewInstance(ApplicationResourceUsageReport.class);
+ generateByNewInstance(ApplicationReport.class);
+ generateByNewInstance(Container.class);
++ generateByNewInstance(ContainerRetryContext.class);
+ generateByNewInstance(ContainerLaunchContext.class);
+ generateByNewInstance(ApplicationSubmissionContext.class);
+ generateByNewInstance(ContainerReport.class);
+@@ -957,6 +961,12 @@ public void testContainerIdPBImpl() throws Exception {
+ }
+
+ @Test
++ public void testContainerRetryPBImpl() throws Exception {
++ validatePBImplRecord(ContainerRetryContextPBImpl.class,
++ ContainerRetryContextProto.class);
++ }
++
++ @Test
+ public void testContainerLaunchContextPBImpl() throws Exception {
+ validatePBImplRecord(ContainerLaunchContextPBImpl.class,
+ ContainerLaunchContextProto.class);
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+index 8c74bf5..d08ee67 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+@@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out,
+ }
+
+ public enum ExitCode {
++ SUCCESS(0),
+ FORCE_KILLED(137),
+ TERMINATED(143),
+ LOST(154);
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+index 5cc4e19..976df7c 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+@@ -551,6 +551,10 @@ public Path getLocalPathForWrite(String pathStr, long size,
+ checkWrite);
+ }
+
++ public Path getLocalPathForRead(String pathStr) throws IOException {
++ return getPathToRead(pathStr, getLocalDirsForRead());
++ }
++
+ public Path getLogPathForWrite(String pathStr, boolean checkWrite)
+ throws IOException {
+ return logDirsAllocator.getLocalPathForWrite(pathStr,
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+index f44de59..fb37a06 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+@@ -349,7 +349,8 @@ private void recoverContainer(RecoveredContainerState rcs)
+ Container container = new ContainerImpl(getConfig(), dispatcher,
+ context.getNMStateStore(), req.getContainerLaunchContext(),
+ credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
+- rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
++ rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(),
++ rcs.getRemainingRetryAttempts());
+ context.getContainers().put(containerId, container);
+ dispatcher.getEventHandler().handle(
+ new ApplicationContainerInitEvent(container));
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+index 1d2ec56..310d8df 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+@@ -55,6 +55,10 @@
+
+ NMContainerStatus getNMContainerStatus();
+
++ boolean shouldRetry(int errorCode);
++
++ boolean isRelaunch();
++
+ String toString();
+
+ }
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+index e16ea93..60da9f3 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+@@ -41,6 +41,8 @@
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+@@ -50,6 +52,7 @@
+ import org.apache.hadoop.yarn.event.EventHandler;
+ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
++import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
+ import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
+ import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+@@ -100,6 +103,10 @@
+ private long containerLocalizationStartTime;
+ private long containerLaunchStartTime;
+ private static Clock clock = new SystemClock();
++ private final ContainerRetryContext containerRetryContext;
++ // remaining retries to relaunch container if needed
++ private int remainingRetryAttempts;
++ private boolean isRelaunch = false;
+
+ /** The NM-wide configuration - not specific to this container */
+ private final Configuration daemonConf;
+@@ -134,6 +141,13 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ this.dispatcher = dispatcher;
+ this.stateStore = stateStore;
+ this.launchContext = launchContext;
++ if (launchContext != null
++ && launchContext.getContainerRetryContext() != null) {
++ this.containerRetryContext = launchContext.getContainerRetryContext();
++ } else {
++ this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
++ }
++ this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
+ this.containerTokenIdentifier = containerTokenIdentifier;
+ this.containerId = containerTokenIdentifier.getContainerID();
+ this.resource = containerTokenIdentifier.getResource();
+@@ -154,7 +168,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ Credentials creds, NodeManagerMetrics metrics,
+ ContainerTokenIdentifier containerTokenIdentifier,
+ RecoveredContainerStatus recoveredStatus, int exitCode,
+- String diagnostics, boolean wasKilled, Resource recoveredCapability) {
++ String diagnostics, boolean wasKilled, Resource recoveredCapability,
++ int remainingRetryAttempts) {
+ this(conf, dispatcher, stateStore, launchContext, creds, metrics,
+ containerTokenIdentifier);
+ this.recoveredStatus = recoveredStatus;
+@@ -167,6 +182,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ this.resource = Resource.newInstance(recoveredCapability.getMemory(),
+ recoveredCapability.getVirtualCores());
+ }
++ if (remainingRetryAttempts != ContainerRetryContext.RETRY_INVALID) {
++ this.remainingRetryAttempts = remainingRetryAttempts;
++ }
+ }
+
+ private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
+@@ -246,9 +264,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition(true))
+ .addTransition(ContainerState.RUNNING,
+- ContainerState.EXITED_WITH_FAILURE,
++ EnumSet.of(ContainerState.EXITED_WITH_FAILURE,
++ ContainerState.LOCALIZED),
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+- new ExitedWithFailureTransition(true))
++ new RetryFailureTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+@@ -854,6 +873,86 @@ public void transition(ContainerImpl container, ContainerEvent event) {
+ }
+
+ /**
++ * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon
++ * CONTAINER_EXITED_WITH_FAILURE state.
++ **/
++ @SuppressWarnings("unchecked") // dispatcher not typed
++ static class RetryFailureTransition implements
++ MultipleArcTransition {
++
++ @Override
++ public ContainerState transition(final ContainerImpl container,
++ ContainerEvent event) {
++ ContainerExitEvent exitEvent = (ContainerExitEvent) event;
++ container.exitCode = exitEvent.getExitCode();
++ if (exitEvent.getDiagnosticInfo() != null) {
++ container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
++ }
++
++ if (container.shouldRetry(container.exitCode)) {
++ if (container.remainingRetryAttempts > 0) {
++ container.remainingRetryAttempts--;
++ try {
++ container.stateStore.storeContainerRemainingRetryAttempts(
++ container.getContainerId(), container.remainingRetryAttempts);
++ } catch (IOException e) {
++ LOG.warn("Unable to update remainingRetryAttempts in state store for "
++ + container.getContainerId(), e);
++ }
++ }
++ LOG.info("Relaunch Container " + container.getContainerId()
++ + ". Remain retry attempts : " + container.remainingRetryAttempts
++ + ". Retry interval is "
++ + container.containerRetryContext.getRetryInterval() + "ms");
++ container.isRelaunch = true;
++ container.wasLaunched = false;
++ container.metrics.endRunningContainer();
++ // wait for some time, then relaunch
++ new Thread() {
++ @Override
++ public void run() {
++ try {
++ Thread.sleep(container.containerRetryContext.getRetryInterval());
++ container.sendLaunchEvent();
++ } catch (InterruptedException e) {
++ return;
++ }
++ }
++ }.start();
++ return ContainerState.LOCALIZED;
++ } else {
++ new ExitedWithFailureTransition(true).transition(container, event);
++ return ContainerState.EXITED_WITH_FAILURE;
++ }
++ }
++ }
++
++ @Override
++ public boolean shouldRetry(int errorCode) {
++ if (errorCode == ExitCode.SUCCESS.getExitCode()
++ || errorCode == ExitCode.FORCE_KILLED.getExitCode()
++ || errorCode == ExitCode.TERMINATED.getExitCode()) {
++ return false;
++ }
++
++ ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy();
++ if (retryPolicy == ContainerRetryPolicy.ALWAYS_RETRY
++ || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE
++ && containerRetryContext.getErrorCodes() != null
++ && containerRetryContext.getErrorCodes().contains(errorCode))) {
++ return remainingRetryAttempts > 0
++ || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
++ }
++
++ return false;
++ }
++
++ @Override
++ public boolean isRelaunch() {
++ return isRelaunch;
++ }
++
++ /**
+ * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
+ */
+ static class KilledExternallyTransition extends ExitedWithFailureTransition {
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+index 6371b21..0b48abd 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+@@ -48,6 +48,7 @@
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.security.Credentials;
++import org.apache.hadoop.util.DiskChecker;
+ import org.apache.hadoop.util.Shell;
+ import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.yarn.api.ApplicationConstants;
+@@ -177,6 +178,10 @@ public Integer call() {
+
+ Path containerLogDir;
+ try {
++ // Select the working directory for the container
++ Path containerWorkDir = getContainerWorkDir(container);
++ cleanupContainerFilesForRelaunch(container, containerWorkDir);
++
+ localResources = container.getLocalizedResources();
+ if (localResources == null) {
+ throw RPCUtil.getRemoteException(
+@@ -191,8 +196,7 @@ public Integer call() {
+ String appIdStr = app.getAppId().toString();
+ String relativeContainerLogDir = ContainerLaunch
+ .getRelativeContainerLogDir(appIdStr, containerIdStr);
+- containerLogDir =
+- dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
++ containerLogDir = getContainerLogDir(container);
+ for (String str : command) {
+ // TODO: Should we instead work via symlinks without this grammar?
+ newCmds.add(expandEnvironment(str, containerLogDir));
+@@ -226,14 +230,6 @@ public Integer call() {
+ DataOutputStream containerScriptOutStream = null;
+ DataOutputStream tokensOutStream = null;
+
+- // Select the working directory for the container
+- Path containerWorkDir =
+- dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+- + Path.SEPARATOR + user + Path.SEPARATOR
+- + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+- + Path.SEPARATOR + containerIdStr,
+- LocalDirAllocator.SIZE_UNKNOWN, false);
+-
+ String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
+
+ // pid file should be in nm private dir so that it is not
+@@ -329,7 +325,9 @@ public Integer call() {
+ completed.set(true);
+ exec.deactivateContainer(containerID);
+ try {
+- context.getNMStateStore().storeContainerCompleted(containerID, ret);
++ if (!container.shouldRetry(ret)) {
++ context.getNMStateStore().storeContainerCompleted(containerID, ret);
++ }
+ } catch (IOException e) {
+ LOG.error("Unable to set exit code for container " + containerID);
+ }
+@@ -369,6 +367,120 @@ public Integer call() {
+ }
+
+ /**
++ * Get container's working directory.
++ * If container is in relaunch and we find its previous working directory
++ * could be read/write it will be reused as current working directory.
++ * We apply a simple heuristic to find its previous working directory:
++ * if a good work dir with the container tokens file already exists,
++ * it means it is container's previous working directory.
++ */
++ private Path getContainerWorkDir(Container container1) throws IOException {
++ Path containerWorkDir = null;
++ String relativeContainerWorkDir = ContainerLocalizer.USERCACHE
++ + Path.SEPARATOR + container1.getUser() + Path.SEPARATOR
++ + ContainerLocalizer.APPCACHE + Path.SEPARATOR
++ + app.getAppId().toString() + Path.SEPARATOR
++ + ConverterUtils.toString(container1.getContainerId());
++
++ if (container1.isRelaunch()) {
++ try {
++ // container's tokens file's parent is container's previous working dir
++ containerWorkDir = dirsHandler.getLocalPathForRead(
++ relativeContainerWorkDir + Path.SEPARATOR
++ + ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE).getParent();
++ // check whether containerWorDir could be read/write, it will throw
++ // IOException if not.
++ DiskChecker.checkDir(
++ new File(containerWorkDir.toUri().getPath()));
++ } catch (IOException e) {
++ containerWorkDir = null;
++ }
++ }
++ if (containerWorkDir == null) {
++ containerWorkDir =
++ dirsHandler.getLocalPathForWrite(relativeContainerWorkDir,
++ LocalDirAllocator.SIZE_UNKNOWN, false);
++ }
++
++ return containerWorkDir;
++ }
++
++ /**
++ * Get container's log directory.
++ * If container is in relaunch and we find its previous log directory
++ * could be read/write it will be reused as current log directory.
++ * We apply a simple heuristic to find its previous working directory:
++ * if a good work dir with file 'stdout' already exists, it means it
++ * is container's previous working directory. We assume there is 'stdout'
++ * in most containers's log directory.
++ */
++ private Path getContainerLogDir(Container container1) throws IOException {
++ Path containerLogDir = null;
++ String relativeContainerLogDir = ContainerLaunch
++ .getRelativeContainerLogDir(app.getAppId().toString(),
++ ConverterUtils.toString(container1.getContainerId()));
++
++ if (container1.isRelaunch()) {
++ try {
++ containerLogDir = dirsHandler.getLocalPathForRead(
++ relativeContainerLogDir + Path.SEPARATOR
++ + "stdout").getParent();
++ // check whether containerWorDir could be read/write, it will throw
++ // IOException if not.
++ DiskChecker.checkDir(
++ new File(containerLogDir.toUri().getPath()));
++ } catch (IOException e) {
++ containerLogDir = null;
++ }
++ }
++ if (containerLogDir == null) {
++ containerLogDir =
++ dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
++ }
++
++ return containerLogDir;
++ }
++
++ /**
++ * Clean up files left from the container's previous launch before it is relaunched.
++ */
++ private void cleanupContainerFilesForRelaunch(Container container1,
++ Path containerWorkDir) {
++ if (!container1.isRelaunch()) {
++ return;
++ }
++
++ try {
++ FileContext lfs = FileContext.getLocalFSFileContext();
++ // delete ContainerScriptPath
++ lfs.delete(new Path(containerWorkDir, CONTAINER_SCRIPT), false);
++ // delete TokensPath
++ lfs.delete(
++ new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE), false);
++ } catch (IOException e) {
++ LOG.error("Failed to delete container script file and token file", e);
++ }
++
++ String appIdStr = app.getAppId().toString();
++ String containerIdStr =
++ ConverterUtils.toString(container1.getContainerId());
++ // delete pid file and pid exit code file
++ String pidPath = getPidFileSubpath(appIdStr, containerIdStr);
++ deleteLocalPath(pidPath);
++ deleteLocalPath(getExitCodeFile(pidPath));
++ }
++
++ private void deleteLocalPath(String relativePath) {
++ try {
++ FileContext lfs = FileContext.getLocalFSFileContext();
++ Path path = dirsHandler.getLocalPathForRead(relativePath);
++ lfs.delete(path, false);
++ } catch (IOException e) {
++ LOG.error("Failed to delete " + relativePath, e);
++ }
++ }
++
++ /**
+ * Tries to tail and fetch TAIL_SIZE_IN_BYTES of data from the error log.
+ * ErrorLog filename is not fixed and depends upon app, hence file name
+ * pattern is used.
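The relaunch path above only reuses a container's previous working directory when that directory can still be located and passes a disk check; otherwise a fresh directory is allocated. The following standalone sketch (not part of the patch, and not NodeManager code) illustrates that heuristic with plain java.io; the candidate-directory list, the selectWorkDir() helper, and the tokens file name are assumptions made purely for illustration.

// Minimal sketch of the relaunch work-dir reuse heuristic, under the
// assumptions stated above.
import java.io.File;
import java.util.List;

public class WorkDirHeuristicSketch {

  // Assumed name of the tokens file written into the container work dir.
  private static final String TOKENS_FILE = "container_tokens";

  /**
   * On relaunch, reuse a directory that already holds the tokens file and is
   * still readable and writable; otherwise fall back to a freshly chosen dir.
   */
  static File selectWorkDir(boolean isRelaunch, List<File> candidateDirs,
      File freshDir) {
    if (isRelaunch) {
      for (File dir : candidateDirs) {
        File tokens = new File(dir, TOKENS_FILE);
        if (tokens.isFile() && dir.canRead() && dir.canWrite()) {
          return dir; // previous working directory found and still healthy
        }
      }
    }
    return freshDir; // no reusable previous dir: allocate a new one
  }
}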
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+index 89c71bb..db84485 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+@@ -106,6 +106,8 @@
+ "/resourceChanged";
+ private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
+ private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
++ private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
++ "/remainingRetryAttempts";
+
+ private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
+ private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
+@@ -238,6 +240,8 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
+ } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
+ rcs.capability = new ResourcePBImpl(
+ ResourceProto.parseFrom(entry.getValue()));
++ } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
++ rcs.setRemainingRetryAttempts(Integer.parseInt(asString(entry.getValue())));
+ } else {
+ throw new IOException("Unexpected container state key: " + key);
+ }
+@@ -321,6 +325,18 @@ public void storeContainerCompleted(ContainerId containerId,
+ }
+
+ @Override
++ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
++ int remainingRetryAttempts) throws IOException {
++ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
++ + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX;
++ try {
++ db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
++ } catch (DBException e) {
++ throw new IOException(e);
++ }
++ }
++
++ @Override
+ public void removeContainer(ContainerId containerId)
+ throws IOException {
+ String keyPrefix = CONTAINERS_KEY_PREFIX + containerId.toString();
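The state-store change above persists the remaining retry count as a plain decimal string under a per-container key and parses it back with Integer.parseInt during recovery. The sketch below (not part of the patch) shows that round trip with a HashMap standing in for the leveldb instance; the literal prefix value and the -1 "nothing stored" sentinel are assumptions for illustration only.

// Standalone sketch of the key/value encoding, under the assumptions above.
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RetryStateStoreSketch {

  private static final String CONTAINERS_KEY_PREFIX =
      "ContainerManager/containers/";            // assumed prefix value
  private static final String REMAIN_RETRIES_SUFFIX = "/remainingRetryAttempts";

  private final Map<String, byte[]> db = new HashMap<>();

  /** Store the remaining retry count as a decimal string, as the patch does. */
  void storeContainerRemainingRetryAttempts(String containerId, int remaining) {
    String key = CONTAINERS_KEY_PREFIX + containerId + REMAIN_RETRIES_SUFFIX;
    db.put(key, Integer.toString(remaining).getBytes(StandardCharsets.UTF_8));
  }

  /** Recover the count; mirrors Integer.parseInt(asString(...)) above. */
  int loadContainerRemainingRetryAttempts(String containerId) {
    String key = CONTAINERS_KEY_PREFIX + containerId + REMAIN_RETRIES_SUFFIX;
    byte[] value = db.get(key);
    return value == null
        ? -1                                     // sentinel: nothing was stored
        : Integer.parseInt(new String(value, StandardCharsets.UTF_8));
  }
}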
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+index d5dce9b..68f4e87 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+@@ -104,6 +104,11 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode)
+ }
+
+ @Override
++ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
++ int remainingRetryAttempts) throws IOException {
++ }
++
++ @Override
+ public void removeContainer(ContainerId containerId) throws IOException {
+ }
+
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+index e8ccf54..b733ffe 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+@@ -34,6 +34,7 @@
+ import org.apache.hadoop.yarn.api.records.ApplicationId;
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+ import org.apache.hadoop.yarn.api.records.Resource;
+ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+@@ -76,6 +77,7 @@ public NMStateStoreService(String name) {
+ String diagnostics = "";
+ StartContainerRequest startRequest;
+ Resource capability;
++ private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
+
+ public RecoveredContainerStatus getStatus() {
+ return status;
+@@ -100,6 +102,14 @@ public StartContainerRequest getStartRequest() {
+ public Resource getCapability() {
+ return capability;
+ }
++
++ public int getRemainingRetryAttempts() {
++ return remainingRetryAttempts;
++ }
++
++ public void setRemainingRetryAttempts(int retryAttempts) {
++ this.remainingRetryAttempts = retryAttempts;
++ }
+ }
+
+ public static class LocalResourceTrackerState {
+@@ -325,6 +335,15 @@ public abstract void storeContainerDiagnostics(ContainerId containerId,
+ StringBuilder diagnostics) throws IOException;
+
+ /**
++ * Record remaining retry attempts for a container.
++ * @param containerId the container ID
++ * @param remainingRetryAttempts the number of retry attempts remaining if the container fails to run
++ * @throws IOException
++ */
++ public abstract void storeContainerRemainingRetryAttempts(
++ ContainerId containerId, int remainingRetryAttempts) throws IOException;
++
++ /**
+ * Remove records corresponding to a container
+ * @param containerId the container ID
+ * @throws IOException
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+index 2ab9842..7a26df5 100644
+--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+@@ -54,6 +54,8 @@
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
++import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.api.records.LocalResourceType;
+@@ -645,6 +647,66 @@ public void testLaunchAfterKillRequest() throws Exception {
+ }
+ }
+
++ @Test
++ public void testContainerRetry() throws Exception {
++ ContainerRetryContext containerRetryContext1 = ContainerRetryContext.newInstance(
++ ContainerRetryPolicy.NEVER_RETRY, null, 3, 0);
++ testContainerRetry(containerRetryContext1, 2, 0);
++
++ ContainerRetryContext containerRetryContext2 = ContainerRetryContext.newInstance(
++ ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0);
++ testContainerRetry(containerRetryContext2, 2, 3);
++
++ ContainerRetryContext containerRetryContext3 = ContainerRetryContext.newInstance(
++ ContainerRetryPolicy.ALWAYS_RETRY, null, 3, 0);
++ // If the exit code is 0, the container is not retried
++ testContainerRetry(containerRetryContext3, 0, 0);
++
++ ContainerRetryContext containerRetryContext4 = ContainerRetryContext.newInstance(
++ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, null, 3, 0);
++ testContainerRetry(containerRetryContext4, 2, 0);
++
++ HashSet<Integer> errorCodes = new HashSet<>();
++ errorCodes.add(2);
++ errorCodes.add(6);
++ ContainerRetryContext containerRetryContext5 = ContainerRetryContext.newInstance(
++ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes, 3, 0);
++ testContainerRetry(containerRetryContext5, 2, 3);
++
++ HashSet<Integer> errorCodes2 = new HashSet<>();
++ errorCodes2.add(143);
++ ContainerRetryContext containerRetryContext6 = ContainerRetryContext.newInstance(
++ ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODE, errorCodes2, 3, 0);
++ // Exit code 143 (SIGTERM) never triggers a retry, even when listed in errorCodes2.
++ testContainerRetry(containerRetryContext6, 143, 0);
++ }
++
++ private void testContainerRetry(ContainerRetryContext containerRetryContext, int exitCode,
++ int expectedRetries) throws Exception {
++ WrappedContainer wc = null;
++ try {
++ int retryTimes = 0;
++ wc = new WrappedContainer(24, 314159265358979L, 4344, "yak",
++ containerRetryContext);
++ wc.initContainer();
++ wc.localizeResources();
++ wc.launchContainer();
++ while (true) {
++ wc.containerFailed(exitCode);
++ if (wc.c.getContainerState() == ContainerState.RUNNING) {
++ retryTimes++;
++ } else {
++ break;
++ }
++ }
++ Assert.assertEquals(expectedRetries, retryTimes);
++ } finally {
++ if (wc != null) {
++ wc.finished();
++ }
++ }
++ }
++
+ private void verifyCleanupCall(WrappedContainer wc) throws Exception {
+ ResourcesReleasedMatcher matchesReq =
+ new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
+@@ -782,12 +844,23 @@ public boolean matches(Object o) {
+
+ WrappedContainer(int appId, long timestamp, int id, String user)
+ throws IOException {
+- this(appId, timestamp, id, user, true, false);
++ this(appId, timestamp, id, user, null);
++ }
++
++ WrappedContainer(int appId, long timestamp, int id, String user,
++ ContainerRetryContext containerRetryContext) throws IOException {
++ this(appId, timestamp, id, user, true, false, containerRetryContext);
+ }
+
+- @SuppressWarnings("rawtypes")
+ WrappedContainer(int appId, long timestamp, int id, String user,
+ boolean withLocalRes, boolean withServiceData) throws IOException {
++ this(appId, timestamp, id, user, withLocalRes, withServiceData, null);
++ }
++
++ @SuppressWarnings("rawtypes")
++ WrappedContainer(int appId, long timestamp, int id, String user,
++ boolean withLocalRes, boolean withServiceData,
++ ContainerRetryContext containerRetryContext) throws IOException {
+ dispatcher = new DrainDispatcher();
+ dispatcher.init(new Configuration());
+
+@@ -864,6 +937,8 @@ public boolean matches(Object o) {
+ }
+ when(ctxt.getServiceData()).thenReturn(serviceData);
+
++ when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
++
+ c = new ContainerImpl(conf, dispatcher, new NMNullStateStoreService(),
+ ctxt, null, metrics, identifier);
+ dispatcher.register(ContainerEventType.class,
+@@ -984,6 +1059,10 @@ public void containerFailed(int exitCode) {
+ assert containerStatus.getDiagnostics().contains(diagnosticMsg);
+ assert containerStatus.getExitStatus() == exitCode;
+ drainDispatcherEvents();
++ // If the container needs a retry, relaunch it
++ if (c.getContainerState() == ContainerState.LOCALIZED) {
++ launchContainer();
++ }
+ }
+
+ public void killContainer() {
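Taken together, the new tests pin down the retry decision: NEVER_RETRY never relaunches, ALWAYS_RETRY relaunches any non-zero exit up to the configured number of attempts, RETRY_ON_SPECIFIC_ERROR_CODE relaunches only for listed exit codes, and exit codes 0 and 143 (SIGTERM) never trigger a retry. The standalone sketch below restates that decision table; it is not the patch's Container implementation, and the local Policy enum and shouldRetry() signature are illustrative assumptions only.

// Sketch of the retry decision the tests above exercise, under the
// assumptions stated in the lead-in.
import java.util.Set;

public class RetryDecisionSketch {

  // Local stand-in for ContainerRetryPolicy, for illustration only.
  enum Policy { NEVER_RETRY, ALWAYS_RETRY, RETRY_ON_SPECIFIC_ERROR_CODE }

  private static final int SUCCESS = 0;
  private static final int SIGTERM_EXIT = 143;

  /**
   * Mirrors the behaviour the tests assert: no retry on success or SIGTERM,
   * otherwise retry according to the policy while attempts remain.
   */
  static boolean shouldRetry(Policy policy, Set<Integer> errorCodes,
      int remainingAttempts, int exitCode) {
    if (exitCode == SUCCESS || exitCode == SIGTERM_EXIT) {
      return false;                          // clean exit or external kill
    }
    if (remainingAttempts <= 0) {
      return false;                          // retry budget exhausted
    }
    switch (policy) {
    case ALWAYS_RETRY:
      return true;
    case RETRY_ON_SPECIFIC_ERROR_CODE:
      return errorCodes != null && errorCodes.contains(exitCode);
    default:                                 // NEVER_RETRY
      return false;
    }
  }
}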
+diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hado
\ No newline at end of file