diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java index ef8bd17..ebe5d92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java @@ -49,6 +49,13 @@ * *
  • retryInterval specifies delaying some time before relaunch * container, the unit is millisecond.
  • + *
  • + * failuresValidityInterval: default value is -1. + * When failuresValidityInterval (in milliseconds) is set to {@literal >} 0, + * failures that happened more than failuresValidityInterval ago are + * not counted toward the failure count. Once the failure count + * reaches maxRetries, the container is marked as failed. + *
  • * */ @Public @@ -57,22 +64,31 @@ public static final int RETRY_FOREVER = -1; public static final int RETRY_INVALID = -1000; public static final ContainerRetryContext NEVER_RETRY_CONTEXT = - newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0); + newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0, -1); @Private @Unstable public static ContainerRetryContext newInstance( ContainerRetryPolicy retryPolicy, Set errorCodes, - int maxRetries, int retryInterval) { + int maxRetries, int retryInterval, long failuresValidityInterval) { ContainerRetryContext containerRetryContext = Records.newRecord(ContainerRetryContext.class); containerRetryContext.setRetryPolicy(retryPolicy); containerRetryContext.setErrorCodes(errorCodes); containerRetryContext.setMaxRetries(maxRetries); containerRetryContext.setRetryInterval(retryInterval); + containerRetryContext.setFailuresValidityInterval(failuresValidityInterval); return containerRetryContext; } + @Private + @Unstable + public static ContainerRetryContext newInstance( + ContainerRetryPolicy retryPolicy, Set errorCodes, + int maxRetries, int retryInterval) { + return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval, -1); + } + public abstract ContainerRetryPolicy getRetryPolicy(); public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); public abstract Set getErrorCodes(); @@ -81,4 +97,7 @@ public static ContainerRetryContext newInstance( public abstract void setMaxRetries(int maxRetries); public abstract int getRetryInterval(); public abstract void setRetryInterval(int retryInterval); + public abstract long getFailuresValidityInterval(); + public abstract void setFailuresValidityInterval( + long failuresValidityInterval); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 60cdfd1..1f99548 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -540,6 +540,7 @@ message ContainerRetryContextProto { repeated int32 error_codes = 2; optional int32 max_retries = 3 [default = 0]; optional int32 retry_interval = 4 [default = 0]; + optional int64 failures_validity_interval = 5 [default = -1]; } enum ContainerRetryPolicyProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 2973974..d569240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -265,6 +265,7 @@ private Set containerRetryErrorCodes = null; private int containerMaxRetries = 0; private int containrRetryInterval = 0; + private long containerFailuresValidityInterval = -1; // Timeline domain ID private String domainId = null; @@ -400,6 +401,9 @@ public boolean init(String[] args) throws ParseException, IOException { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("container_failures_validity_interval", true, + "Failures which are out of the time window will not be added to" + + " the number of container retry attempts"); opts.addOption("debug", false, "Dump out debug 
information"); opts.addOption("help", false, "Print usage"); @@ -552,6 +556,8 @@ public boolean init(String[] args) throws ParseException, IOException { cliParser.getOptionValue("container_max_retries", "0")); containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( "container_retry_interval", "0")); + containerFailuresValidityInterval = Long.parseLong( + cliParser.getOptionValue("container_failures_validity_interval", "-1")); return true; } @@ -1109,7 +1115,8 @@ public void run() { ContainerRetryContext containerRetryContext = ContainerRetryContext.newInstance( containerRetryPolicy, containerRetryErrorCodes, - containerMaxRetries, containrRetryInterval); + containerMaxRetries, containrRetryInterval, + containerFailuresValidityInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), null, containerRetryContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 9139b08..e7b7ccd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -302,6 +302,9 @@ public Client(Configuration conf) throws Exception { "If container could retry, it specifies max retires"); opts.addOption("container_retry_interval", true, "Interval between each retry, unit is milliseconds"); + opts.addOption("container_failures_validity_interval", true, + "Failures 
which are out of the time window will not be added to" + + " the number of container retry attempts"); } /** @@ -461,6 +464,10 @@ public boolean init(String[] args) throws ParseException { containerRetryOptions.add("--container_retry_interval " + cliParser.getOptionValue("container_retry_interval")); } + if (cliParser.hasOption("container_failures_validity_interval")) { + containerRetryOptions.add("--container_failures_validity_interval " + + cliParser.getOptionValue("container_failures_validity_interval")); + } return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java index a5ef70d..316378f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java @@ -165,6 +165,19 @@ public void setRetryInterval(int retryInterval) { builder.setRetryInterval(retryInterval); } + public long getFailuresValidityInterval() { + ContainerRetryContextProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasFailuresValidityInterval()) { + return -1; + } + return p.getFailuresValidityInterval(); + } + + public void setFailuresValidityInterval(long failuresValidityInterval) { + maybeInitBuilder(); + builder.setFailuresValidityInterval(failuresValidityInterval); + } + private ContainerRetryPolicyProto convertToProtoFormat( ContainerRetryPolicy containerRetryPolicy) { return ProtoUtils.convertToProtoFormat(containerRetryPolicy); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index b1ddc2e..be5eeae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -109,8 +111,18 @@ private ContainerMetrics containerMetrics; private static Clock clock = SystemClock.getInstance(); private final ContainerRetryContext containerRetryContext; + private final long failuresValidityInterval; + /* + * 
Record each finish time for retry attempts if failuresValidityInterval + * is > 0 and maxRetries is not RETRY_FOREVER. If failuresValidityInterval + * is <= 0, we just record the remaining retry attempts and avoid recording + * each finish time. If maxRetries is RETRY_FOREVER, the container retries + * indefinitely, so there is no need to record each finish time. + */ + private List<Long> finishTimeForRetryAttempts = new ArrayList<>(); // remaining retries to relaunch container if needed private int remainingRetryAttempts; + private boolean needRecordFinishTimes; private String workDir; private String logDir; @@ -155,6 +167,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT; } this.remainingRetryAttempts = containerRetryContext.getMaxRetries(); + this.failuresValidityInterval = + containerRetryContext.getFailuresValidityInterval(); + this.needRecordFinishTimes = containerRetryContext.getMaxRetries() + != ContainerRetryContext.RETRY_FOREVER && failuresValidityInterval > 0; this.diagnosticsMaxSize = conf.getInt( YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE, YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE); @@ -208,6 +224,11 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, recoveredCapability.getVirtualCores()); } this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); + // No finish times are stored until a retry has been recorded, so the + // recovered list may be null; keep the empty default in that case. + if (rcs.getFinishTimeForRetryAttempts() != null) { + this.finishTimeForRetryAttempts = rcs.getFinishTimeForRetryAttempts(); + } this.workDir = rcs.getWorkDir(); this.logDir = rcs.getLogDir(); } @@ -970,20 +991,13 @@ public ContainerState transition(final ContainerImpl container, } if (container.shouldRetry(container.exitCode)) { - if (container.remainingRetryAttempts > 0) { - container.remainingRetryAttempts--; - try { - container.stateStore.storeContainerRemainingRetryAttempts( - container.getContainerId(), container.remainingRetryAttempts); - } catch (IOException e) { - LOG.warn( - "Unable to update remainingRetryAttempts in state 
store for " - + container.getContainerId(), e); - } - } + container.updateAndStoreRetryAttemptsRecords(clock.getTime()); + LOG.info("Relaunching Container " + container.getContainerId() + ". Remaining retry attempts(after relaunch) : " - + container.remainingRetryAttempts + + container.getRemainingRetryAttempts() + + ". Failure validity interval is " + + container.failuresValidityInterval + " milliseconds" + ". Interval between retries is " + container.containerRetryContext.getRetryInterval() + "ms"); container.wasLaunched = false; @@ -1013,6 +1023,51 @@ public void run() { } } + /** + * Records a failed attempt that is about to be retried. When finish times + * are tracked, the new finish time is appended, entries older than + * failuresValidityInterval are pruned and the list is persisted; otherwise + * remainingRetryAttempts is decremented (if positive) and persisted. + */ + private void updateAndStoreRetryAttemptsRecords(long finishTime) { + if (needRecordFinishTimes) { + finishTimeForRetryAttempts.add(finishTime); + + long currentTime = clock.getTime(); + // remove element which is out of 'failuresValidityInterval' + Iterator<Long> iterator = finishTimeForRetryAttempts.iterator(); + while (iterator.hasNext()) { + long time = iterator.next(); + if (currentTime - time >= failuresValidityInterval) { + iterator.remove(); + } else { + break; + } + } + try { + stateStore.storeContainerFinishTimeForRetryAttempts(containerId, + finishTimeForRetryAttempts); + } catch (IOException e) { + LOG.warn( + "Unable to update finishTimeForRetryAttempts in state store for " + + containerId, e); + } + } else { + // update and save 'remainingRetryAttempts' + if (remainingRetryAttempts > 0) { + remainingRetryAttempts--; + try { + stateStore.storeContainerRemainingRetryAttempts(containerId, + remainingRetryAttempts); + } catch (IOException e) { + LOG.warn( + "Unable to update remainingRetryAttempts in state store for " + + containerId, e); + } + } + } + } + @Override public boolean isRetryContextSet() { return containerRetryContext.getRetryPolicy() @@ -1032,13 +1087,29 @@ public boolean shouldRetry(int errorCode) { || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES && containerRetryContext.getErrorCodes() != null && 
containerRetryContext.getErrorCodes().contains(errorCode))) { - return remainingRetryAttempts > 0 - || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; + int n = getRemainingRetryAttempts(); + return n > 0 || n == ContainerRetryContext.RETRY_FOREVER; } return false; } + private int getRemainingRetryAttempts() { + if (needRecordFinishTimes) { + int numFailedRetryAttempts = finishTimeForRetryAttempts.size(); + long currentTime = clock.getTime(); + for (long finishTime : finishTimeForRetryAttempts) { + if (currentTime - finishTime < failuresValidityInterval) { + break; + } + numFailedRetryAttempts--; + } + return containerRetryContext.getMaxRetries() - numFailedRetryAttempts; + } else { + return remainingRetryAttempts; + } + } + /** * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST */ @@ -1368,4 +1433,9 @@ private static boolean shouldBeUploadedToSharedCache(ContainerImpl container, LocalResourceRequest resource) { return container.resourcesUploadPolicies.get(resource); } + + @VisibleForTesting + void setClock(Clock targetClock) { + clock = targetClock; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 6e9efe1..7121743 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -112,6 +112,8 @@ private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final 
String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = "/remainingRetryAttempts"; + private static final String CONTAINER_RETRY_ATTEMPTS_FINISH_TIME_SUFFIX = + "/finishTimeForRetryAttempts"; private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir"; private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir"; @@ -254,6 +256,20 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { rcs.setRemainingRetryAttempts( Integer.parseInt(asString(entry.getValue()))); + } else if (suffix.equals(CONTAINER_RETRY_ATTEMPTS_FINISH_TIME_SUFFIX)) { + String value = asString(entry.getValue()); + // Parse the List<Long>.toString() format written by + // storeContainerFinishTimeForRetryAttempts, e.g. [34, 21, 22]. + String listBody = value.substring(1, value.length() - 1); + List<Long> finishTimeForRetryAttempts = new ArrayList<>(); + // "[]" leaves an empty body; splitting it would yield one empty token + // that Long.parseLong rejects. + if (!listBody.isEmpty()) { + for (String finishTime : listBody.split(", ")) { + finishTimeForRetryAttempts.add(Long.parseLong(finishTime)); + } + } + rcs.setFinishTimeForRetryAttempts(finishTimeForRetryAttempts); } else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) { rcs.setWorkDir(asString(entry.getValue())); } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { @@ -380,6 +396,19 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, } @Override + public void storeContainerFinishTimeForRetryAttempts( + ContainerId containerId, List<Long> finishTimeForRetryAttempts) + throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_RETRY_ATTEMPTS_FINISH_TIME_SUFFIX; + try { + db.put(bytes(key), bytes(finishTimeForRetryAttempts.toString())); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { String key = CONTAINERS_KEY_PREFIX + containerId.toString() diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 08b80e9..db7f347 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -105,6 +105,12 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, } @Override + public void storeContainerFinishTimeForRetryAttempts( + ContainerId containerId, List finishTimeForRetryAttempts) + throws IOException { + } + + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index ccf1e70..dc85a3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -74,6 +74,7 @@ public NMStateStoreService(String name) { StartContainerRequest startRequest; Resource capability; private int 
remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; + private List<Long> finishTimeForRetryAttempts; private String workDir; private String logDir; @@ -109,6 +110,15 @@ public void setRemainingRetryAttempts(int retryAttempts) { this.remainingRetryAttempts = retryAttempts; } + public List<Long> getFinishTimeForRetryAttempts() { + return finishTimeForRetryAttempts; + } + + public void setFinishTimeForRetryAttempts( + List<Long> finishTimeForRetryAttempts) { + this.finishTimeForRetryAttempts = finishTimeForRetryAttempts; + } + public String getWorkDir() { return workDir; } @@ -134,6 +144,8 @@ public String toString() { .append(", Capability: ").append(getCapability()) .append(", StartRequest: ").append(getStartRequest()) .append(", RemainingRetryAttempts: ").append(remainingRetryAttempts) + .append(", FinishTimeForRetryAttempts: ") + .append(finishTimeForRetryAttempts) .append(", WorkDir: ").append(workDir) .append(", LogDir: ").append(logDir) .toString(); @@ -365,6 +377,16 @@ public abstract void storeContainerRemainingRetryAttempts( ContainerId containerId, int remainingRetryAttempts) throws IOException; /** + * Record retry attempts' finish time for a container. + * @param containerId the container ID + * @param finishTimeForRetryAttempts finish times of the retry attempts + * @throws IOException if the finish times cannot be stored + */ + public abstract void storeContainerFinishTimeForRetryAttempts( + ContainerId containerId, List<Long> finishTimeForRetryAttempts) + throws IOException; + + /** * Record working directory for a container. 
* @param containerId the container ID * @param workDir the working directory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index 118bc42..b26908f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -94,6 +95,8 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentMatcher; @@ -725,6 +728,38 @@ private void testContainerRetry(ContainerRetryContext containerRetryContext, } } + @Test + public void testContainerRetryFailureValidityInterval() throws Exception { + ContainerRetryContext containerRetryContext = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10); + WrappedContainer wc = null; + try { + wc = new WrappedContainer(25, 
314159265358980L, 4200, "test", + containerRetryContext); + ControlledClock clock = new ControlledClock(); + wc.setClock(clock); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(20); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(40); + wc.containerFailed(12); + assertEquals(ContainerState.RUNNING, wc.c.getContainerState()); + clock.setTime(45); + wc.containerFailed(12); + assertEquals(ContainerState.EXITED_WITH_FAILURE, + wc.c.getContainerState()); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -1114,5 +1149,9 @@ public int getLocalResourceCount() { public String getDiagnostics() { return c.cloneAndGetContainerStatus().getDiagnostics(); } + + public void setClock(Clock clock) { + ((ContainerImpl)c).setClock(clock); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 4652245..d461893 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -116,6 +116,8 @@ public synchronized void removeApplication(ApplicationId appId) rcsCopy.startRequest = 
rcs.startRequest; rcsCopy.capability = rcs.capability; rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); + rcsCopy.setFinishTimeForRetryAttempts( + rcs.getFinishTimeForRetryAttempts()); rcsCopy.setWorkDir(rcs.getWorkDir()); rcsCopy.setLogDir(rcs.getLogDir()); result.add(rcsCopy); @@ -178,6 +180,14 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId, } @Override + public void storeContainerFinishTimeForRetryAttempts( + ContainerId containerId, List finishTimeForRetryAttempts) + throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setFinishTimeForRetryAttempts(finishTimeForRetryAttempts); + } + + @Override public void storeContainerWorkDir(ContainerId containerId, String workDir) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index ccc9254..0eb37aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -338,6 +339,11 @@ public void testContainerStorage() throws IOException { stateStore.storeContainerRemainingRetryAttempts(containerId, 6); 
stateStore.storeContainerWorkDir(containerId, "/test/workdir"); stateStore.storeContainerLogDir(containerId, "/test/logdir"); + // store finishTimeForRetryAttempts + List<Long> finishTimeForRetryAttempts = Arrays.asList(1462700529039L, + 1462700529050L, 1462700529120L); + stateStore.storeContainerFinishTimeForRetryAttempts(containerId, + finishTimeForRetryAttempts); restartStateStore(); recoveredContainers = stateStore.loadContainersState(); assertEquals(1, recoveredContainers.size()); @@ -345,6 +351,10 @@ public void testContainerStorage() throws IOException { assertEquals(6, rcs.getRemainingRetryAttempts()); assertEquals("/test/workdir", rcs.getWorkDir()); assertEquals("/test/logdir", rcs.getLogDir()); + List<Long> recoveredFinishTimes = rcs.getFinishTimeForRetryAttempts(); + assertEquals(1462700529039L, (long)recoveredFinishTimes.get(0)); + assertEquals(1462700529050L, (long)recoveredFinishTimes.get(1)); + assertEquals(1462700529120L, (long)recoveredFinishTimes.get(2)); // remove the container and verify not recovered stateStore.removeContainer(containerId);