diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java index ef8bd1763e8..aa2b56bc771 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.util.Records; +import java.util.List; import java.util.Set; /** @@ -49,6 +50,13 @@ * *
  • retryInterval specifies delaying some time before relaunch * container, the unit is millisecond.
  • + *
  • + * failuresValidityInterval: default value is -1. + * When failuresValidityInterval in milliseconds is set to {@literal >} 0, + * the failure number will not take failures which happen out of the + * failuresValidityInterval into failure count. If failure count + * reaches to maxRetries, the container will be failed. + *
  • * */ @Public @@ -63,16 +71,29 @@ @Unstable public static ContainerRetryContext newInstance( ContainerRetryPolicy retryPolicy, Set errorCodes, - int maxRetries, int retryInterval) { + int maxRetries, int retryInterval, long failuresValidityInterval, + int remainingRetries, List restartTimes) { ContainerRetryContext containerRetryContext = Records.newRecord(ContainerRetryContext.class); containerRetryContext.setRetryPolicy(retryPolicy); containerRetryContext.setErrorCodes(errorCodes); containerRetryContext.setMaxRetries(maxRetries); containerRetryContext.setRetryInterval(retryInterval); + containerRetryContext.setFailuresValidityInterval(failuresValidityInterval); + containerRetryContext.setRemainingRetries(remainingRetries); + containerRetryContext.setRestartTimes(restartTimes); return containerRetryContext; } + @Private + @Unstable + public static ContainerRetryContext newInstance( + ContainerRetryPolicy retryPolicy, Set errorCodes, + int maxRetries, int retryInterval) { + return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval, -1, + 0, null); + } + public abstract ContainerRetryPolicy getRetryPolicy(); public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); public abstract Set getErrorCodes(); @@ -81,4 +102,11 @@ public static ContainerRetryContext newInstance( public abstract void setMaxRetries(int maxRetries); public abstract int getRetryInterval(); public abstract void setRetryInterval(int retryInterval); + public abstract long getFailuresValidityInterval(); + public abstract void setFailuresValidityInterval( + long failuresValidityInterval); + public abstract int getRemainingRetries(); + public abstract void setRemainingRetries(int remainingRetries); + public abstract List getRestartTimes(); + public abstract void setRestartTimes(List restartTimes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto 
index 25c85696f0e..9e23462ef72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -759,6 +759,9 @@ message ContainerRetryContextProto { repeated int32 error_codes = 2; optional int32 max_retries = 3 [default = 0]; optional int32 retry_interval = 4 [default = 0]; + optional int64 failures_validity_interval = 5 [default = -1]; + optional int32 remaining_retries = 6 [default = 0]; + repeated int64 restart_times = 7; } enum ContainerRetryPolicyProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 9ba2138d3ac..bbf4b5a1e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -307,6 +307,7 @@ private Set containerRetryErrorCodes = null; private int containerMaxRetries = 0; private int containrRetryInterval = 0; + private long containerFailuresValidityInterval = -1; // Timeline domain ID private String domainId = null; @@ -470,6 +471,9 @@ public boolean init(String[] args) throws ParseException, IOException { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("container_failures_validity_interval", true, + "Failures which are out of 
the time window will not be added to"
+            + " the number of container retry attempts");
     opts.addOption("placement_spec", true, "Placement specification");
     opts.addOption("debug", false, "Dump out debug information");
 
@@ -660,7 +664,8 @@ public boolean init(String[] args) throws ParseException, IOException {
         cliParser.getOptionValue("container_max_retries", "0"));
     containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
         "container_retry_interval", "0"));
-
+    containerFailuresValidityInterval = Long.parseLong(
+        cliParser.getOptionValue("container_failures_validity_interval", "-1"));
     if (!YarnConfiguration.timelineServiceEnabled(conf)) {
       timelineClient = null;
       timelineV2Client = null;
@@ -1378,7 +1383,9 @@ public void run() {
       ContainerRetryContext containerRetryContext =
           ContainerRetryContext.newInstance(
               containerRetryPolicy, containerRetryErrorCodes,
-              containerMaxRetries, containrRetryInterval);
+              containerMaxRetries, containrRetryInterval,
+              // Remaining retries start at maxRetries; no restart history yet.
+              containerFailuresValidityInterval, containerMaxRetries, null);
       ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
         localResources, myShellEnv, commands, null, allTokens.duplicate(),
           null, containerRetryContext);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 0aef83f8b38..0728a544acf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -372,6 +372,9 @@ public Client(Configuration conf)
throws Exception { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("container_failures_validity_interval", true, + "Failures which are out of the time window will not be added to" + + " the number of container retry attempts"); opts.addOption("docker_client_config", true, "The docker client configuration path. The scheme should be supplied" + " (i.e. file:// or hdfs://)." @@ -578,6 +581,10 @@ public boolean init(String[] args) throws ParseException { containerRetryOptions.add("--container_retry_interval " + cliParser.getOptionValue("container_retry_interval")); } + if (cliParser.hasOption("container_failures_validity_interval")) { + containerRetryOptions.add("--container_failures_validity_interval " + + cliParser.getOptionValue("container_failures_validity_interval")); + } if (cliParser.hasOption("flow_name")) { flowName = cliParser.getOptionValue("flow_name"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java index a5ef70de2f6..a2eac54674d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java @@ -26,7 +26,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -39,6 +41,7 @@ private boolean viaProto = false; private Set errorCodes = null; + 
private List restartTimes = null; public ContainerRetryContextPBImpl() { builder = ContainerRetryContextProto.newBuilder(); @@ -82,6 +85,10 @@ private void mergeLocalToBuilder() { builder.clearErrorCodes(); builder.addAllErrorCodes(this.errorCodes); } + if (this.restartTimes != null) { + builder.clearRestartTimes(); + builder.addAllRestartTimes(this.restartTimes); + } } private void mergeLocalToProto() { @@ -165,6 +172,58 @@ public void setRetryInterval(int retryInterval) { builder.setRetryInterval(retryInterval); } + @Override + public long getFailuresValidityInterval() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasFailuresValidityInterval()) { + return -1; + } + return p.getFailuresValidityInterval(); + } + + @Override + public void setFailuresValidityInterval(long failuresValidityInterval) { + maybeInitBuilder(); + builder.setFailuresValidityInterval(failuresValidityInterval); + } + + @Override + public int getRemainingRetries() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRemainingRetries()) { + return 0; + } + return p.getRemainingRetries(); + } + + @Override + public void setRemainingRetries(int remainingRetries) { + maybeInitBuilder(); + builder.setRemainingRetries(remainingRetries); + } + + private void initRestartTimes() { + if (this.restartTimes != null) { + return; + } + ContainerRetryContextProtoOrBuilder p = viaProto ? 
proto : builder;
+    this.restartTimes = new ArrayList<>();
+    this.restartTimes.addAll(p.getRestartTimesList());
+  }
+
+  @Override
+  public List getRestartTimes() {
+    initRestartTimes();
+    return this.restartTimes;
+  }
+
+  @Override
+  public void setRestartTimes(List restartTimes) {
+    maybeInitBuilder();
+    builder.clearRestartTimes();
+    this.restartTimes = restartTimes;
+  }
+
   private ContainerRetryPolicyProto convertToProtoFormat(
       ContainerRetryPolicy containerRetryPolicy) {
     return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/SlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/SlidingWindowRetryPolicy.java
new file mode 100644
index 00000000000..62b5fbd0b3d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/retry/SlidingWindowRetryPolicy.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.retry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
+import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.util.Iterator;
+
+/**
+ * <p>Sliding window retry policy for relaunching a
+ * <code>Container</code> in Yarn.</p>
+ *
+ * <p>When <code>failuresValidityInterval</code> is &gt; 0, restarts older
+ * than that interval are dropped and only the restarts left in the window
+ * count against <code>maxRetries</code>. Otherwise the
+ * <code>remainingRetries</code> counter of the context is counted down.</p>
+ *
+ * <p>NOTE(review): the counter mode relies on <code>remainingRetries</code>
+ * having been initialised, presumably to <code>maxRetries</code>, by the
+ * creator of the context - confirm that every caller does so.</p>
+ */
+@InterfaceStability.Unstable
+public class SlidingWindowRetryPolicy {
+
+  private Clock clock;
+
+  public SlidingWindowRetryPolicy(Clock clock) {
+    this.clock = Preconditions.checkNotNull(clock);
+  }
+
+  /**
+   * Decides whether a container which exited with <code>errorCode</code>
+   * should be relaunched. When a retry is granted it is also recorded in
+   * <code>retryContext</code>.
+   *
+   * @param retryContext retry settings and retry state of the container.
+   * @param errorCode exit code of the failed container.
+   * @return true if the container should be relaunched.
+   */
+  public boolean shouldRetry(ContainerRetryContext retryContext,
+      int errorCode) {
+    ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
+    if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
+        || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
+        && retryContext.getErrorCodes() != null
+        && retryContext.getErrorCodes().contains(errorCode))) {
+      if (retryContext.getMaxRetries()
+          == ContainerRetryContext.RETRY_FOREVER) {
+        // Nothing to count down, so the context is left untouched.
+        return true;
+      }
+      // Decide on the retries pending before this failure is counted;
+      // deciding after the decrement grants one retry too few.
+      int pendingRetries = calculatePendingRetries(retryContext);
+      if (pendingRetries > 0) {
+        updateRetryContext(retryContext, pendingRetries);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Calculates the retries which are still pending. When
+   * failuresValidityInterval is &gt; 0, restart times older than the
+   * interval are removed first and the rest is counted against maxRetries.
+   * Otherwise the remaining retries stored in the context are returned.
+   *
+   * @param retryContext retry settings and retry state of the container.
+   * @return the pending retries; a value &lt;= 0 means none is left.
+   */
+  private int calculatePendingRetries(ContainerRetryContext retryContext) {
+    long validityInterval = retryContext.getFailuresValidityInterval();
+    if (validityInterval <= 0) {
+      return retryContext.getRemainingRetries();
+    }
+    long currentTime = clock.getTime();
+    Iterator<Long> iterator = retryContext.getRestartTimes().iterator();
+    // Entries are appended as retries are granted, so pruning stops at the
+    // first restart time which is still inside the window.
+    while (iterator.hasNext()) {
+      long restartTime = iterator.next();
+      if (currentTime - restartTime <= validityInterval) {
+        break;
+      }
+      iterator.remove();
+    }
+    return retryContext.getMaxRetries()
+        - retryContext.getRestartTimes().size();
+  }
+
+  /**
+   * Records a granted retry in the context: one pending retry is consumed
+   * and, when failuresValidityInterval is &gt; 0, the current time is
+   * appended to the restart times.
+   *
+   * @param retryContext retry state of the container to update.
+   * @param pendingRetries retries pending before this retry was granted.
+   */
+  private void updateRetryContext(ContainerRetryContext retryContext,
+      int pendingRetries) {
+    retryContext.setRemainingRetries(pendingRetries - 1);
+    if (retryContext.getFailuresValidityInterval() > 0) {
+      retryContext.getRestartTimes().add(clock.getTime());
+    }
+  }
+
+  public void setClock(Clock clock) {
+    this.clock = Preconditions.checkNotNull(clock);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/retry/TestSlidingWindowRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/retry/TestSlidingWindowRetryPolicy.java
new file mode 100644
index 00000000000..ef344cab804
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/retry/TestSlidingWindowRetryPolicy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.retry; + +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestSlidingWindowRetryPolicy { + + private ControlledClock clock; + private SlidingWindowRetryPolicy retryPolicy; + + @Before + public void setup() { + clock = new ControlledClock(); + retryPolicy = new SlidingWindowRetryPolicy(clock); + } + + @Test + public void testNeverRetry() { + ContainerRetryContext retryContext = + ContainerRetryContext.NEVER_RETRY_CONTEXT; + Assert.assertFalse("never retry", + retryPolicy.shouldRetry(retryContext, 12)); + } + + @Test + public void testAlwaysRetry() { + ContainerRetryContext retryContext = ContainerRetryContext.newInstance( + ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, -1, + 0, 10, 0, null); + Assert.assertTrue("always retry", + retryPolicy.shouldRetry(retryContext, 12)); + } + + @Test + public void testFailuresValidityInterval() { + ContainerRetryContext retryContext = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10, + 0, null); + Assert.assertTrue("retry 1", + retryPolicy.shouldRetry(retryContext, 12)); + clock.setTime(20); + Assert.assertTrue("retry 2", + retryPolicy.shouldRetry(retryContext, 12)); + clock.setTime(40); + Assert.assertTrue("retry 3", + retryPolicy.shouldRetry(retryContext, 12)); + clock.setTime(45); + 
Assert.assertFalse("retry failed", + retryPolicy.shouldRetry(retryContext, 12)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 751beffeb03..7ed712aef5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -23,13 +23,7 @@ import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -37,6 +31,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerSubState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent; +import org.apache.hadoop.yarn.server.retry.SlidingWindowRetryPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,9 +162,10 @@ private ReInitializationContext createContextForRollback() { private long containerLaunchStartTime; private ContainerMetrics containerMetrics; private static Clock clock = SystemClock.getInstance(); + private ContainerRetryContext containerRetryContext; - // remaining 
retries to relaunch container if needed - private int remainingRetryAttempts; + private SlidingWindowRetryPolicy retryPolicy; + private String workDir; private String logDir; private String host; @@ -246,7 +242,8 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, // Configure the Retry Context this.containerRetryContext = configureRetryContext( conf, launchContext, this.containerId); - this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries(); + this.retryPolicy = new SlidingWindowRetryPolicy(clock); + stateMachine = stateMachineFactory.make(this, ContainerState.NEW, context.getContainerStateTransitionListener()); this.context = context; @@ -289,7 +286,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.recoveredAsKilled = rcs.getKilled(); this.diagnostics.append(rcs.getDiagnostics()); this.version = rcs.getVersion(); - this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); + this.containerRetryContext.setRemainingRetries( + rcs.getRemainingRetryAttempts()); + this.containerRetryContext.setRestartTimes(rcs.getRestartTimes()); this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); this.resourceMappings = rcs.getResourceMappings(); @@ -1592,7 +1591,7 @@ public ContainerState transition(final ContainerImpl container, if (container.containerRetryContext.getRetryPolicy() != ContainerRetryPolicy.NEVER_RETRY) { int n = container.containerRetryContext.getMaxRetries() - - container.remainingRetryAttempts; + - container.containerRetryContext.getRemainingRetries(); container.addDiagnostics("Diagnostic message from attempt " + n + " : ", "\n"); } @@ -1600,18 +1599,9 @@ public ContainerState transition(final ContainerImpl container, } if (container.shouldRetry(container.exitCode)) { - if (container.remainingRetryAttempts > 0) { - container.remainingRetryAttempts--; - try { - container.stateStore.storeContainerRemainingRetryAttempts( - container.getContainerId(), container.remainingRetryAttempts); - } catch 
(IOException e) { - LOG.warn( - "Unable to update remainingRetryAttempts in state store for " - + container.getContainerId(), e); - } - } - doRelaunch(container, container.remainingRetryAttempts, + container.storeRetryContext(); + doRelaunch(container, + container.containerRetryContext.getRemainingRetries(), container.containerRetryContext.getRetryInterval()); return ContainerState.RELAUNCHING; } else if (container.canRollback()) { @@ -1671,29 +1661,14 @@ public boolean isRetryContextSet() { @Override public boolean shouldRetry(int errorCode) { - return shouldRetry(errorCode, containerRetryContext, - remainingRetryAttempts); - } - - public static boolean shouldRetry(int errorCode, - ContainerRetryContext retryContext, int remainingRetryAttempts) { if (errorCode == ExitCode.SUCCESS.getExitCode() || errorCode == ExitCode.FORCE_KILLED.getExitCode() || errorCode == ExitCode.TERMINATED.getExitCode()) { return false; } - - ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy(); - if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS - || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES - && retryContext.getErrorCodes() != null - && retryContext.getErrorCodes().contains(errorCode))) { - return remainingRetryAttempts > 0 - || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; - } - - return false; + return retryPolicy.shouldRetry(containerRetryContext, errorCode); } + /** * Transition to EXITED_WITH_FAILURE */ @@ -1729,9 +1704,7 @@ public void transition(ContainerImpl container, container.containerRetryContext = configureRetryContext(container.context.getConf(), container.launchContext, container.containerId); - // Reset the retry attempts since its a fresh start - container.remainingRetryAttempts = - container.containerRetryContext.getMaxRetries(); + container.retryPolicy = new SlidingWindowRetryPolicy(clock); container.resourceSet = container.reInitContext.mergedResourceSet(container.resourceSet); @@ -2209,4 +2182,31 @@ 
private static void removeDockerContainer(ContainerImpl container) { container.getContainerId().toString()); deletionService.delete(deletionTask); } + + private void storeRetryContext() { + if (containerRetryContext.getRestartTimes() != null) { + try { + stateStore.storeContainerRestartTimes(containerId, + containerRetryContext.getRestartTimes()); + } catch (IOException e) { + LOG.warn( + "Unable to update finishTimeForRetryAttempts in state store for " + + containerId, e); + } + } + try { + stateStore.storeContainerRemainingRetryAttempts(containerId, + containerRetryContext.getRemainingRetries()); + } catch (IOException e) { + LOG.warn( + "Unable to update remainingRetryAttempts in state store for " + + containerId, e); + } + } + + @VisibleForTesting + void setClock(Clock targetClock) { + clock = targetClock; + retryPolicy.setClock(clock); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 0f659d90754..bf4c0ad5f61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -127,6 +127,8 @@ private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = "/remainingRetryAttempts"; + private static final String CONTAINER_RESTART_TIMES_SUFFIX = + "/restartTimes"; private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir"; private static 
final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
 
@@ -338,6 +340,19 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
       } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
         rcs.setRemainingRetryAttempts(
             Integer.parseInt(asString(entry.getValue())));
+      } else if (suffix.equals(CONTAINER_RESTART_TIMES_SUFFIX)) {
+        String value = asString(entry.getValue());
+        // Stored in List.toString() form, e.g. "[34, 21, 22]" or "[]".
+        String joined = value.substring(1, value.length() - 1);
+        List restartTimes = new ArrayList<>();
+        // "".split(", ") yields [""], which Long.parseLong rejects, so an
+        // empty list has to be recognised before splitting.
+        if (!joined.isEmpty()) {
+          for (String restartTime : joined.split(", ")) {
+            restartTimes.add(Long.parseLong(restartTime));
+          }
+        }
+        rcs.setRestartTimes(restartTimes);
       } else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) {
         rcs.setWorkDir(asString(entry.getValue()));
       } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
@@ -581,6 +596,19 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
     }
   }
 
+  @Override
+  public void storeContainerRestartTimes(ContainerId containerId,
+      List restartTimes) throws IOException {
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_RESTART_TIMES_SUFFIX;
+    try {
+      // Persisted in List.toString() form; loadContainerState parses it.
+      db.put(bytes(key), bytes(restartTimes.toString()));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   @Override
   public void storeContainerWorkDir(ContainerId containerId,
       String workDir) throws IOException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 2d522a9b6fa..b9466ed8a6a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -119,6 +119,11 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, int remainingRetryAttempts) throws IOException { } + @Override + public void storeContainerRestartTimes(ContainerId containerId, + List restartTimes) throws IOException { + } + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index f9b86bf84ea..0ea0ef3b86c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -98,6 +98,7 @@ public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { StartContainerRequest startRequest; Resource capability; private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; + private List restartTimes; private String workDir; private String logDir; int version; @@ -150,6 +151,15 @@ public void setRemainingRetryAttempts(int retryAttempts) { this.remainingRetryAttempts = retryAttempts; } + public List getRestartTimes() { + return restartTimes; + } + + public 
void setRestartTimes( + List restartTimes) { + this.restartTimes = restartTimes; + } + public String getWorkDir() { return workDir; } @@ -177,6 +187,7 @@ public String toString() { .append(", Capability: ").append(getCapability()) .append(", StartRequest: ").append(getStartRequest()) .append(", RemainingRetryAttempts: ").append(remainingRetryAttempts) + .append(", RestartTimes: ").append(restartTimes) .append(", WorkDir: ").append(workDir) .append(", LogDir: ").append(logDir) .toString(); @@ -486,6 +497,16 @@ public abstract void storeContainerDiagnostics(ContainerId containerId, public abstract void storeContainerRemainingRetryAttempts( ContainerId containerId, int remainingRetryAttempts) throws IOException; + /** + * Record restart times for a container. + * @param containerId the container ID + * @param restartTimes the restart times to record for the container + * @throws IOException if the restart times cannot be stored + */ + public abstract void storeContainerRestartTimes( + ContainerId containerId, List restartTimes) + throws IOException; + /** * Record working directory for a container. 
* @param containerId the container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index c7094a05a3a..157d249f4c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -105,6 +105,8 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -1088,6 +1090,39 @@ private void testContainerRestartInterval( } } + @Test + public void testContainerRetryFailureValidityInterval() throws Exception { + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10, + 0, null); + WrappedContainer wc = null; + try { + wc = new WrappedContainer(25, 314159265358980L, 4200, "test", + containerRetryContext); + ControlledClock clock = new ControlledClock(); + wc.setClock(clock); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(20); + wc.containerFailed(12); + 
assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(40); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(45); + wc.containerFailed(12); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -1548,5 +1583,9 @@ public int getLocalResourceCount() { public String getDiagnostics() { return c.cloneAndGetContainerStatus().getDiagnostics(); } + + public void setClock(Clock clock) { + ((ContainerImpl)c).setClock(clock); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 3dca3676b59..b67d11fceb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -121,6 +121,7 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); + rcsCopy.setRestartTimes(rcs.getRestartTimes()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); rcsCopy.setResourceMappings(rcs.getResourceMappings()); @@ -212,6 +213,14 @@ public void 
storeContainerRemainingRetryAttempts(ContainerId containerId, rcs.setRemainingRetryAttempts(remainingRetryAttempts); } + @Override + public void storeContainerRestartTimes( + ContainerId containerId, List restartTimes) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setRestartTimes(restartTimes); + } + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index de667d159e3..d8b3f415307 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -363,6 +363,11 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerRemainingRetryAttempts(containerId, 6); stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); + // store finishTimeForRetryAttempts + List finishTimeForRetryAttempts = Arrays.asList(1462700529039L, + 1462700529050L, 1462700529120L); + stateStore.storeContainerRestartTimes(containerId, + finishTimeForRetryAttempts); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); @@ -370,6 +375,10 @@ public void testContainerStorage() throws IOException { assertEquals(6, 
rcs.getRemainingRetryAttempts()); assertEquals("/test/workdir", rcs.getWorkDir()); assertEquals("/test/logdir", rcs.getLogDir()); + List recoveredRestartTimes = rcs.getRestartTimes(); + assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0)); + assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1)); + assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2)); // remove the container and verify not recovered stateStore.removeContainer(containerId);