diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
index ef8bd1763e8..227a740c799 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
+import java.util.List;
import java.util.Set;
/**
@@ -49,6 +50,22 @@
*
*
retryInterval specifies delaying some time before relaunch
* container, the unit is millisecond.
+ *
+ * failuresValidityInterval: default value is -1.
+ * When failuresValidityInterval (in milliseconds) is set to {@literal >} 0,
+ * failures that happened outside the failuresValidityInterval window are
+ * not counted towards the failure count. If the failure count
+ * reaches maxRetries, the container fails.
+ *
+ *
+ * remainingRetries: default value is -1.
+ * If the value is -1, it means this is not set. Otherwise, it indicates
+ * the remaining number of retries.
+ *
+ *
+ * restartTimes: when failuresValidityInterval is set, this
+ * records the times at which the container was set to restart.
+ *
*
*/
@Public
@@ -63,16 +80,40 @@
@Unstable
public static ContainerRetryContext newInstance(
ContainerRetryPolicy retryPolicy, Set errorCodes,
- int maxRetries, int retryInterval) {
+ int maxRetries, int retryInterval, long failuresValidityInterval,
+ int remainingRetries, List restartTimes) {
ContainerRetryContext containerRetryContext =
Records.newRecord(ContainerRetryContext.class);
containerRetryContext.setRetryPolicy(retryPolicy);
containerRetryContext.setErrorCodes(errorCodes);
containerRetryContext.setMaxRetries(maxRetries);
containerRetryContext.setRetryInterval(retryInterval);
+ containerRetryContext.setFailuresValidityInterval(failuresValidityInterval);
+ containerRetryContext.setRemainingRetries(remainingRetries);
+ containerRetryContext.setRestartTimes(restartTimes);
return containerRetryContext;
}
+  /**
+   * Builds a retry context that carries a failures validity interval but no
+   * retry bookkeeping: the remaining-retries value is passed as -1 and the
+   * restart times as null.
+   */
+  @Private
+  @Unstable
+  public static ContainerRetryContext newInstance(
+      ContainerRetryPolicy retryPolicy, Set errorCodes,
+      int maxRetries, int retryInterval, long failuresValidityInterval) {
+    final int remainingRetriesUnset = -1;
+    final List noRestartTimes = null;
+    return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval,
+        failuresValidityInterval, remainingRetriesUnset, noRestartTimes);
+  }
+
+
+  /**
+   * Builds a retry context from the four basic settings only. The failures
+   * validity interval and the remaining-retries value are both passed as -1,
+   * and the restart times as null.
+   */
+  @Private
+  @Unstable
+  public static ContainerRetryContext newInstance(
+      ContainerRetryPolicy retryPolicy, Set errorCodes,
+      int maxRetries, int retryInterval) {
+    final long failuresValidityIntervalUnset = -1;
+    final int remainingRetriesUnset = -1;
+    final List noRestartTimes = null;
+    return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval,
+        failuresValidityIntervalUnset, remainingRetriesUnset, noRestartTimes);
+  }
+
public abstract ContainerRetryPolicy getRetryPolicy();
public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy);
public abstract Set getErrorCodes();
@@ -81,4 +122,11 @@ public static ContainerRetryContext newInstance(
public abstract void setMaxRetries(int maxRetries);
public abstract int getRetryInterval();
public abstract void setRetryInterval(int retryInterval);
+ /**
+ * Returns the failures validity interval in milliseconds. The class
+ * documentation gives -1 as the default value.
+ */
+ public abstract long getFailuresValidityInterval();
+ /** Sets the failures validity interval, in milliseconds. */
+ public abstract void setFailuresValidityInterval(
+ long failuresValidityInterval);
+ /**
+ * Returns the remaining number of retries. Per the class documentation,
+ * -1 means the value has not been set.
+ */
+ public abstract int getRemainingRetries();
+ /** Sets the remaining number of retries. */
+ public abstract void setRemainingRetries(int remainingRetries);
+ /**
+ * Returns the times at which the container was set to restart; per the
+ * class documentation these are recorded when failuresValidityInterval
+ * is set.
+ */
+ public abstract List getRestartTimes();
+ /** Sets the recorded restart times. */
+ public abstract void setRestartTimes(List restartTimes);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 25c85696f0e..5468ed36ef9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -759,6 +759,9 @@ message ContainerRetryContextProto {
repeated int32 error_codes = 2;
optional int32 max_retries = 3 [default = 0];
optional int32 retry_interval = 4 [default = 0];
+ optional int64 failures_validity_interval = 5 [default = -1];
+ optional int32 remaining_retries = 6 [default = -1];
+ repeated int64 restart_times = 7;
}
enum ContainerRetryPolicyProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index a06ee7c6d36..94501274a7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -309,6 +309,7 @@
private Set containerRetryErrorCodes = null;
private int containerMaxRetries = 0;
private int containrRetryInterval = 0;
+ private long containerFailuresValidityInterval = -1;
// Timeline domain ID
private String domainId = null;
@@ -472,6 +473,9 @@ public boolean init(String[] args) throws ParseException, IOException {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("container_failures_validity_interval", true,
+ "Failures which are out of the time window will not be added to"
+ + " the number of container retry attempts");
opts.addOption("placement_spec", true, "Placement specification");
opts.addOption("debug", false, "Dump out debug information");
@@ -662,7 +666,8 @@ public boolean init(String[] args) throws ParseException, IOException {
cliParser.getOptionValue("container_max_retries", "0"));
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
-
+ containerFailuresValidityInterval = Long.parseLong(
+ cliParser.getOptionValue("container_failures_validity_interval", "-1"));
if (!YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient = null;
timelineV2Client = null;
@@ -1386,7 +1391,8 @@ public void run() {
ContainerRetryContext containerRetryContext =
ContainerRetryContext.newInstance(
containerRetryPolicy, containerRetryErrorCodes,
- containerMaxRetries, containrRetryInterval);
+ containerMaxRetries, containrRetryInterval,
+ containerFailuresValidityInterval);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index ac58662258f..5f1b6509842 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -374,6 +374,9 @@ public Client(Configuration conf) throws Exception {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
+ opts.addOption("container_failures_validity_interval", true,
+ "Failures which are out of the time window will not be added to"
+ + " the number of container retry attempts");
opts.addOption("docker_client_config", true,
"The docker client configuration path. The scheme should be supplied"
+ " (i.e. file:// or hdfs://)."
@@ -580,6 +583,10 @@ public boolean init(String[] args) throws ParseException {
containerRetryOptions.add("--container_retry_interval "
+ cliParser.getOptionValue("container_retry_interval"));
}
+ if (cliParser.hasOption("container_failures_validity_interval")) {
+ containerRetryOptions.add("--container_failures_validity_interval "
+ + cliParser.getOptionValue("container_failures_validity_interval"));
+ }
if (cliParser.hasOption("flow_name")) {
flowName = cliParser.getOptionValue("flow_name");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
index a5ef70de2f6..f595bd1119d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java
@@ -26,7 +26,9 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
/**
@@ -39,6 +41,7 @@
private boolean viaProto = false;
private Set errorCodes = null;
+ private List restartTimes = null;
public ContainerRetryContextPBImpl() {
builder = ContainerRetryContextProto.newBuilder();
@@ -82,6 +85,10 @@ private void mergeLocalToBuilder() {
builder.clearErrorCodes();
builder.addAllErrorCodes(this.errorCodes);
}
+ if (this.restartTimes != null) {
+ builder.clearRestartTimes();
+ builder.addAllRestartTimes(this.restartTimes);
+ }
}
private void mergeLocalToProto() {
@@ -165,6 +172,58 @@ public void setRetryInterval(int retryInterval) {
builder.setRetryInterval(retryInterval);
}
+  /**
+   * Reads the failures validity interval from the proto (or from the builder
+   * when this record is not backed by a proto), answering -1 when the field
+   * is absent.
+   */
+  @Override
+  public long getFailuresValidityInterval() {
+    final ContainerRetryContextProtoOrBuilder source =
+        viaProto ? proto : builder;
+    return source.hasFailuresValidityInterval()
+        ? source.getFailuresValidityInterval()
+        : -1;
+  }
+
+ /**
+ * Stores the failures validity interval on the underlying builder.
+ *
+ * @param failuresValidityInterval the interval value to store
+ */
+ @Override
+ public void setFailuresValidityInterval(long failuresValidityInterval) {
+ // maybeInitBuilder() is defined elsewhere in this class; presumably it
+ // makes the builder writable before the field is set.
+ maybeInitBuilder();
+ builder.setFailuresValidityInterval(failuresValidityInterval);
+ }
+
+  /**
+   * Reads the remaining-retries value from the proto (or from the builder
+   * when this record is not backed by a proto), answering -1 when the field
+   * is absent.
+   */
+  @Override
+  public int getRemainingRetries() {
+    final ContainerRetryContextProtoOrBuilder source =
+        viaProto ? proto : builder;
+    return source.hasRemainingRetries()
+        ? source.getRemainingRetries()
+        : -1;
+  }
+
+ /**
+ * Stores the remaining number of retries on the underlying builder.
+ *
+ * @param remainingRetries the value to store
+ */
+ @Override
+ public void setRemainingRetries(int remainingRetries) {
+ // maybeInitBuilder() is defined elsewhere in this class; presumably it
+ // makes the builder writable before the field is set.
+ maybeInitBuilder();
+ builder.setRemainingRetries(remainingRetries);
+ }
+
+ private void initRestartTimes() {
+ if (this.restartTimes != null) {
+ return;
+ }
+ ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
+ this.restartTimes = new ArrayList<>();
+ this.restartTimes.addAll(p.getRestartTimesList());
+ }
+
+ /**
+ * Returns the local restart-times list, populating it first through
+ * initRestartTimes() when it is still null.
+ *
+ * @return the internal list itself (not a copy), so changes made by the
+ * caller are changes to this record's local state
+ */
+ @Override
+ public List getRestartTimes() {
+ initRestartTimes();
+ return this.restartTimes;
+ }
+
+ /**
+ * Replaces the restart times: clears them on the builder and keeps the
+ * given list reference as local state. mergeLocalToBuilder() copies the
+ * local list into the builder when it is not null.
+ *
+ * @param restartTimes the new list; it is stored by reference, not copied.
+ * Passing null leaves the builder's restart times cleared.
+ */
+ @Override
+ public void setRestartTimes(List restartTimes) {
+ maybeInitBuilder();
+ builder.clearRestartTimes();
+ this.restartTimes = restartTimes;
+ }
+
private ContainerRetryPolicyProto convertToProtoFormat(
ContainerRetryPolicy containerRetryPolicy) {
return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/SlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/SlidingWindowRetryPolicy.java
new file mode 100644
index 00000000000..3ada5097928
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/SlidingWindowRetryPolicy.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.util.Iterator;
+
+/**
+ * Sliding window retry policy for relaunching a
+ * Container in Yarn.
+ *
+ * <p>The policy keeps no retry state of its own; all bookkeeping lives in the
+ * {@link ContainerRetryContext} passed to {@link #shouldRetry}. Two
+ * accounting modes exist:
+ * <ul>
+ *   <li>window mode, when maxRetries is not RETRY_FOREVER and
+ *   failuresValidityInterval is {@literal >} 0: the restart times recorded in
+ *   the context are pruned to the window and counted against maxRetries;</li>
+ *   <li>counter mode otherwise: the remaining-retries value in the context is
+ *   used, with -1 meaning "not set yet".</li>
+ * </ul>
+ */
+@InterfaceStability.Unstable
+public class SlidingWindowRetryPolicy {
+
+  private Clock clock;
+
+  /**
+   * @param clock source of the current time; must not be null.
+   */
+  public SlidingWindowRetryPolicy(Clock clock) {
+    this.clock = Preconditions.checkNotNull(clock);
+  }
+
+  /**
+   * Decides whether a container that exited with {@code errorCode} should be
+   * relaunched, and records the granted retry in {@code retryContext}.
+   *
+   * @param retryContext retry settings and bookkeeping; it is modified only
+   *                     when a retry is granted.
+   * @param errorCode exit code of the failed container.
+   * @return true if the container should be relaunched.
+   */
+  public boolean shouldRetry(ContainerRetryContext retryContext,
+      int errorCode) {
+    ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
+    boolean policyAllowsRetry =
+        retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
+        || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
+            && retryContext.getErrorCodes() != null
+            && retryContext.getErrorCodes().contains(errorCode));
+    if (!policyAllowsRetry) {
+      return false;
+    }
+
+    int pendingRetries = calculatePendingRetries(retryContext);
+    boolean retryForever =
+        retryContext.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER;
+    if (!retryForever && pendingRetries <= 0) {
+      // A refused retry must leave the context untouched. Decrementing a
+      // remaining count of 0 would store -1, which counter mode reads back
+      // as "not set" and turns into a fresh budget of maxRetries on the next
+      // call; in window mode it would record a restart that never happens.
+      return false;
+    }
+    updateRetryContext(retryContext, pendingRetries);
+    return true;
+  }
+
+  /**
+   * Tells whether restart times are tracked for this context, i.e. whether
+   * maxRetries is finite and failuresValidityInterval is {@literal >} 0.
+   */
+  private static boolean usesSlidingWindow(
+      ContainerRetryContext retryContext) {
+    return retryContext.getMaxRetries() != ContainerRetryContext.RETRY_FOREVER
+        && retryContext.getFailuresValidityInterval() > 0;
+  }
+
+  /**
+   * Computes how many retries are still available.
+   *
+   * <p>In window mode, restart times older than failuresValidityInterval are
+   * removed from the context and the result is maxRetries minus the number
+   * of restart times left. In counter mode the result is the context's
+   * remaining-retries value, or maxRetries when that value is -1 (not set).
+   *
+   * @return the pending retries.
+   */
+  private int calculatePendingRetries(ContainerRetryContext retryContext) {
+    if (!usesSlidingWindow(retryContext)) {
+      int remaining = retryContext.getRemainingRetries();
+      return remaining == -1 ? retryContext.getMaxRetries() : remaining;
+    }
+
+    long validityInterval = retryContext.getFailuresValidityInterval();
+    // Typed iterator: next() must yield a Long to be unboxed below.
+    Iterator<Long> iterator = retryContext.getRestartTimes().iterator();
+    long currentTime = clock.getTime();
+    while (iterator.hasNext()) {
+      long restartTime = iterator.next();
+      if (currentTime - restartTime <= validityInterval) {
+        // Stop at the first entry still inside the window; new times are
+        // appended at the end, so later entries are assumed to be newer.
+        break;
+      }
+      iterator.remove();
+    }
+    return retryContext.getMaxRetries()
+        - retryContext.getRestartTimes().size();
+  }
+
+  /**
+   * Records a granted retry in the retryContext: stores the remaining
+   * retries and, in window mode, appends the current time to the restart
+   * times.
+   */
+  private void updateRetryContext(ContainerRetryContext retryContext,
+      int pendingRetries) {
+    retryContext.setRemainingRetries(pendingRetries - 1);
+    if (usesSlidingWindow(retryContext)) {
+      retryContext.getRestartTimes().add(clock.getTime());
+    }
+  }
+
+  /**
+   * Replaces the clock used to timestamp and age restart times.
+   *
+   * @param clock the new clock; must not be null.
+   */
+  public void setClock(Clock clock) {
+    this.clock = Preconditions.checkNotNull(clock);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/package-info.java
new file mode 100644
index 00000000000..30bf5c95286
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utility classes for retrying container launch.
+ */
+package org.apache.hadoop.yarn.server.retry;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/retry/TestSlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/retry/TestSlidingWindowRetryPolicy.java
new file mode 100644
index 00000000000..4ee8c9a960d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/retry/TestSlidingWindowRetryPolicy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.retry;
+
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SlidingWindowRetryPolicy}.
+ */
+public class TestSlidingWindowRetryPolicy {
+
+  /** Arbitrary non-zero exit code reported for every simulated failure. */
+  private static final int EXIT_CODE = 12;
+
+  private ControlledClock clock;
+  private SlidingWindowRetryPolicy retryPolicy;
+
+  @Before
+  public void setup() {
+    clock = new ControlledClock();
+    retryPolicy = new SlidingWindowRetryPolicy(clock);
+  }
+
+  /** Asks the policy under test about one more failure of the container. */
+  private boolean failAndAsk(ContainerRetryContext context) {
+    return retryPolicy.shouldRetry(context, EXIT_CODE);
+  }
+
+  /** The NEVER_RETRY context must always be refused. */
+  @Test
+  public void testNeverRetry() {
+    Assert.assertFalse("never retry",
+        failAndAsk(ContainerRetryContext.NEVER_RETRY_CONTEXT));
+  }
+
+  /** A maxRetries of -1 with RETRY_ON_ALL_ERRORS is granted a retry. */
+  @Test
+  public void testAlwaysRetry() {
+    ContainerRetryContext context = ContainerRetryContext.newInstance(
+        ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, -1, 0, 10);
+    Assert.assertTrue("always retry", failAndAsk(context));
+  }
+
+  /**
+   * With one retry allowed per 10 ms window, failures spaced 20 ms apart are
+   * all retried, while a failure 5 ms after the previous restart is not.
+   */
+  @Test
+  public void testFailuresValidityInterval() {
+    ContainerRetryContext context = ContainerRetryContext.newInstance(
+        ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
+
+    Assert.assertTrue("retry 1", failAndAsk(context));
+
+    clock.setTime(20);
+    Assert.assertTrue("retry 2", failAndAsk(context));
+
+    clock.setTime(40);
+    Assert.assertTrue("retry 3", failAndAsk(context));
+
+    clock.setTime(45);
+    Assert.assertFalse("retry failed", failAndAsk(context));
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 751beffeb03..7ed712aef5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -23,13 +23,7 @@
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,6 +31,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
+import org.apache.hadoop.yarn.server.retry.SlidingWindowRetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -167,9 +162,10 @@ private ReInitializationContext createContextForRollback() {
private long containerLaunchStartTime;
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
+
private ContainerRetryContext containerRetryContext;
- // remaining retries to relaunch container if needed
- private int remainingRetryAttempts;
+ private SlidingWindowRetryPolicy retryPolicy;
+
private String workDir;
private String logDir;
private String host;
@@ -246,7 +242,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
// Configure the Retry Context
this.containerRetryContext = configureRetryContext(
conf, launchContext, this.containerId);
- this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
+ this.retryPolicy = new SlidingWindowRetryPolicy(clock);
+
stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
context.getContainerStateTransitionListener());
this.context = context;
@@ -289,7 +286,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(rcs.getDiagnostics());
this.version = rcs.getVersion();
- this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
+ this.containerRetryContext.setRemainingRetries(
+ rcs.getRemainingRetryAttempts());
+ this.containerRetryContext.setRestartTimes(rcs.getRestartTimes());
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
this.resourceMappings = rcs.getResourceMappings();
@@ -1592,7 +1591,7 @@ public ContainerState transition(final ContainerImpl container,
if (container.containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY) {
int n = container.containerRetryContext.getMaxRetries()
- - container.remainingRetryAttempts;
+ - container.containerRetryContext.getRemainingRetries();
container.addDiagnostics("Diagnostic message from attempt "
+ n + " : ", "\n");
}
@@ -1600,18 +1599,9 @@ public ContainerState transition(final ContainerImpl container,
}
if (container.shouldRetry(container.exitCode)) {
- if (container.remainingRetryAttempts > 0) {
- container.remainingRetryAttempts--;
- try {
- container.stateStore.storeContainerRemainingRetryAttempts(
- container.getContainerId(), container.remainingRetryAttempts);
- } catch (IOException e) {
- LOG.warn(
- "Unable to update remainingRetryAttempts in state store for "
- + container.getContainerId(), e);
- }
- }
- doRelaunch(container, container.remainingRetryAttempts,
+ container.storeRetryContext();
+ doRelaunch(container,
+ container.containerRetryContext.getRemainingRetries(),
container.containerRetryContext.getRetryInterval());
return ContainerState.RELAUNCHING;
} else if (container.canRollback()) {
@@ -1671,29 +1661,14 @@ public boolean isRetryContextSet() {
@Override
public boolean shouldRetry(int errorCode) {
- return shouldRetry(errorCode, containerRetryContext,
- remainingRetryAttempts);
- }
-
- public static boolean shouldRetry(int errorCode,
- ContainerRetryContext retryContext, int remainingRetryAttempts) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
-
- ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
- if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
- || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
- && retryContext.getErrorCodes() != null
- && retryContext.getErrorCodes().contains(errorCode))) {
- return remainingRetryAttempts > 0
- || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
- }
-
- return false;
+ return retryPolicy.shouldRetry(containerRetryContext, errorCode);
}
+
/**
* Transition to EXITED_WITH_FAILURE
*/
@@ -1729,9 +1704,7 @@ public void transition(ContainerImpl container,
container.containerRetryContext =
configureRetryContext(container.context.getConf(),
container.launchContext, container.containerId);
- // Reset the retry attempts since its a fresh start
- container.remainingRetryAttempts =
- container.containerRetryContext.getMaxRetries();
+ container.retryPolicy = new SlidingWindowRetryPolicy(clock);
container.resourceSet =
container.reInitContext.mergedResourceSet(container.resourceSet);
@@ -2209,4 +2182,31 @@ private static void removeDockerContainer(ContainerImpl container) {
container.getContainerId().toString());
deletionService.delete(deletionTask);
}
+
+  /**
+   * Persists the retry bookkeeping of this container (restart times and
+   * remaining retry attempts) to the NM state store. Failures to write are
+   * logged and otherwise ignored, so a state store problem does not stop the
+   * relaunch.
+   */
+  private void storeRetryContext() {
+    List<Long> restartTimes = containerRetryContext.getRestartTimes();
+    // An empty list is not written: the leveldb store serializes the list
+    // with toString(), and its recovery code would read "[]" back as a single
+    // empty token and fail in Long.parseLong("").
+    if (restartTimes != null && !restartTimes.isEmpty()) {
+      try {
+        stateStore.storeContainerRestartTimes(containerId, restartTimes);
+      } catch (IOException e) {
+        LOG.warn(
+            "Unable to update restartTimes in state store for "
+                + containerId, e);
+      }
+    }
+    try {
+      stateStore.storeContainerRemainingRetryAttempts(containerId,
+          containerRetryContext.getRemainingRetries());
+    } catch (IOException e) {
+      LOG.warn(
+          "Unable to update remainingRetryAttempts in state store for "
+              + containerId, e);
+    }
+  }
+
+ /**
+ * Test hook that replaces the clock of this container and of its retry
+ * policy.
+ *
+ * @param targetClock the clock to use from now on
+ */
+ @VisibleForTesting
+ void setClock(Clock targetClock) {
+ // clock is a static field, so this assignment is seen by every
+ // ContainerImpl; only this instance's retryPolicy is updated below.
+ clock = targetClock;
+ retryPolicy.setClock(clock);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index 0f659d90754..bf4c0ad5f61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -127,6 +127,8 @@
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
"/remainingRetryAttempts";
+ private static final String CONTAINER_RESTART_TIMES_SUFFIX =
+ "/restartTimes";
private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir";
private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
@@ -338,6 +340,16 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
rcs.setRemainingRetryAttempts(
Integer.parseInt(asString(entry.getValue())));
+ } else if (suffix.equals(CONTAINER_RESTART_TIMES_SUFFIX)) {
+ String value = asString(entry.getValue());
+ // parse the string format of List, e.g. [34, 21, 22]
+ String[] unparsedRestartTimes =
+ value.substring(1, value.length() - 1).split(", ");
+ List restartTimes = new ArrayList<>();
+ for (String restartTime : unparsedRestartTimes) {
+ restartTimes.add(Long.parseLong(restartTime));
+ }
+ rcs.setRestartTimes(restartTimes);
} else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) {
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
@@ -581,6 +593,18 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
}
}
+  /**
+   * Writes the restart times of a container to the database under the
+   * container's restart-times key, using the list's toString() form as the
+   * stored value. A DBException from the write is rethrown wrapped in an
+   * IOException.
+   */
+  @Override
+  public void storeContainerRestartTimes(ContainerId containerId,
+      List restartTimes) throws IOException {
+    final String dbKey = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_RESTART_TIMES_SUFFIX;
+    try {
+      final String serializedTimes = restartTimes.toString();
+      db.put(bytes(dbKey), bytes(serializedTimes));
+    } catch (DBException dbFailure) {
+      throw new IOException(dbFailure);
+    }
+  }
+
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 2d522a9b6fa..b9466ed8a6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -119,6 +119,11 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
int remainingRetryAttempts) throws IOException {
}
+ /**
+ * Does nothing: this implementation has an empty body and does not
+ * record the restart times anywhere.
+ */
+ @Override
+ public void storeContainerRestartTimes(ContainerId containerId,
+ List restartTimes) throws IOException {
+ // Intentionally empty.
+ }
+
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index f9b86bf84ea..0ea0ef3b86c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -98,6 +98,7 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
StartContainerRequest startRequest;
Resource capability;
private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
+ private List restartTimes;
private String workDir;
private String logDir;
int version;
@@ -150,6 +151,15 @@ public void setRemainingRetryAttempts(int retryAttempts) {
this.remainingRetryAttempts = retryAttempts;
}
+ /**
+ * Returns the restart times held by this recovered container state.
+ *
+ * @return the restart times last passed to {@code setRestartTimes}
+ */
+ public List getRestartTimes() {
+ return this.restartTimes;
+ }
+
+ /**
+ * Replaces the restart times held by this recovered container state.
+ * The given list is stored by reference, not copied.
+ *
+ * @param restartTimes the restart times to keep
+ */
+ public void setRestartTimes(List restartTimes) {
+ this.restartTimes = restartTimes;
+ }
+
public String getWorkDir() {
return workDir;
}
@@ -177,6 +187,7 @@ public String toString() {
.append(", Capability: ").append(getCapability())
.append(", StartRequest: ").append(getStartRequest())
.append(", RemainingRetryAttempts: ").append(remainingRetryAttempts)
+ .append(", RestartTimes: ").append(restartTimes)
.append(", WorkDir: ").append(workDir)
.append(", LogDir: ").append(logDir)
.toString();
@@ -486,6 +497,16 @@ public abstract void storeContainerDiagnostics(ContainerId containerId,
public abstract void storeContainerRemainingRetryAttempts(
ContainerId containerId, int remainingRetryAttempts) throws IOException;
+ /**
+ * Record restart times for a container.
+ * @param containerId the container ID
+ * @param restartTimes the restart times to record for the container
+ * @throws IOException if an implementation fails to record the restart times
+ */
+ public abstract void storeContainerRestartTimes(
+ ContainerId containerId, List restartTimes)
+ throws IOException;
+
/**
* Record working directory for a container.
* @param containerId the container ID
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index c7094a05a3a..ba1ffc10e1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -105,6 +105,8 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
@@ -1088,6 +1090,38 @@ private void testContainerRestartInterval(
}
}
+ /**
+ * Drives a container through repeated failures at controlled clock times
+ * and checks that it keeps being relaunched while failures are spaced more
+ * than the failures-validity interval apart, and exits with failure once
+ * two failures fall close together.
+ */
+ @Test
+ public void testContainerRetryFailureValidityInterval() throws Exception {
+ // Arguments after the policy are presumably errorCodes=null, maxRetries=1,
+ // retryInterval=0 and failuresValidityInterval=10 -- TODO confirm against
+ // the 5-argument ContainerRetryContext.newInstance overload.
+ ContainerRetryContext containerRetryContext = ContainerRetryContext
+ .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
+ WrappedContainer wc = null;
+ try {
+ wc = new WrappedContainer(25, 314159265358980L, 4200, "test",
+ containerRetryContext);
+ // Inject a controllable clock so the test decides when each failure
+ // is observed.
+ ControlledClock clock = new ControlledClock();
+ wc.setClock(clock);
+ wc.initContainer();
+ wc.localizeResources();
+ wc.launchContainer();
+ // First failure: the container is expected to be running again.
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ // Failure at time 20; assuming the clock started at 0, this is more than
+ // 10 after the previous failure, so the container is still relaunched.
+ clock.setTime(20);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ // Failure at time 40, again 20 after the previous one: still relaunched.
+ clock.setTime(40);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
+ // Failure at time 45, only 5 after the previous one: the container is
+ // now expected to exit with failure instead of being relaunched.
+ clock.setTime(45);
+ wc.containerFailed(12);
+ assertEquals(ContainerState.EXITED_WITH_FAILURE,
+ wc.c.getContainerState());
+ } finally {
+ // wc stays null if the WrappedContainer constructor threw.
+ if (wc != null) {
+ wc.finished();
+ }
+ }
+ }
+
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -1548,5 +1582,9 @@ public int getLocalResourceCount() {
public String getDiagnostics() {
return c.cloneAndGetContainerStatus().getDiagnostics();
}
+
+ /**
+ * Installs the given clock on the wrapped container.
+ *
+ * @param clock the clock the container should use
+ */
+ public void setClock(Clock clock) {
+ ContainerImpl impl = (ContainerImpl) c;
+ impl.setClock(clock);
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index 3dca3676b59..b67d11fceb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -121,6 +121,7 @@ public synchronized void removeApplication(ApplicationId appId)
rcsCopy.startRequest = rcs.startRequest;
rcsCopy.capability = rcs.capability;
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
+ rcsCopy.setRestartTimes(rcs.getRestartTimes());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
rcsCopy.setResourceMappings(rcs.getResourceMappings());
@@ -212,6 +213,14 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
rcs.setRemainingRetryAttempts(remainingRetryAttempts);
}
+ /**
+ * Sets the restart times on the recovered state looked up for the
+ * container. The list is stored by reference.
+ */
+ @Override
+ public void storeContainerRestartTimes(
+ ContainerId containerId, List restartTimes)
+ throws IOException {
+ getRecoveredContainerState(containerId).setRestartTimes(restartTimes);
+ }
+
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index de667d159e3..c27019917d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -371,6 +371,7 @@ public void testContainerStorage() throws IOException {
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
+ validateRetryAttempts(containerId);
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
@@ -378,6 +379,21 @@ public void testContainerStorage() throws IOException {
assertTrue(recoveredContainers.isEmpty());
}
+ /**
+ * Stores a list of restart times for the container, restarts the state
+ * store, and checks that exactly the same list is recovered.
+ *
+ * @param containerId the container whose restart times are stored
+ * @throws IOException if the state store fails
+ */
+ private void validateRetryAttempts(ContainerId containerId)
+ throws IOException {
+ // store finishTimeForRetryAttempts
+ List<Long> finishTimeForRetryAttempts = Arrays.asList(1462700529039L,
+ 1462700529050L, 1462700529120L);
+ stateStore.storeContainerRestartTimes(containerId,
+ finishTimeForRetryAttempts);
+ restartStateStore();
+ RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
+ List<Long> recoveredRestartTimes = rcs.getRestartTimes();
+ // Compare the whole list so that length and order are verified too;
+ // element-by-element checks would miss extra recovered entries.
+ assertEquals(finishTimeForRetryAttempts, recoveredRestartTimes);
+ }
+
private StartContainerRequest createContainerRequest(
ContainerId containerId) {
LocalResource lrsrc = LocalResource.newInstance(