containerRetryErrorCodes = null;
private int containerMaxRetries = 0;
private int containrRetryInterval = 0;
+ private long containerFailuresValidityInterval = -1;
// Timeline domain ID
private String domainId = null;
@@ -471,6 +472,9 @@ public boolean init(String[] args) throws ParseException, IOException {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("container_failures_validity_interval", true,
+ "Failures which are out of the time window will not be added to"
+ + " the number of container retry attempts");
opts.addOption("placement_spec", true, "Placement specification");
opts.addOption("debug", false, "Dump out debug information");
@@ -661,7 +665,8 @@ public boolean init(String[] args) throws ParseException, IOException {
cliParser.getOptionValue("container_max_retries", "0"));
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
-
+ containerFailuresValidityInterval = Long.parseLong(
+ cliParser.getOptionValue("container_failures_validity_interval", "-1"));
if (!YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient = null;
timelineV2Client = null;
@@ -1385,7 +1390,8 @@ public void run() {
ContainerRetryContext containerRetryContext =
ContainerRetryContext.newInstance(
containerRetryPolicy, containerRetryErrorCodes,
- containerMaxRetries, containrRetryInterval);
+ containerMaxRetries, containrRetryInterval,
+ containerFailuresValidityInterval);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 06f0fd23700..d6a753a44ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -373,6 +373,9 @@ public Client(Configuration conf) throws Exception {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("container_failures_validity_interval", true,
+ "Failures which are out of the time window will not be added to"
+ + " the number of container retry attempts");
opts.addOption("docker_client_config", true,
"The docker client configuration path. The scheme should be supplied"
+ " (i.e. file:// or hdfs://)."
@@ -579,6 +582,10 @@ public boolean init(String[] args) throws ParseException {
containerRetryOptions.add("--container_retry_interval "
+ cliParser.getOptionValue("container_retry_interval"));
}
+ if (cliParser.hasOption("container_failures_validity_interval")) {
+ containerRetryOptions.add("--container_failures_validity_interval "
+ + cliParser.getOptionValue("container_failures_validity_interval"));
+ }
if (cliParser.hasOption("flow_name")) {
flowName = cliParser.getOptionValue("flow_name");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
index a5ef70de2f6..a01d78320a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
@@ -165,6 +165,21 @@ public void setRetryInterval(int retryInterval) {
builder.setRetryInterval(retryInterval);
}
+ @Override
+ public long getFailuresValidityInterval() {
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasFailuresValidityInterval()) {
+ return -1;
+ }
+ return p.getFailuresValidityInterval();
+ }
+
+ @Override
+ public void setFailuresValidityInterval(long failuresValidityInterval) {
+ maybeInitBuilder();
+ builder.setFailuresValidityInterval(failuresValidityInterval);
+ }
+
private ContainerRetryPolicyProto convertToProtoFormat(
ContainerRetryPolicy containerRetryPolicy) {
return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 751beffeb03..714fb26b750 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -167,9 +167,11 @@ private ReInitializationContext createContextForRollback() {
private long containerLaunchStartTime;
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
+
private ContainerRetryContext containerRetryContext;
- // remaining retries to relaunch container if needed
- private int remainingRetryAttempts;
+ private SlidingWindowRetryPolicy.RetryContext windowRetryContext;
+ private SlidingWindowRetryPolicy retryPolicy;
+
private String workDir;
private String logDir;
private String host;
@@ -246,7 +248,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
// Configure the Retry Context
this.containerRetryContext = configureRetryContext(
conf, launchContext, this.containerId);
- this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
+ this.windowRetryContext = new SlidingWindowRetryPolicy
+ .RetryContext(containerRetryContext);
+ this.retryPolicy = new SlidingWindowRetryPolicy(clock);
+
stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
context.getContainerStateTransitionListener());
this.context = context;
@@ -289,7 +294,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(rcs.getDiagnostics());
this.version = rcs.getVersion();
- this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
+ this.windowRetryContext.setRemainingRetries(
+ rcs.getRemainingRetryAttempts());
+ this.windowRetryContext.setRestartTimes(rcs.getRestartTimes());
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
this.resourceMappings = rcs.getResourceMappings();
@@ -1591,27 +1598,15 @@ public ContainerState transition(final ContainerImpl container,
if (exitEvent.getDiagnosticInfo() != null) {
if (container.containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY) {
- int n = container.containerRetryContext.getMaxRetries()
- - container.remainingRetryAttempts;
- container.addDiagnostics("Diagnostic message from attempt "
- + n + " : ", "\n");
+ container.addDiagnostics("Diagnostic message from attempt : \n");
}
container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
}
if (container.shouldRetry(container.exitCode)) {
- if (container.remainingRetryAttempts > 0) {
- container.remainingRetryAttempts--;
- try {
- container.stateStore.storeContainerRemainingRetryAttempts(
- container.getContainerId(), container.remainingRetryAttempts);
- } catch (IOException e) {
- LOG.warn(
- "Unable to update remainingRetryAttempts in state store for "
- + container.getContainerId(), e);
- }
- }
- doRelaunch(container, container.remainingRetryAttempts,
+ container.storeRetryContext();
+ doRelaunch(container,
+ container.windowRetryContext.getRemainingRetries(),
container.containerRetryContext.getRetryInterval());
return ContainerState.RELAUNCHING;
} else if (container.canRollback()) {
@@ -1671,29 +1666,14 @@ public boolean isRetryContextSet() {
@Override
public boolean shouldRetry(int errorCode) {
- return shouldRetry(errorCode, containerRetryContext,
- remainingRetryAttempts);
- }
-
- public static boolean shouldRetry(int errorCode,
- ContainerRetryContext retryContext, int remainingRetryAttempts) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
-
- ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
- if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
- || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
- && retryContext.getErrorCodes() != null
- && retryContext.getErrorCodes().contains(errorCode))) {
- return remainingRetryAttempts > 0
- || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
- }
-
- return false;
+ return retryPolicy.shouldRetry(windowRetryContext, errorCode);
}
+
/**
* Transition to EXITED_WITH_FAILURE
*/
@@ -1729,9 +1709,9 @@ public void transition(ContainerImpl container,
container.containerRetryContext =
configureRetryContext(container.context.getConf(),
container.launchContext, container.containerId);
- // Reset the retry attempts since its a fresh start
- container.remainingRetryAttempts =
- container.containerRetryContext.getMaxRetries();
+ container.windowRetryContext = new SlidingWindowRetryPolicy
+ .RetryContext(container.containerRetryContext);
+ container.retryPolicy = new SlidingWindowRetryPolicy(clock);
container.resourceSet =
container.reInitContext.mergedResourceSet(container.resourceSet);
@@ -2209,4 +2189,31 @@ private static void removeDockerContainer(ContainerImpl container) {
container.getContainerId().toString());
deletionService.delete(deletionTask);
}
+
+ private void storeRetryContext() {
+ if (windowRetryContext.getRestartTimes() != null) {
+ try {
+ stateStore.storeContainerRestartTimes(containerId,
+ windowRetryContext.getRestartTimes());
+ } catch (IOException e) {
+ LOG.warn(
+ "Unable to update finishTimeForRetryAttempts in state store for "
+ + containerId, e);
+ }
+ }
+ try {
+ stateStore.storeContainerRemainingRetryAttempts(containerId,
+ windowRetryContext.getRemainingRetries());
+ } catch (IOException e) {
+ LOG.warn(
+ "Unable to update remainingRetryAttempts in state store for "
+ + containerId, e);
+ }
+ }
+
+ @VisibleForTesting
+ void setClock(Clock targetClock) {
+ clock = targetClock;
+ retryPolicy.setClock(clock);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java
new file mode 100644
index 00000000000..02088794755
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/SlidingWindowRetryPolicy.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Sliding window retry policy for relaunching a
+ * Container in Yarn.
+ */
+@InterfaceStability.Unstable
+public class SlidingWindowRetryPolicy {
+
+ private Clock clock;
+
+ public SlidingWindowRetryPolicy(Clock clock) {
+ this.clock = Preconditions.checkNotNull(clock);
+ }
+
+ public boolean shouldRetry(RetryContext retryContext,
+ int errorCode) {
+ ContainerRetryContext containerRC = retryContext
+ .containerRetryContext;
+ Preconditions.checkNotNull(containerRC, "container retry context null");
+ ContainerRetryPolicy retryPolicy = containerRC.getRetryPolicy();
+ if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
+ || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
+ && containerRC.getErrorCodes() != null
+ && containerRC.getErrorCodes().contains(errorCode))) {
+ if (containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER) {
+ return true;
+ }
+ int pendingRetries = calculatePendingRetries(retryContext);
+ updateRetryContext(retryContext, pendingRetries);
+ return pendingRetries > 0;
+ }
+ return false;
+ }
+
+ /**
+ * Calculates the pending number of retries.
+ *
+ * When failuresValidityInterval is > 0, it also removes time entries from
+ * restartTimes which are outside the validity interval.
+ *
+ * @return the pending retries.
+ */
+ private int calculatePendingRetries(RetryContext retryContext) {
+ ContainerRetryContext containerRC =
+ retryContext.containerRetryContext;
+ if (containerRC.getFailuresValidityInterval() > 0) {
+ Iterator iterator = retryContext.getRestartTimes().iterator();
+ long currentTime = clock.getTime();
+ while (iterator.hasNext()) {
+ long restartTime = iterator.next();
+ if (currentTime - restartTime
+ > containerRC.getFailuresValidityInterval()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ return containerRC.getMaxRetries() -
+ retryContext.getRestartTimes().size();
+ } else {
+ return retryContext.getRemainingRetries();
+ }
+ }
+
+ /**
+ * Updates remaining retries and the restart time when
+ * required in the retryContext.
+ */
+ private void updateRetryContext(RetryContext retryContext,
+ int pendingRetries) {
+ retryContext.setRemainingRetries(pendingRetries - 1);
+ if (retryContext.containerRetryContext.getFailuresValidityInterval()
+ > 0) {
+ retryContext.getRestartTimes().add(clock.getTime());
+ }
+ }
+
+ /**
+ * Sets the clock.
+ * @param clock clock
+ */
+ public void setClock(Clock clock) {
+ this.clock = Preconditions.checkNotNull(clock);
+ }
+
+ /**
+ * Sliding window container retry context.
+ *
+ * Besides {@link ContainerRetryContext}, it also provide details such as:
+ *
+ * -
+ * remainingRetries: specifies the number of pending retries. It is
+ * initially set to
containerRetryContext.maxRetries.
+ *
+ * -
+ * restartTimes: when
+ *
containerRetryContext.failuresValidityInterval is set,
+ * then this records the times when the container is set to restart.
+ *
+ *
+ */
+ static class RetryContext {
+
+ private final ContainerRetryContext containerRetryContext;
+ private List restartTimes = new ArrayList<>();
+ private int remainingRetries;
+
+ RetryContext(ContainerRetryContext containerRetryContext) {
+ this.containerRetryContext = Preconditions
+ .checkNotNull(containerRetryContext);
+ this.remainingRetries = containerRetryContext.getMaxRetries();
+ }
+
+ ContainerRetryContext getContainerRetryContext() {
+ return containerRetryContext;
+ }
+
+ int getRemainingRetries() {
+ return remainingRetries;
+ }
+
+ void setRemainingRetries(int remainingRetries) {
+ this.remainingRetries = remainingRetries;
+ }
+
+ List getRestartTimes() {
+ return restartTimes;
+ }
+
+ void setRestartTimes(List restartTimes) {
+ if (restartTimes != null) {
+ this.restartTimes.clear();
+ this.restartTimes.addAll(restartTimes);
+ }
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 0f659d90754..bf4c0ad5f61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -127,6 +127,8 @@
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
"/remainingRetryAttempts";
+ private static final String CONTAINER_RESTART_TIMES_SUFFIX =
+ "/restartTimes";
private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir";
private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
@@ -338,6 +340,16 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
rcs.setRemainingRetryAttempts(
Integer.parseInt(asString(entry.getValue())));
+ } else if (suffix.equals(CONTAINER_RESTART_TIMES_SUFFIX)) {
+ String value = asString(entry.getValue());
+ // parse the string format of List, e.g. [34, 21, 22]
+ String[] unparsedRestartTimes =
+ value.substring(1, value.length() - 1).split(", ");
+ List restartTimes = new ArrayList<>();
+ for (String restartTime : unparsedRestartTimes) {
+ restartTimes.add(Long.parseLong(restartTime));
+ }
+ rcs.setRestartTimes(restartTimes);
} else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) {
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
@@ -581,6 +593,18 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
}
}
+ @Override
+ public void storeContainerRestartTimes(ContainerId containerId,
+ List restartTimes) throws IOException {
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ + CONTAINER_RESTART_TIMES_SUFFIX;
+ try {
+ db.put(bytes(key), bytes(restartTimes.toString()));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 78137bba257..f217f2f8605 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -119,6 +119,11 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
int remainingRetryAttempts) throws IOException {
}
+ @Override
+ public void storeContainerRestartTimes(ContainerId containerId,
+ List restartTimes) throws IOException {
+ }
+
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index f9b86bf84ea..0ea0ef3b86c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -98,6 +98,7 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
StartContainerRequest startRequest;
Resource capability;
private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
+ private List restartTimes;
private String workDir;
private String logDir;
int version;
@@ -150,6 +151,15 @@ public void setRemainingRetryAttempts(int retryAttempts) {
this.remainingRetryAttempts = retryAttempts;
}
+ public List getRestartTimes() {
+ return restartTimes;
+ }
+
+ public void setRestartTimes(
+ List restartTimes) {
+ this.restartTimes = restartTimes;
+ }
+
public String getWorkDir() {
return workDir;
}
@@ -177,6 +187,7 @@ public String toString() {
.append(", Capability: ").append(getCapability())
.append(", StartRequest: ").append(getStartRequest())
.append(", RemainingRetryAttempts: ").append(remainingRetryAttempts)
+ .append(", RestartTimes: ").append(restartTimes)
.append(", WorkDir: ").append(workDir)
.append(", LogDir: ").append(logDir)
.toString();
@@ -486,6 +497,16 @@ public abstract void storeContainerDiagnostics(ContainerId containerId,
public abstract void storeContainerRemainingRetryAttempts(
ContainerId containerId, int remainingRetryAttempts) throws IOException;
+ /**
+ * Record restart times for a container.
+ * @param containerId
+ * @param restartTimes
+ * @throws IOException
+ */
+ public abstract void storeContainerRestartTimes(
+ ContainerId containerId, List restartTimes)
+ throws IOException;
+
/**
* Record working directory for a container.
* @param containerId the container ID
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index c32ff1a66be..728f26b1484 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -105,6 +105,8 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
@@ -1109,6 +1111,38 @@ private void testContainerRestartInterval(
}
}
+ @Test
+ public void testContainerRetryFailureValidityInterval() throws Exception {
+ ContainerRetryContext containerRetryContext = ContainerRetryContext
+ .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(25, 314159265358980L, 4200, "test",
+ containerRetryContext);
+ ControlledClock clock = new ControlledClock();
+ wc.setClock(clock);
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ clock.setTime(20);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ clock.setTime(40);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ clock.setTime(45);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ wc.c.getContainerState());
+ } finally {
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -1574,5 +1608,9 @@ public int getLocalResourceCount() {
public String getDiagnostics() {
return c.cloneAndGetContainerStatus().getDiagnostics();
}
+
+ public void setClock(Clock clock) {
+ ((ContainerImpl)c).setClock(clock);
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java
new file mode 100644
index 00000000000..04889a9ecf2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestSlidingWindowRetryPolicy.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SlidingWindowRetryPolicy}.
+ */
+public class TestSlidingWindowRetryPolicy {
+
+ private ControlledClock clock;
+ private SlidingWindowRetryPolicy retryPolicy;
+
+ @Before
+ public void setup() {
+ clock = new ControlledClock();
+ retryPolicy = new SlidingWindowRetryPolicy(clock);
+ }
+
+ @Test
+ public void testNeverRetry() {
+ ContainerRetryContext retryContext =
+ ContainerRetryContext.NEVER_RETRY_CONTEXT;
+ Assert.assertFalse("never retry", retryPolicy.shouldRetry(
+ new SlidingWindowRetryPolicy.RetryContext(retryContext), 12));
+ }
+
+ @Test
+ public void testAlwaysRetry() {
+ ContainerRetryContext retryContext = ContainerRetryContext.newInstance(
+ ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, -1,
+ 0, 10);
+ Assert.assertTrue("always retry", retryPolicy.shouldRetry(
+ new SlidingWindowRetryPolicy.RetryContext(retryContext), 12));
+ }
+
+ @Test
+ public void testFailuresValidityInterval() {
+ ContainerRetryContext retryContext = ContainerRetryContext
+ .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
+ SlidingWindowRetryPolicy.RetryContext windowRetryContext =
+ new SlidingWindowRetryPolicy.RetryContext(retryContext);
+ Assert.assertTrue("retry 1",
+ retryPolicy.shouldRetry(windowRetryContext, 12));
+ clock.setTime(20);
+ Assert.assertTrue("retry 2",
+ retryPolicy.shouldRetry(windowRetryContext, 12));
+ clock.setTime(40);
+ Assert.assertTrue("retry 3",
+ retryPolicy.shouldRetry(windowRetryContext, 12));
+ clock.setTime(45);
+ Assert.assertFalse("retry failed",
+ retryPolicy.shouldRetry(windowRetryContext, 12));
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 3dca3676b59..b67d11fceb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -121,6 +121,7 @@ public synchronized void removeApplication(ApplicationId appId)
rcsCopy.startRequest = rcs.startRequest;
rcsCopy.capability = rcs.capability;
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
+ rcsCopy.setRestartTimes(rcs.getRestartTimes());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
rcsCopy.setResourceMappings(rcs.getResourceMappings());
@@ -212,6 +213,14 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
rcs.setRemainingRetryAttempts(remainingRetryAttempts);
}
+ @Override
+ public void storeContainerRestartTimes(
+ ContainerId containerId, List restartTimes)
+ throws IOException {
+ RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+ rcs.setRestartTimes(restartTimes);
+ }
+
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index de667d159e3..c27019917d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -371,6 +371,7 @@ public void testContainerStorage() throws IOException {
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
+ validateRetryAttempts(containerId);
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
@@ -378,6 +379,21 @@ public void testContainerStorage() throws IOException {
assertTrue(recoveredContainers.isEmpty());
}
+ private void validateRetryAttempts(ContainerId containerId)
+ throws IOException {
+ // store finishTimeForRetryAttempts
+ List finishTimeForRetryAttempts = Arrays.asList(1462700529039L,
+ 1462700529050L, 1462700529120L);
+ stateStore.storeContainerRestartTimes(containerId,
+ finishTimeForRetryAttempts);
+ restartStateStore();
+ RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
+ List recoveredRestartTimes = rcs.getRestartTimes();
+ assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0));
+ assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1));
+ assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2));
+ }
+
private StartContainerRequest createContainerRequest(
ContainerId containerId) {
LocalResource lrsrc = LocalResource.newInstance(