diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
index ef8bd17..ebe5d92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
@@ -49,6 +49,13 @@
*
*
retryInterval specifies delaying some time before relaunch
* container, the unit is millisecond.
+ *
+ * failuresValidityInterval: default value is -1.
+ * When failuresValidityInterval (in milliseconds) is set to {@literal >} 0,
+ * failures that happened more than failuresValidityInterval ago are not
+ * included in the failure count. If the failure count reaches
+ * maxRetries, the container is marked as failed.
+ *
*
*/
@Public
@@ -57,22 +64,31 @@
public static final int RETRY_FOREVER = -1;
public static final int RETRY_INVALID = -1000;
public static final ContainerRetryContext NEVER_RETRY_CONTEXT =
- newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0);
+ newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0, -1);
@Private
@Unstable
public static ContainerRetryContext newInstance(
ContainerRetryPolicy retryPolicy, Set errorCodes,
- int maxRetries, int retryInterval) {
+ int maxRetries, int retryInterval, long failuresValidityInterval) {
ContainerRetryContext containerRetryContext =
Records.newRecord(ContainerRetryContext.class);
containerRetryContext.setRetryPolicy(retryPolicy);
containerRetryContext.setErrorCodes(errorCodes);
containerRetryContext.setMaxRetries(maxRetries);
containerRetryContext.setRetryInterval(retryInterval);
+ containerRetryContext.setFailuresValidityInterval(failuresValidityInterval);
return containerRetryContext;
}
+ @Private
+ @Unstable
+ public static ContainerRetryContext newInstance(
+ ContainerRetryPolicy retryPolicy, Set errorCodes,
+ int maxRetries, int retryInterval) {
+ return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval, -1);
+ }
+
public abstract ContainerRetryPolicy getRetryPolicy();
public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy);
public abstract Set getErrorCodes();
@@ -81,4 +97,7 @@ public static ContainerRetryContext newInstance(
public abstract void setMaxRetries(int maxRetries);
public abstract int getRetryInterval();
public abstract void setRetryInterval(int retryInterval);
+ public abstract long getFailuresValidityInterval();
+ public abstract void setFailuresValidityInterval(
+ long failuresValidityInterval);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 60cdfd1..1f99548 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -540,6 +540,7 @@ message ContainerRetryContextProto {
repeated int32 error_codes = 2;
optional int32 max_retries = 3 [default = 0];
optional int32 retry_interval = 4 [default = 0];
+ optional int64 failures_validity_interval = 5 [default = -1];
}
enum ContainerRetryPolicyProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 2973974..d569240 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -265,6 +265,7 @@
private Set containerRetryErrorCodes = null;
private int containerMaxRetries = 0;
private int containrRetryInterval = 0;
+ private long containerFailuresValidityInterval = -1;
// Timeline domain ID
private String domainId = null;
@@ -400,6 +401,9 @@ public boolean init(String[] args) throws ParseException, IOException {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("container_failures_validity_interval", true,
+ "Failures which are out of the time window will not be added to"
+ + " the number of container retry attempts");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@@ -552,6 +556,8 @@ public boolean init(String[] args) throws ParseException, IOException {
cliParser.getOptionValue("container_max_retries", "0"));
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
+ containerFailuresValidityInterval = Long.parseLong(
+ cliParser.getOptionValue("container_failures_validity_interval", "-1"));
return true;
}
@@ -1109,7 +1115,8 @@ public void run() {
ContainerRetryContext containerRetryContext =
ContainerRetryContext.newInstance(
containerRetryPolicy, containerRetryErrorCodes,
- containerMaxRetries, containrRetryInterval);
+ containerMaxRetries, containrRetryInterval,
+ containerFailuresValidityInterval);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 9139b08..e7b7ccd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -302,6 +302,9 @@ public Client(Configuration conf) throws Exception {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("container_failures_validity_interval", true,
+ "Failures which are out of the time window will not be added to"
+ + " the number of container retry attempts");
}
/**
@@ -461,6 +464,10 @@ public boolean init(String[] args) throws ParseException {
containerRetryOptions.add("--container_retry_interval "
+ cliParser.getOptionValue("container_retry_interval"));
}
+ if (cliParser.hasOption("container_failures_validity_interval")) {
+ containerRetryOptions.add("--container_failures_validity_interval "
+ + cliParser.getOptionValue("container_failures_validity_interval"));
+ }
return true;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
index a5ef70d..316378f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
@@ -165,6 +165,19 @@ public void setRetryInterval(int retryInterval) {
builder.setRetryInterval(retryInterval);
}
+ public long getFailuresValidityInterval() {
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasFailuresValidityInterval()) {
+ return -1;
+ }
+ return p.getFailuresValidityInterval();
+ }
+
+ public void setFailuresValidityInterval(long failuresValidityInterval) {
+ maybeInitBuilder();
+ builder.setFailuresValidityInterval(failuresValidityInterval);
+ }
+
private ContainerRetryPolicyProto convertToProtoFormat(
ContainerRetryPolicy containerRetryPolicy) {
return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index b1ddc2e..be5eeae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -109,8 +111,18 @@
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
private final ContainerRetryContext containerRetryContext;
+ private final long failuresValidityInterval;
+ /*
+ * Record the finish time of each retry attempt only if
+ * failuresValidityInterval is > 0 and maxRetries is not RETRY_FOREVER.
+ * If failuresValidityInterval is <= 0, only the number of remaining retry
+ * attempts is tracked. If maxRetries is RETRY_FOREVER, the container
+ * retries indefinitely, so there is no need to record finish times.
+ */
+ private List finishTimeForRetryAttempts = new ArrayList<>();
// remaining retries to relaunch container if needed
private int remainingRetryAttempts;
+ private boolean needRecordFinishTimes;
private String workDir;
private String logDir;
@@ -155,6 +167,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
}
this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
+ this.failuresValidityInterval =
+ containerRetryContext.getFailuresValidityInterval();
+ this.needRecordFinishTimes = containerRetryContext.getMaxRetries()
+ != ContainerRetryContext.RETRY_FOREVER && failuresValidityInterval > 0;
this.diagnosticsMaxSize = conf.getInt(
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
@@ -208,6 +224,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
recoveredCapability.getVirtualCores());
}
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
+ this.finishTimeForRetryAttempts = rcs.getFinishTimeForRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
}
@@ -970,20 +987,13 @@ public ContainerState transition(final ContainerImpl container,
}
if (container.shouldRetry(container.exitCode)) {
- if (container.remainingRetryAttempts > 0) {
- container.remainingRetryAttempts--;
- try {
- container.stateStore.storeContainerRemainingRetryAttempts(
- container.getContainerId(), container.remainingRetryAttempts);
- } catch (IOException e) {
- LOG.warn(
- "Unable to update remainingRetryAttempts in state store for "
- + container.getContainerId(), e);
- }
- }
+ container.updateAndStoreRetryAttemptsRecords(clock.getTime());
+
LOG.info("Relaunching Container " + container.getContainerId()
+ ". Remaining retry attempts(after relaunch) : "
- + container.remainingRetryAttempts
+ + container.getRemainingRetryAttempts()
+ + ". Failure validity interval is "
+ + container.failuresValidityInterval + " milliseconds"
+ ". Interval between retries is "
+ container.containerRetryContext.getRetryInterval() + "ms");
container.wasLaunched = false;
@@ -1013,6 +1023,45 @@ public void run() {
}
}
+  private void updateAndStoreRetryAttemptsRecords(long finishTime) {
+    // State-store write failures are only logged; the in-memory
+    // bookkeeping is still updated.
+    if (!needRecordFinishTimes) {
+      // Plain countdown mode: consume one attempt and persist the counter.
+      if (remainingRetryAttempts <= 0) {
+        return;
+      }
+      remainingRetryAttempts--;
+      try {
+        stateStore.storeContainerRemainingRetryAttempts(containerId,
+            remainingRetryAttempts);
+      } catch (IOException e) {
+        LOG.warn(
+            "Unable to update remainingRetryAttempts in state store for "
+                + containerId, e);
+      }
+      return;
+    }
+
+    // Sliding-window mode: remember this failure, then drop the leading
+    // entries that have aged out of 'failuresValidityInterval'.
+    finishTimeForRetryAttempts.add(finishTime);
+    final long now = clock.getTime();
+    while (!finishTimeForRetryAttempts.isEmpty()
+        && now - finishTimeForRetryAttempts.get(0)
+        >= failuresValidityInterval) {
+      finishTimeForRetryAttempts.remove(0);
+    }
+    try {
+      stateStore.storeContainerFinishTimeForRetryAttempts(containerId,
+          finishTimeForRetryAttempts);
+    } catch (IOException e) {
+      LOG.warn(
+          "Unable to update finishTimeForRetryAttempts in state store for "
+              + containerId, e);
+    }
+  }
+
@Override
public boolean isRetryContextSet() {
return containerRetryContext.getRetryPolicy()
@@ -1032,13 +1081,29 @@ public boolean shouldRetry(int errorCode) {
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& containerRetryContext.getErrorCodes() != null
&& containerRetryContext.getErrorCodes().contains(errorCode))) {
- return remainingRetryAttempts > 0
- || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
+ int n = getRemainingRetryAttempts();
+ return n > 0 || n == ContainerRetryContext.RETRY_FOREVER;
}
return false;
}
+  private int getRemainingRetryAttempts() {
+    if (!needRecordFinishTimes) {
+      // no validity window: the plain countdown is authoritative
+      return remainingRetryAttempts;
+    }
+    // count failures still inside the window by skipping the expired prefix
+    final long windowStart = clock.getTime() - failuresValidityInterval;
+    final int total = finishTimeForRetryAttempts.size();
+    int expired = 0;
+    while (expired < total
+        && finishTimeForRetryAttempts.get(expired) <= windowStart) {
+      expired++;
+    }
+    return containerRetryContext.getMaxRetries() - (total - expired);
+  }
+
/**
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
*/
@@ -1368,4 +1433,9 @@ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container,
LocalResourceRequest resource) {
return container.resourcesUploadPolicies.get(resource);
}
+
+  @VisibleForTesting
+  static void setClock(Clock targetClock) {
+    clock = targetClock; // clock is static: affects every ContainerImpl
+  }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 6e9efe1..7121743 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -112,6 +112,8 @@
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
"/remainingRetryAttempts";
+ private static final String CONTAINER_RETRY_ATTEMPTS_FIINISH_TIME_SUFFIX =
+ "/finishTimeForRetryAttempts";
private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir";
private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
@@ -254,6 +256,16 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
rcs.setRemainingRetryAttempts(
Integer.parseInt(asString(entry.getValue())));
+      } else if (suffix.equals(CONTAINER_RETRY_ATTEMPTS_FIINISH_TIME_SUFFIX)) {
+        // parse List#toString() output, e.g. [34, 21, 22]; "[]" means empty
+        String value = asString(entry.getValue());
+        List<Long> finishTimeForRetryAttempts = new ArrayList<>();
+        for (String t : value.substring(1, value.length() - 1).split(",")) {
+          if (!t.trim().isEmpty()) {
+            finishTimeForRetryAttempts.add(Long.parseLong(t.trim()));
+          }
+        }
+        rcs.setFinishTimeForRetryAttempts(finishTimeForRetryAttempts);
} else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) {
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
@@ -380,6 +392,19 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
}
@Override
+ public void storeContainerFinishTimeForRetryAttempts(
+ ContainerId containerId, List finishTimeForRetryAttempts)
+ throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_RETRY_ATTEMPTS_FIINISH_TIME_SUFFIX;
+ try {
+ db.put(bytes(key), bytes(finishTimeForRetryAttempts.toString()));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 08b80e9..db7f347 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -105,6 +105,12 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
}
@Override
+ public void storeContainerFinishTimeForRetryAttempts(
+ ContainerId containerId, List finishTimeForRetryAttempts)
+ throws IOException {
+ }
+
+ @Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index ccf1e70..dc85a3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -74,6 +74,7 @@ public NMStateStoreService(String name) {
StartContainerRequest startRequest;
Resource capability;
private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
+    private List<Long> finishTimeForRetryAttempts = new java.util.ArrayList<>();
private String workDir;
private String logDir;
@@ -109,6 +110,15 @@ public void setRemainingRetryAttempts(int retryAttempts) {
this.remainingRetryAttempts = retryAttempts;
}
+ public List getFinishTimeForRetryAttempts() {
+ return finishTimeForRetryAttempts;
+ }
+
+ public void setFinishTimeForRetryAttempts(
+ List finishTimeForRetryAttempts) {
+ this.finishTimeForRetryAttempts = finishTimeForRetryAttempts;
+ }
+
public String getWorkDir() {
return workDir;
}
@@ -134,6 +144,8 @@ public String toString() {
.append(", Capability: ").append(getCapability())
.append(", StartRequest: ").append(getStartRequest())
.append(", RemainingRetryAttempts: ").append(remainingRetryAttempts)
+ .append(", FinishTimeForRetryAttempts: ")
+ .append(finishTimeForRetryAttempts)
.append(", WorkDir: ").append(workDir)
.append(", LogDir: ").append(logDir)
.toString();
@@ -365,6 +377,16 @@ public abstract void storeContainerRemainingRetryAttempts(
ContainerId containerId, int remainingRetryAttempts) throws IOException;
/**
+ * Record the finish times of a container's retry attempts.
+ * @param containerId the container ID
+ * @param finishTimeForRetryAttempts finish time of each retry attempt
+ * @throws IOException if the finish times cannot be stored
+ */
+ public abstract void storeContainerFinishTimeForRetryAttempts(
+ ContainerId containerId, List finishTimeForRetryAttempts)
+ throws IOException;
+
+ /**
* Record working directory for a container.
* @param containerId the container ID
* @param workDir the working directory
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 118bc42..b26908f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -94,6 +95,8 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
@@ -725,6 +728,38 @@ private void testContainerRetry(ContainerRetryContext containerRetryContext,
}
}
+ @Test
+ public void testContainerRetryFailureValidityInterval() throws Exception {
+ ContainerRetryContext containerRetryContext = ContainerRetryContext
+ .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(25, 314159265358980L, 4200, "test",
+ containerRetryContext);
+ ControlledClock clock = new ControlledClock();
+ wc.setClock(clock);
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ clock.setTime(20);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ clock.setTime(40);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ clock.setTime(45);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ wc.c.getContainerState());
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -1114,5 +1149,9 @@ public int getLocalResourceCount() {
public String getDiagnostics() {
return c.cloneAndGetContainerStatus().getDiagnostics();
}
+
+ public void setClock(Clock clock) {
+ ((ContainerImpl)c).setClock(clock);
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 4652245..d461893 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -116,6 +116,8 @@ public synchronized void removeApplication(ApplicationId appId)
rcsCopy.startRequest = rcs.startRequest;
rcsCopy.capability = rcs.capability;
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
+ rcsCopy.setFinishTimeForRetryAttempts(
+ rcs.getFinishTimeForRetryAttempts());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
result.add(rcsCopy);
@@ -178,6 +180,14 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
}
@Override
+ public void storeContainerFinishTimeForRetryAttempts(
+ ContainerId containerId, List finishTimeForRetryAttempts)
+ throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.setFinishTimeForRetryAttempts(finishTimeForRetryAttempts);
+ }
+
+ @Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index ccc9254..0eb37aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -32,6 +32,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -338,6 +339,11 @@ public void testContainerStorage() throws IOException {
stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
+ // store finishTimeForRetryAttempts
+ List finishTimeForRetryAttempts = Arrays.asList(1462700529039L,
+ 1462700529050L, 1462700529120L);
+ stateStore.storeContainerFinishTimeForRetryAttempts(containerId,
+ finishTimeForRetryAttempts);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
@@ -345,6 +351,10 @@ public void testContainerStorage() throws IOException {
assertEquals(6, rcs.getRemainingRetryAttempts());
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
+ List recoveredFishTimes = rcs.getFinishTimeForRetryAttempts();
+ assertEquals(1462700529039L, (long)recoveredFishTimes.get(0));
+ assertEquals(1462700529050L, (long)recoveredFishTimes.get(1));
+ assertEquals(1462700529120L, (long)recoveredFishTimes.get(2));
// remove the container and verify not recovered
stateStore.removeContainer(containerId);